# -*- coding: utf-8 -*-
#
# diffoscope: in-depth comparison of files, archives, and directories
#
# Copyright © 2015 Reiner Herrmann <reiner@reiner-h.de>
#             2015 Jérémy Bobbio <lunar@debian.org>
#
# diffoscope is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# diffoscope is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with diffoscope.  If not, see <https://www.gnu.org/licenses/>.

21
import os
22
import re
23
import stat
24
import logging
25
import functools
Chris Lamb's avatar
Chris Lamb committed
26 27 28
import subprocess
import collections

29
from diffoscope.tools import tool_required
Chris Lamb's avatar
Chris Lamb committed
30
from diffoscope.difference import Difference
31
from diffoscope.tempfiles import get_temporary_directory
32

33
from .utils.file import File
34 35 36
from .device import Device
from .symlink import Symlink
from .directory import Directory
37 38
from .utils.archive import Archive, ArchiveMember
from .utils.command import Command
39

40 41
logger = logging.getLogger(__name__)

42 43 44 45 46 47

class SquashfsSuperblock(Command):
    """Command that runs ``unsquashfs -s`` on ``self.path``.

    The output is post-processed line by line via :meth:`filter` so that
    the name of the inspected file does not show up in it.
    """

    @tool_required('unsquashfs')
    def cmdline(self):
        """Return the argv list used to invoke ``unsquashfs``."""
        return ['unsquashfs', '-s', self.path]

    def filter(self, line):
        """Strip the trailing `` on <filename>`` from the "Found a valid" line.

        ``line`` is a ``bytes`` object and a ``bytes`` object is returned.
        Lines that do not match the pattern are returned unchanged.
        """
        # strip filename
        #
        # Match directly on bytes instead of decoding to str and encoding
        # back: the result is the same for valid UTF-8, and a filename that
        # is not valid UTF-8 no longer raises UnicodeDecodeError.
        return re.sub(
            rb'^(Found a valid .*) on .*',
            rb'\1',
            line,
        )
55

56 57 58 59 60

class SquashfsListing(Command):
    """Command that runs ``unsquashfs -d '' -lls`` on ``self.path``."""

    @tool_required('unsquashfs')
    def cmdline(self):
        """Return the argv list used to invoke ``unsquashfs``."""
        # NOTE(review): the empty `-d` argument presumably keeps the default
        # destination directory name out of the listed paths -- confirm
        # against unsquashfs(1).
        argv = ['unsquashfs', '-d', '', '-lls']
        argv.append(self.path)
        return argv
61 62


63 64 65 66
class SquashfsInvalidLineFormat(Exception):
    """Raised by the ``parse`` helpers when a listing line cannot be parsed."""


67 68 69 70 71 72 73 74 75 76
class SquashfsMember(ArchiveMember):
    """Common base for every kind of entry found in a squashfs image.

    All ``is_*`` predicates answer ``False`` here; the specialised
    subclasses override the one that applies to them.
    """

    def is_directory(self):
        """Return ``False``; overridden by the directory subclass."""
        return False

    def is_symlink(self):
        """Return ``False``; overridden by the symlink subclass."""
        return False

    def is_device(self):
        """Return ``False``; overridden by the device subclass."""
        return False

    @property
    def path(self):
        """Location of this member inside the container's unpack directory."""
        # Use our extracted version and also avoid creating a temporary
        # directory per-file in ArchiveMember.path.
        unpack_root = self.container._temp_dir
        return os.path.join(unpack_root, self._name)

    @property
    def name(self):
        """Member name with its first character removed."""
        # Don't include the leading "." in the output  (eg. "./etc/shadow")
        stored_name = self._name
        return stored_name[1:]

88 89 90 91

class SquashfsRegularFile(SquashfsMember):
    """Regular file entry of a squashfs listing."""

    # Example line:
    # -rw-r--r-- user/group   446 2015-06-24 14:49 squashfs-root/text
    #
    # Five whitespace-separated fields are skipped; the remainder of the
    # line is captured as the member name.
    LINE_RE = re.compile(
        r'^\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+(?P<member_name>.*)$')

    @staticmethod
    def parse(line):
        """Parse a listing line into constructor keyword arguments.

        Returns a dict with the single key ``member_name``.  Raises
        ``SquashfsInvalidLineFormat`` when the line does not match
        ``LINE_RE``.
        """
        match = SquashfsRegularFile.LINE_RE.match(line)
        if match is None:
            raise SquashfsInvalidLineFormat("invalid line format")
        return match.groupdict()

    def __init__(self, archive, member_name):
        """Create the member ``member_name`` belonging to ``archive``."""
        SquashfsMember.__init__(self, archive, member_name)
104 105 106 107 108


