Commit f495df98 authored by Markus Lehtonen's avatar Markus Lehtonen Committed by Guido Günther

Refactor deb helpers: move UpstreamSource class

to pkg base module. This refactor is preparation to the upcoming rpm
support.
parent 082679da
......@@ -28,7 +28,7 @@ import gbp.command_wrappers as gbpc
from gbp.errors import GbpError
from gbp.git import GitRepositoryError
from gbp.deb.changelog import ChangeLog, NoChangeLogError
from gbp.pkg import (PkgPolicy, compressor_opts, compressor_aliases)
from gbp.pkg import (PkgPolicy, UpstreamSource, compressor_opts)
# When trying to parse a version-number from a dsc or changes file, these are
# the valid characters.
......@@ -113,212 +113,6 @@ class DpkgCompareVersions(gbpc.Command):
return 0
class UpstreamSource(object):
"""
Upstream source. Can be either an unpacked dir, a tarball or another type
of archive
@cvar _orig: are the upstream sources already suitable as an upstream
tarball
@type _orig: boolean
@cvar _path: path to the upstream sources
@type _path: string
@cvar _unpacked: path to the unpacked source tree
@type _unpacked: string
"""
def __init__(self, name, unpacked=None):
self._orig = False
self._path = name
self.unpacked = unpacked
self._check_orig()
if self.is_dir():
self.unpacked = self.path
def _check_orig(self):
"""
Check if upstream source format can be used as orig tarball.
This doesn't imply that the tarball is correctly named.
@return: C{True} if upstream source format is suitable
as upstream tarball, C{False} otherwise.
@rtype: C{bool}
"""
if self.is_dir():
self._orig = False
return
parts = self._path.split('.')
try:
if parts[-1] == 'tgz':
self._orig = True
elif parts[-2] == 'tar':
if (parts[-1] in compressor_opts or
parts[-1] in compressor_aliases):
self._orig = True
except IndexError:
self._orig = False
def is_orig(self):
"""
@return: C{True} if sources are suitable as upstream source,
C{False} otherwise
@rtype: C{bool}
"""
return self._orig
def is_dir(self):
"""
@return: C{True} if if upstream sources are an unpacked directory,
C{False} otherwise
@rtype: C{bool}
"""
return True if os.path.isdir(self._path) else False
@property
def path(self):
return self._path.rstrip('/')
def unpack(self, dir, filters=[]):
"""
Unpack packed upstream sources into a given directory
and determine the toplevel of the source tree.
"""
if self.is_dir():
raise GbpError, "Cannot unpack directory %s" % self.path
if not filters:
filters = []
if type(filters) != type([]):
raise GbpError, "Filters must be a list"
self._unpack_archive(dir, filters)
self.unpacked = self._unpacked_toplevel(dir)
def _unpack_archive(self, dir, filters):
"""
Unpack packed upstream sources into a given directory.
"""
ext = os.path.splitext(self.path)[1]
if ext in [ ".zip", ".xpi" ]:
self._unpack_zip(dir)
else:
self._unpack_tar(dir, filters)
def _unpack_zip(self, dir):
try:
gbpc.UnpackZipArchive(self.path, dir)()
except gbpc.CommandExecFailed:
raise GbpError, "Unpacking of %s failed" % self.path
def _unpacked_toplevel(self, dir):
"""unpacked archives can contain a leading directory or not"""
unpacked = glob.glob('%s/*' % dir)
unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders
# Check that dir contains nothing but a single folder:
if len(unpacked) == 1 and os.path.isdir(unpacked[0]):
return unpacked[0]
else:
return dir
def _unpack_tar(self, dir, filters):
"""
Unpack a tarball to I{dir} applying a list of I{filters}. Leave the
cleanup to the caller in case of an error.
"""
try:
unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters)
unpackArchive()
except gbpc.CommandExecFailed:
# unpackArchive already printed an error message
raise GbpError
def pack(self, newarchive, filters=[]):
"""
Recreate a new archive from the current one
@param newarchive: the name of the new archive
@type newarchive: string
@param filters: tar filters to apply
@type filters: array of strings
@return: the new upstream source
@rtype: UpstreamSource
"""
if not self.unpacked:
raise GbpError, "Need an unpacked source tree to pack"
if not filters:
filters = []
if type(filters) != type([]):
raise GbpError, "Filters must be a list"
try:
unpacked = self.unpacked.rstrip('/')
repackArchive = gbpc.PackTarArchive(newarchive,
os.path.dirname(unpacked),
os.path.basename(unpacked),
filters)
repackArchive()
except gbpc.CommandExecFailed:
# repackArchive already printed an error
raise GbpError
return UpstreamSource(newarchive)
@staticmethod
def known_compressions():
return [ args[1][-1] for args in compressor_opts.items() ]
def guess_version(self, extra_regex=r''):
"""
Guess the package name and version from the filename of an upstream
archive.
>>> UpstreamSource('foo-bar_0.2.orig.tar.gz').guess_version()
('foo-bar', '0.2')
>>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version()
>>> UpstreamSource('git-bar-0.2.tar.gz').guess_version()
('git-bar', '0.2')
>>> UpstreamSource('git-bar-0.2-rc1.tar.gz').guess_version()
('git-bar', '0.2-rc1')
>>> UpstreamSource('git-bar-0.2:~-rc1.tar.gz').guess_version()
('git-bar', '0.2:~-rc1')
>>> UpstreamSource('git-Bar-0A2d:rc1.tar.bz2').guess_version()
('git-Bar', '0A2d:rc1')
>>> UpstreamSource('git-1.tar.bz2').guess_version()
('git', '1')
>>> UpstreamSource('kvm_87+dfsg.orig.tar.gz').guess_version()
('kvm', '87+dfsg')
>>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version()
>>> UpstreamSource('foo-Bar-a.b.tar.gz').guess_version()
>>> UpstreamSource('foo-bar_0.2.orig.tar.xz').guess_version()
('foo-bar', '0.2')
>>> UpstreamSource('foo-bar_0.2.orig.tar.lzma').guess_version()
('foo-bar', '0.2')
@param extra_regex: additional regex to apply, needs a 'package' and a
'version' group
@return: (package name, version) or None.
@rtype: tuple
"""
version_chars = r'[a-zA-Z\d\.\~\-\:\+]'
extensions = r'\.tar\.(%s)' % "|".join(self.known_compressions())
version_filters = map ( lambda x: x % (version_chars, extensions),
( # Debian package_<version>.orig.tar.gz:
r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig%s',
# Upstream package-<version>.tar.gz:
r'^(?P<package>[a-zA-Z\d\.\+\-]+)-(?P<version>[0-9]%s*)%s'))
if extra_regex:
version_filters = extra_regex + version_filters
for filter in version_filters:
m = re.match(filter, os.path.basename(self.path))
if m:
return (m.group('package'), m.group('version'))
class DscFile(object):
"""Keeps all needed data read from a dscfile"""
compressions = r"(%s)" % '|'.join(UpstreamSource.known_compressions())
......
......@@ -102,3 +102,208 @@ class PkgPolicy(object):
return False
return True
class UpstreamSource(object):
"""
Upstream source. Can be either an unpacked dir, a tarball or another type
of archive
@cvar _orig: are the upstream sources already suitable as an upstream
tarball
@type _orig: boolean
@cvar _path: path to the upstream sources
@type _path: string
@cvar _unpacked: path to the unpacked source tree
@type _unpacked: string
"""
def __init__(self, name, unpacked=None):
self._orig = False
self._path = name
self.unpacked = unpacked
self._check_orig()
if self.is_dir():
self.unpacked = self.path
def _check_orig(self):
"""
Check if upstream source format can be used as orig tarball.
This doesn't imply that the tarball is correctly named.
@return: C{True} if upstream source format is suitable
as upstream tarball, C{False} otherwise.
@rtype: C{bool}
"""
if self.is_dir():
self._orig = False
return
parts = self._path.split('.')
try:
if parts[-1] == 'tgz':
self._orig = True
elif parts[-2] == 'tar':
if (parts[-1] in compressor_opts or
parts[-1] in compressor_aliases):
self._orig = True
except IndexError:
self._orig = False
def is_orig(self):
"""
@return: C{True} if sources are suitable as upstream source,
C{False} otherwise
@rtype: C{bool}
"""
return self._orig
def is_dir(self):
"""
@return: C{True} if if upstream sources are an unpacked directory,
C{False} otherwise
@rtype: C{bool}
"""
return True if os.path.isdir(self._path) else False
@property
def path(self):
return self._path.rstrip('/')
def unpack(self, dir, filters=[]):
"""
Unpack packed upstream sources into a given directory
and determine the toplevel of the source tree.
"""
if self.is_dir():
raise GbpError, "Cannot unpack directory %s" % self.path
if not filters:
filters = []
if type(filters) != type([]):
raise GbpError, "Filters must be a list"
self._unpack_archive(dir, filters)
self.unpacked = self._unpacked_toplevel(dir)
def _unpack_archive(self, dir, filters):
"""
Unpack packed upstream sources into a given directory.
"""
ext = os.path.splitext(self.path)[1]
if ext in [ ".zip", ".xpi" ]:
self._unpack_zip(dir)
else:
self._unpack_tar(dir, filters)
def _unpack_zip(self, dir):
try:
gbpc.UnpackZipArchive(self.path, dir)()
except gbpc.CommandExecFailed:
raise GbpError, "Unpacking of %s failed" % self.path
def _unpacked_toplevel(self, dir):
"""unpacked archives can contain a leading directory or not"""
unpacked = glob.glob('%s/*' % dir)
unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders
# Check that dir contains nothing but a single folder:
if len(unpacked) == 1 and os.path.isdir(unpacked[0]):
return unpacked[0]
else:
return dir
def _unpack_tar(self, dir, filters):
"""
Unpack a tarball to I{dir} applying a list of I{filters}. Leave the
cleanup to the caller in case of an error.
"""
try:
unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters)
unpackArchive()
except gbpc.CommandExecFailed:
# unpackArchive already printed an error message
raise GbpError
def pack(self, newarchive, filters=[]):
"""
Recreate a new archive from the current one
@param newarchive: the name of the new archive
@type newarchive: string
@param filters: tar filters to apply
@type filters: array of strings
@return: the new upstream source
@rtype: UpstreamSource
"""
if not self.unpacked:
raise GbpError, "Need an unpacked source tree to pack"
if not filters:
filters = []
if type(filters) != type([]):
raise GbpError, "Filters must be a list"
try:
unpacked = self.unpacked.rstrip('/')
repackArchive = gbpc.PackTarArchive(newarchive,
os.path.dirname(unpacked),
os.path.basename(unpacked),
filters)
repackArchive()
except gbpc.CommandExecFailed:
# repackArchive already printed an error
raise GbpError
return UpstreamSource(newarchive)
@staticmethod
def known_compressions():
return [ args[1][-1] for args in compressor_opts.items() ]
def guess_version(self, extra_regex=r''):
"""
Guess the package name and version from the filename of an upstream
archive.
>>> UpstreamSource('foo-bar_0.2.orig.tar.gz').guess_version()
('foo-bar', '0.2')
>>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version()
>>> UpstreamSource('git-bar-0.2.tar.gz').guess_version()
('git-bar', '0.2')
>>> UpstreamSource('git-bar-0.2-rc1.tar.gz').guess_version()
('git-bar', '0.2-rc1')
>>> UpstreamSource('git-bar-0.2:~-rc1.tar.gz').guess_version()
('git-bar', '0.2:~-rc1')
>>> UpstreamSource('git-Bar-0A2d:rc1.tar.bz2').guess_version()
('git-Bar', '0A2d:rc1')
>>> UpstreamSource('git-1.tar.bz2').guess_version()
('git', '1')
>>> UpstreamSource('kvm_87+dfsg.orig.tar.gz').guess_version()
('kvm', '87+dfsg')
>>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version()
>>> UpstreamSource('foo-Bar-a.b.tar.gz').guess_version()
>>> UpstreamSource('foo-bar_0.2.orig.tar.xz').guess_version()
('foo-bar', '0.2')
>>> UpstreamSource('foo-bar_0.2.orig.tar.lzma').guess_version()
('foo-bar', '0.2')
@param extra_regex: additional regex to apply, needs a 'package' and a
'version' group
@return: (package name, version) or None.
@rtype: tuple
"""
version_chars = r'[a-zA-Z\d\.\~\-\:\+]'
extensions = r'\.tar\.(%s)' % "|".join(self.known_compressions())
version_filters = map ( lambda x: x % (version_chars, extensions),
( # Debian package_<version>.orig.tar.gz:
r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig%s',
# Upstream package-<version>.tar.gz:
r'^(?P<package>[a-zA-Z\d\.\+\-]+)-(?P<version>[0-9]%s*)%s'))
if extra_regex:
version_filters = extra_regex + version_filters
for filter in version_filters:
m = re.match(filter, os.path.basename(self.path))
if m:
return (m.group('package'), m.group('version'))
......@@ -38,7 +38,7 @@ from gbp.scripts.common.buildpackage import (index_name, wc_name,
git_archive_submodules,
git_archive_single, dump_tree,
write_wc, drop_index)
from gbp.pkg import (compressor_opts, compressor_aliases)
from gbp.pkg import (UpstreamSource, compressor_opts, compressor_aliases)
def git_archive(repo, cp, output_dir, treeish, comp_type, comp_level, with_submodules):
"create a compressed orig tarball in output_dir using git_archive"
......@@ -169,7 +169,7 @@ def extract_orig(orig_tarball, dest_dir):
gbp.log.info("Extracting %s to '%s'" % (os.path.basename(orig_tarball), dest_dir))
move_old_export(dest_dir)
upstream = gbp.deb.UpstreamSource(orig_tarball)
upstream = UpstreamSource(orig_tarball)
upstream.unpack(dest_dir)
# Check if tarball extracts into a single folder or not:
......
......@@ -20,7 +20,7 @@
import os
import tempfile
import gbp.command_wrappers as gbpc
from gbp.deb import UpstreamSource
from gbp.pkg import UpstreamSource
from gbp.errors import GbpError
import gbp.log
......
......@@ -26,8 +26,9 @@ import glob
import pipes
import time
import gbp.command_wrappers as gbpc
from gbp.pkg import UpstreamSource
from gbp.deb import (debian_version_chars,
parse_dsc, DscFile, UpstreamSource)
parse_dsc, DscFile)
from gbp.deb.git import (DebianGitRepository, GitRepositoryError)
from gbp.deb.changelog import ChangeLog
from gbp.git import rfc822_date_to_git
......
......@@ -49,7 +49,7 @@ class TestUnpack:
def test_upstream_source_type(self):
for (comp, archive) in self.archives.iteritems():
source = gbp.deb.UpstreamSource(archive[0])
source = gbp.pkg.UpstreamSource(archive[0])
assert source.is_orig() == True
assert source.is_dir() == False
assert source.unpacked == None
......@@ -60,13 +60,13 @@ class TestUnpack:
def test_upstream_source_unpack(self):
for (comp, archive) in self.archives.iteritems():
source = gbp.deb.UpstreamSource(archive[0])
source = gbp.pkg.UpstreamSource(archive[0])
source.unpack(".")
self._check_files(archive[1], comp)
def test_upstream_source_unpack_no_filter(self):
for (comp, archive) in self.archives.iteritems():
source = gbp.deb.UpstreamSource(archive[0])
source = gbp.pkg.UpstreamSource(archive[0])
source.unpack(".", [])
self._check_files(archive[1], comp)
......@@ -74,7 +74,7 @@ class TestUnpack:
exclude = "README"
for (comp, archive) in self.archives.iteritems():
source = gbp.deb.UpstreamSource(archive[0])
source = gbp.pkg.UpstreamSource(archive[0])
source.unpack(".", [exclude])
archive[1].remove(exclude)
self._check_files(archive[1], comp)
......
......@@ -10,7 +10,7 @@ import tempfile
import unittest
import zipfile
from gbp.deb import UpstreamSource
from gbp.pkg import UpstreamSource
class TestDir(unittest.TestCase):
def test_directory(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment