# -*- coding: utf-8 -*-
#
# diffoscope: in-depth comparison of files, archives, and directories
#
# Copyright © 2015 Reiner Herrmann <reiner@reiner-h.de>
#             2015 Jérémy Bobbio <lunar@debian.org>
#
# diffoscope is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# diffoscope is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with diffoscope.  If not, see <https://www.gnu.org/licenses/>.
20

Chris Lamb's avatar
Chris Lamb committed
21 22
import io
import rpm
23
import os.path
24
import logging
Chris Lamb's avatar
Chris Lamb committed
25
import binascii
26
import subprocess
Chris Lamb's avatar
Chris Lamb committed
27

28
from diffoscope.tools import tool_required
29
from diffoscope.tempfiles import get_temporary_directory
30
from diffoscope.difference import Difference
31 32

from .rpm_fallback import AbstractRpmFile
33
from .utils.archive import Archive
Chris Lamb's avatar
Chris Lamb committed
34

35 36
logger = logging.getLogger(__name__)

37

38 39 40 41 42 43 44 45 46 47
def convert_header_field(io, header):
    """Write a textual rendering of one header value to *io*.

    ``io`` is any object with a ``write()`` method accepting text (note
    that the parameter name shadows the ``io`` module inside this
    function).  ``header`` is rendered according to its type:

    * ``str`` is written unchanged;
    * ``bytes`` is written decoded as UTF-8, or as a hexadecimal dump
      when it is not valid UTF-8;
    * an empty ``list`` is written as ``[]``;
    * a non-empty ``list`` is written as a newline followed by each item
      prefixed with `` - `` and rendered recursively;
    * anything else is written as its ``repr()``.
    """
    if isinstance(header, str):
        io.write(header)
        return

    if isinstance(header, bytes):
        try:
            text = header.decode('utf-8')
        except UnicodeDecodeError:
            # Not valid UTF-8: fall back to a hex dump of the raw bytes.
            text = binascii.hexlify(header).decode('us-ascii')
        io.write(text)
        return

    if not isinstance(header, list):
        io.write(repr(header))
        return

    if not header:
        io.write(u"[]")
        return

    io.write(u"\n")
    for entry in header:
        io.write(u" - ")
        convert_header_field(io, entry)
56 57


58
def get_rpm_header(path, ts):
    """Return the header of the RPM at *path* rendered as text.

    The header is read through ``ts.hdrFromFdno()``.  Every tag listed in
    ``rpm.tagnames`` that is present in the header produces one
    ``<tag name>: <value>`` line, in sorted tag order, with the value
    formatted by ``convert_header_field()``.

    If reading the header raises ``rpm.error``, the error is logged and
    its message is returned in place of the header text.
    """
    with open(path, 'r') as rpm_file:
        try:
            header = ts.hdrFromFdno(rpm_file)
        except rpm.error as exc:
            logger.error("reading rpm header failed: %s", str(exc))
            return str(exc)

        rendered = io.StringIO()
        for tag in sorted(rpm.tagnames):
            if tag in header:
                rendered.write(u"%s: " % rpm.tagnames[tag])
                convert_header_field(rendered, header[tag])
                rendered.write(u"\n")
    return rendered.getvalue()
73 74


75
def compare_rpm_headers(path1, path2):
    """Compare the textual headers of two RPM files.

    Both headers are read with a single transaction set, then handed to
    ``Difference.from_text()`` with ``source="header"``; whatever that
    call returns is returned unchanged.
    """
    with get_temporary_directory() as scratch_db:
        # Point rpm's "_dbpath" macro at a throwaway directory for the
        # duration of the header reads.
        rpm.addMacro("_dbpath", scratch_db)
        transaction = rpm.TransactionSet()
        # NOTE(review): -1 presumably sets every verification flag, i.e.
        # skips signature/digest checks -- confirm against the rpm docs.
        transaction.setVSFlags(-1)
        headers = [
            get_rpm_header(rpm_path, transaction)
            for rpm_path in (path1, path2)
        ]
    first_header, second_header = headers
    return Difference.from_text(
        first_header, second_header, path1, path2, source="header")
84 85 86


class RpmContainer(Archive):
    """Archive exposing an RPM as a single member named ``content``.

    The member is produced by running ``rpm2cpio`` on the source file and
    storing its standard output.
    """

    def open_archive(self):
        # No separate archive handle exists; the container stands in for it.
        return self

    def close_archive(self):
        # open_archive() acquired nothing, so there is nothing to release.
        pass

    def get_member_names(self):
        """Return the names of the members; always the single ``content``."""
        return ['content']

    @tool_required('rpm2cpio')
    def extract(self, member_name, dest_dir):
        """Write the output of ``rpm2cpio`` to ``<dest_dir>/content``.

        ``member_name`` must be ``'content'``.  Returns the path of the
        written file.  Raises ``subprocess.CalledProcessError`` when
        ``rpm2cpio`` exits with a non-zero status.
        """
        assert member_name == 'content'
        dest_path = os.path.join(dest_dir, 'content')
        cmd = ['rpm2cpio', self.source.path]
        with open(dest_path, 'wb') as dest:
            # stderr is discarded via DEVNULL instead of PIPE: check_call()
            # never reads from a pipe, so a child writing enough
            # diagnostics to fill the pipe buffer would block forever.
            subprocess.check_call(
                cmd, shell=False, stdout=dest, stderr=subprocess.DEVNULL)
        return dest_path
105 106


107
class RpmFile(AbstractRpmFile):
    """RPM file whose details are compared through its header."""

    CONTAINER_CLASS = RpmContainer
    DESCRIPTION = "RPM archives"

    def compare_details(self, other, source=None):
        """Return a one-element list with the header comparison result.

        ``source`` is accepted but not used here.
        """
        header_difference = compare_rpm_headers(self.path, other.path)
        return [header_difference]