class SquashfsDirectory(Directory, SquashfsMember):
    """Directory entry of a squashfs listing.

    Directories are placeholders only: they never compare, never report
    identical content, have no extractable path and refuse to enumerate
    members.
    """

    # Example line:
    # drwxr-xr-x user/group    51 2015-06-24 14:47 squashfs-root
    #
    # Five whitespace-separated fields are skipped; the remainder of the
    # line is captured as the member name.
    LINE_RE = re.compile(
        r'^\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+(?P<member_name>.*)$')

    @staticmethod
    def parse(line):
        """Parse a listing line into constructor keyword arguments.

        Returns a dict with the single key ``member_name``.  Raises
        ``SquashfsInvalidLineFormat`` when the line does not match
        ``LINE_RE``.
        """
        match = SquashfsDirectory.LINE_RE.match(line)
        if match is None:
            raise SquashfsInvalidLineFormat("invalid line format")
        return match.groupdict()

    def __init__(self, archive, member_name):
        """Create the directory member; an empty name is stored as ``'/'``."""
        effective_name = member_name if member_name else '/'
        SquashfsMember.__init__(self, archive, effective_name)

    def compare(self, other, source=None):
        """Always return ``None``."""
        return None

    def has_same_content_as(self, other):
        """Always return ``False``."""
        return False

    @property
    def path(self):
        """Always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "SquashfsDirectory is not meant to be extracted.")

    def is_directory(self):
        """Return ``True``: this member is a directory."""
        return True

    def get_member_names(self):
        """Always raises ``ValueError``."""
        raise ValueError("squashfs are compared as a whole.")  # noqa

    def get_member(self, member_name):
        """Always raises ``ValueError``."""
        raise ValueError("squashfs are compared as a whole.")  # noqa
141 142 143 144 145


class SquashfsSymlink(Symlink, SquashfsMember):
    """Symbolic link entry of a squashfs listing."""

    # Example line:
    # lrwxrwxrwx user/group   6 2015-06-24 14:47 squashfs-root/link -> broken
    #
    # Five whitespace-separated fields are skipped; what follows is split
    # around "->" into the member name and the link destination.
    LINE_RE = re.compile(
        r'^\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+(?P<member_name>.*)\s+->\s+(?P<destination>.*)$',
    )

    @staticmethod
    def parse(line):
        """Parse a listing line into constructor keyword arguments.

        Returns a dict with the keys ``member_name`` and ``destination``.
        Raises ``SquashfsInvalidLineFormat`` when the line does not match
        ``LINE_RE``.
        """
        match = SquashfsSymlink.LINE_RE.match(line)
        if match is None:
            raise SquashfsInvalidLineFormat("invalid line format")
        return match.groupdict()

    def __init__(self, archive, member_name, destination):
        """Create the symlink member and remember where it points to."""
        SquashfsMember.__init__(self, archive, member_name)
        self._destination = destination

    def is_symlink(self):
        """Return ``True``: this member is a symbolic link."""
        return True

    @property
    def symlink_destination(self):
        """Destination string captured from the listing line."""
        return self._destination


class SquashfsDevice(Device, SquashfsMember):
    """Character or block device entry of a squashfs listing."""

    # Example line:
    # crw-r--r-- root/root  1,  3 2015-06-24 14:47 squashfs-root/null
    #
    # The first character gives the device kind, and the usual size column
    # is replaced by "<major>, <minor>".
    LINE_RE = re.compile(
        r'^(?P<kind>c|b)\S+\s+\S+\s+(?P<major>\d+),\s*(?P<minor>\d+)\s+\S+\s+\S+\s+(?P<member_name>.*)$',
    )

    # Listing type character -> file type constant from the stat module.
    KIND_MAP = {
        'c': stat.S_IFCHR,
        'b': stat.S_IFBLK,
    }

    @staticmethod
    def parse(line):
        """Parse a listing line into constructor keyword arguments.

        Returns a dict with the keys ``member_name``, ``mode`` (a value of
        ``KIND_MAP``), ``major`` and ``minor`` (both ``int``).  Raises
        ``SquashfsInvalidLineFormat`` when the line does not match
        ``LINE_RE`` or one of its fields cannot be converted.
        """
        match = SquashfsDevice.LINE_RE.match(line)
        if match is None:
            raise SquashfsInvalidLineFormat("invalid line format")

        fields = match.groupdict()

        # The constructor takes `mode` rather than `kind`, so the raw
        # character is taken out of the dict and replaced by its constant.
        kind = fields.pop('kind')
        try:
            fields['mode'] = SquashfsDevice.KIND_MAP[kind]
        except KeyError:
            raise SquashfsInvalidLineFormat(
                "unknown device kind %s" % kind)

        raw_major = fields['major']
        try:
            fields['major'] = int(raw_major)
        except ValueError:
            raise SquashfsInvalidLineFormat(
                "unable to parse major number %s" % raw_major,
            )

        raw_minor = fields['minor']
        try:
            fields['minor'] = int(raw_minor)
        except ValueError:
            raise SquashfsInvalidLineFormat(
                "unable to parse minor number %s" % raw_minor,
            )

        return fields

    def __init__(self, archive, member_name, mode, major, minor):
        """Create the device member and store its mode/major/minor triple."""
        SquashfsMember.__init__(self, archive, member_name)
        self._mode, self._major, self._minor = mode, major, minor

    def get_device(self):
        """Return the ``(mode, major, minor)`` tuple given at construction."""
        return (self._mode, self._major, self._minor)

    def is_device(self):
        """Return ``True``: this member is a device node."""
        return True


class SquashfsContainer(Archive):
    """Archive implementation that unpacks a squashfs image in one go.

    The image is extracted with a single ``unsquashfs`` run into a
    temporary directory the first time members are requested.  The listing
    printed by that run is parsed into ``self._members``, an ordered
    mapping of member name to ``(member class, constructor kwargs)``.
    """

    auto_diff_metadata = False

    # First character of a listing line -> class used to parse and
    # represent the entry.
    MEMBER_CLASS = {
        'd': SquashfsDirectory,
        'l': SquashfsSymlink,
        'c': SquashfsDevice,
        'b': SquashfsDevice,
        '-': SquashfsRegularFile
    }

    def open_archive(self):
        """Return ``True``; unpacking is deferred to ensure_unpacked()."""
        return True

    def close_archive(self):
        """Do nothing."""
        pass

    def get_member(self, member_name):
        """Instantiate the member object recorded under ``member_name``.

        Raises ``KeyError`` if no such member was found in the listing.
        """
        self.ensure_unpacked()
        cls, kwargs = self._members[member_name]
        return cls(self, member_name, **kwargs)

    def extract(self, member_name, destdir):
        """Make sure the image is unpacked and return ``member_name``."""
        # Ignore destdir argument and use our unpacked path
        self.ensure_unpacked()
        return member_name

    def get_member_names(self):
        """Return the names of all parsed members, in listing order."""
        self.ensure_unpacked()
        return self._members.keys()

    def ensure_unpacked(self):
        """Unpack the image and index its members, once per container.

        Raises ``subprocess.CalledProcessError`` if ``unsquashfs`` exits
        with a non-zero status.
        """
        if hasattr(self, '_members'):
            return

        # Build everything in locals and publish it on `self` only once
        # extraction and parsing succeeded.  Otherwise a failing run would
        # leave an empty `_members` behind, and the guard above would make
        # later calls treat the image as successfully unpacked and empty.
        members = collections.OrderedDict()
        temp_dir = get_temporary_directory().name

        logger.debug("Extracting %s to %s", self.source.path, temp_dir)

        # See unsquashfs(1) for the meaning of the individual flags.
        # "-d ." extracts into the working directory, which is `temp_dir`.
        output = subprocess.check_output((
            'unsquashfs',
            '-n',
            '-f',
            '-no',
            '-li',
            '-d', '.',
            self.source.path,
        ), stderr=subprocess.PIPE, cwd=temp_dir)

        output = iter(output.decode('utf-8').rstrip('\n').split('\n'))

        # Skip headers: consume lines up to and including the first empty
        # one (or all of them if there is no empty line).
        for _ in iter(functools.partial(next, output), ''):
            pass

        for line in output:
            if not line:
                continue

            try:
                cls = self.MEMBER_CLASS[line[0]]
            except KeyError:
                logger.debug("Unknown squashfs entry: %s", line)
                continue

            try:
                kwargs = cls.parse(line)
            except SquashfsInvalidLineFormat:
                # Lines that merely start with a known type character but
                # are not listing entries end up here; skip them, but leave
                # a trace for debugging.
                logger.debug("Skipping unparsable squashfs line: %s", line)
                continue

            # Pop to avoid duplicating member name in the key and the value
            member_name = kwargs.pop('member_name')

            members[member_name] = (cls, kwargs)

        self._temp_dir = temp_dir
        self._members = members

        logger.debug(
            "Extracted %d entries from %s to %s",
            len(self._members), self.source.path, self._temp_dir,
        )
303 304 305


class SquashfsFile(File):
    """File type for squashfs images."""

    DESCRIPTION = "SquashFS filesystems"
    CONTAINER_CLASS = SquashfsContainer
    FILE_TYPE_RE = re.compile(r'^Squashfs filesystem\b')

    def compare_details(self, other, source=None):
        """Return the superblock and listing differences against ``other``.

        The result is a two-element list: the ``SquashfsSuperblock``
        difference followed by the ``SquashfsListing`` difference, each as
        returned by ``Difference.from_command``.
        """
        superblock_diff = Difference.from_command(
            SquashfsSuperblock, self.path, other.path)
        listing_diff = Difference.from_command(
            SquashfsListing, self.path, other.path)
        return [superblock_diff, listing_diff]