Imported Upstream version 0.10.33

parent 9daaca33
......@@ -11,3 +11,4 @@ build
deb_dist
dist
src/rosdep.egg-info
nose*
......@@ -2,6 +2,7 @@ language: python
python:
- "2.6"
- "2.7"
- "3.2"
- "3.3"
# command to install dependencies
install:
......
__version__ = '0.10.27'
__version__ = '0.10.33'
......@@ -69,6 +69,12 @@ class RosdepInternalError(Exception):
def __str__(self):
return self.message
class CachePermissionError(Exception):
"""Failure when writing the cache."""
pass
class DownloadFailure(Exception):
"""
Failure downloading sources list data for I/O or other format reasons.
......
......@@ -36,11 +36,6 @@ from rospkg.os_detect import OsDetect
from .core import rd_debug, RosdepInternalError, InstallFailed, print_bold, InvalidData
# use OsDetect.get_version() for OS version key
TYPE_VERSION = 'version'
# use OsDetect.get_codename() for OS version key
TYPE_CODENAME = 'codename'
# kwc: InstallerContext is basically just a bunch of dictionaries with
# defined lookup methods. It really encompasses two facets of a
# rosdep configuration: the pluggable nature of installers and
......@@ -95,11 +90,11 @@ class InstallerContext(object):
self.os_override = os_name, os_version
def get_os_version_type(self, os_name):
return self.os_version_type.get(os_name, TYPE_VERSION)
return self.os_version_type.get(os_name, OsDetect.get_version)
def set_os_version_type(self, os_name, version_type):
if version_type not in (TYPE_VERSION, TYPE_CODENAME):
raise ValueError("version type not TYPE_VERSION or TYPE_CODENAME")
if not hasattr(version_type, '__call__'):
raise ValueError("version type should be a method")
self.os_version_type[os_name] = version_type
def get_os_name_and_version(self):
......@@ -115,10 +110,8 @@ class InstallerContext(object):
return self.os_override
else:
os_name = self.os_detect.get_name()
if self.get_os_version_type(os_name) == TYPE_CODENAME:
os_version = self.os_detect.get_codename()
else:
os_version = self.os_detect.get_version()
os_key = self.get_os_version_type(os_name)
os_version = os_key(self.os_detect)
return os_name, os_version
def get_os_detect(self):
......@@ -260,14 +253,14 @@ class Installer(object):
"""
raise NotImplementedError("is_installed", resolved_item)
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
"""
:param resolved: list of resolved installation items, ``[opaque]``
:param interactive: If `False`, disable interactive prompts,
e.g. Pass through ``-y`` or equivalant to package manager.
:param reinstall: If `True`, install everything even if already installed
"""
raise NotImplementedError("get_package_install_command", resolved, interactive, reinstall)
raise NotImplementedError("get_package_install_command", resolved, interactive, reinstall, quiet)
def get_depends(self, rosdep_args):
"""
......@@ -320,6 +313,17 @@ class PackageManagerInstaller(Installer):
"""
self.detect_fn = detect_fn
self.supports_depends = supports_depends
self.as_root = True
self.sudo_command = 'sudo'
def elevate_priv(self, cmd):
"""
Prepend *self.sudo_command* to the command if *self.as_root* is ``True``.
:param list cmd: list of strings comprising the command
:returns: a list of commands
"""
return (self.sudo_command.split() if self.as_root else []) + cmd
def resolve(self, rosdep_args):
"""
......@@ -358,8 +362,8 @@ class PackageManagerInstaller(Installer):
def is_installed(self, resolved_item):
return not self.get_packages_to_install([resolved_item])
def get_install_command(self, resolved, interactive=True, reinstall=False):
raise NotImplementedError('subclasses must implement', resolved, interactive, reinstall)
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
raise NotImplementedError('subclasses must implement', resolved, interactive, reinstall, quiet)
def get_depends(self, rosdep_args):
"""
......@@ -426,7 +430,7 @@ class RosdepInstaller(object):
return uninstalled, errors
def install(self, uninstalled, interactive=True, simulate=False,
continue_on_error=False, reinstall=False, verbose=False):
continue_on_error=False, reinstall=False, verbose=False, quiet=False):
"""
Install the uninstalled rosdeps. This API is for the bulk
workflow of rosdep (see example below). For a more targeted
......@@ -474,7 +478,7 @@ class RosdepInstaller(object):
try:
self.install_resolved(installer_key, resolved, simulate=simulate,
interactive=interactive, reinstall=reinstall, continue_on_error=continue_on_error,
verbose=verbose)
verbose=verbose, quiet=quiet)
except InstallFailed as e:
if not continue_on_error:
raise
......@@ -485,7 +489,7 @@ class RosdepInstaller(object):
raise InstallFailed(failures=failures)
def install_resolved(self, installer_key, resolved, simulate=False, interactive=True,
reinstall=False, continue_on_error=False, verbose=False):
reinstall=False, continue_on_error=False, verbose=False, quiet=False):
"""
Lower-level API for installing a rosdep dependency. The
rosdep keys have already been resolved to *installer_key* and
......@@ -498,12 +502,13 @@ class RosdepInstaller(object):
:param reinstall: If ``True``, install dependencies if even
already installed (default ``False``).
:param verbose: If ``True``, print verbose output to screen (default ``False``)
:param quiet: If ``True``, supress output except for errors (default ``False``)
:raises: :exc:`InstallFailed` if any of *resolved* fail to install.
"""
installer_context = self.installer_context
installer = installer_context.get_installer(installer_key)
command = installer.get_install_command(resolved, interactive=interactive, reinstall=reinstall)
command = installer.get_install_command(resolved, interactive=interactive, reinstall=reinstall, quiet=quiet)
if not command:
if verbose:
print("#No packages to install")
......
This diff is collapsed.
......@@ -55,10 +55,10 @@ class PacmanInstaller(PackageManagerInstaller):
def __init__(self):
super(PacmanInstaller, self).__init__(pacman_detect)
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
#TODO: interactive switch
packages = self.get_packages_to_install(resolved, reinstall=reinstall)
if not packages:
return []
else:
return [['sudo', 'pacman', '-Sy', '--needed', p] for p in packages]
return [self.elevate_priv(['pacman', '-Sy', '--needed', p]) for p in packages]
......@@ -62,14 +62,16 @@ class AptCygInstaller(PackageManagerInstaller):
def __init__(self):
super(AptCygInstaller, self).__init__(cygcheck_detect)
self.as_root = False
self.sudo_command = 'cygstart --action=runas'
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
packages = self.get_packages_to_install(resolved, reinstall=reinstall)
#TODO: interactive
if not packages:
return []
else:
return [['apt-cyg', '-m', 'ftp://sourceware.org/pub/cygwinports', 'install']+packages]
return [self.elevate_priv(['apt-cyg', '-m', 'ftp://sourceware.org/pub/cygwinports', 'install'])+packages]
if __name__ == '__main__':
print("test cygcheck_detect(true)", cygcheck_detect('cygwin'))
......@@ -28,12 +28,12 @@
# Author Tully Foote, Ken Conley
from rospkg.os_detect import OS_DEBIAN, OS_UBUNTU
from rospkg.os_detect import OS_DEBIAN, OS_UBUNTU, OsDetect
from .pip import PIP_INSTALLER
from .gem import GEM_INSTALLER
from .source import SOURCE_INSTALLER
from ..installers import PackageManagerInstaller, TYPE_CODENAME
from ..installers import PackageManagerInstaller
from ..shell_utils import read_stdout
# apt package manager key
......@@ -52,7 +52,7 @@ def register_debian(context):
context.add_os_installer_key(OS_DEBIAN, GEM_INSTALLER)
context.add_os_installer_key(OS_DEBIAN, SOURCE_INSTALLER)
context.set_default_os_installer_key(OS_DEBIAN, APT_INSTALLER)
context.set_os_version_type(OS_DEBIAN, TYPE_CODENAME)
context.set_os_version_type(OS_DEBIAN, OsDetect.get_codename)
def register_ubuntu(context):
context.add_os_installer_key(OS_UBUNTU, APT_INSTALLER)
......@@ -60,7 +60,7 @@ def register_ubuntu(context):
context.add_os_installer_key(OS_UBUNTU, GEM_INSTALLER)
context.add_os_installer_key(OS_UBUNTU, SOURCE_INSTALLER)
context.set_default_os_installer_key(OS_UBUNTU, APT_INSTALLER)
context.set_os_version_type(OS_UBUNTU, TYPE_CODENAME)
context.set_os_version_type(OS_UBUNTU, OsDetect.get_codename)
def dpkg_detect(pkgs, exec_fn=None):
"""
......@@ -92,6 +92,7 @@ def dpkg_detect(pkgs, exec_fn=None):
ret_list.append( pkg_row[0])
return [version_lock_map[r] for r in ret_list]
class AptInstaller(PackageManagerInstaller):
"""
An implementation of the Installer for use on debian style
......@@ -100,11 +101,15 @@ class AptInstaller(PackageManagerInstaller):
def __init__(self):
super(AptInstaller, self).__init__(dpkg_detect)
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
packages = self.get_packages_to_install(resolved, reinstall=reinstall)
if not packages:
return []
if not interactive and quiet:
return [self.elevate_priv(['apt-get', 'install', '-y', '-qq', p]) for p in packages]
elif quiet:
return [self.elevate_priv(['apt-get', 'install', '-qq', p]) for p in packages]
if not interactive:
return [['sudo', 'apt-get', 'install', '-y', p] for p in packages]
return [self.elevate_priv(['apt-get', 'install', '-y', p]) for p in packages]
else:
return [['sudo', 'apt-get', 'install', p] for p in packages]
return [self.elevate_priv(['apt-get', 'install', p]) for p in packages]
......@@ -78,10 +78,10 @@ class PkgAddInstaller(Installer):
def __init__(self):
super(PkgAddInstaller, self).__init__(pkg_info_detect)
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
packages = self.get_packages_to_install(resolved, reinstall=reinstall)
if not packages:
return []
else:
#pkg_add does not have a non-interactive command
return [['sudo', '/usr/sbin/pkg_add', '-r']+packages]
return [self.elevate_priv(['/usr/sbin/pkg_add', '-r'])+packages]
......@@ -75,12 +75,12 @@ class GemInstaller(PackageManagerInstaller):
def __init__(self):
super(GemInstaller, self).__init__(gem_detect, supports_depends=True)
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
if not is_gem_installed():
raise InstallFailed((GEM_INSTALLER, "gem is not installed"))
packages = self.get_packages_to_install(resolved, reinstall=reinstall)
if not packages:
return []
else:
return [['sudo', 'gem', 'install', p] for p in packages]
return [self.elevate_priv(['gem', 'install', p]) for p in packages]
......@@ -107,10 +107,10 @@ class PortageInstaller(PackageManagerInstaller):
super(PortageInstaller, self).__init__(portage_detect)
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
atoms = self.get_packages_to_install(resolved, reinstall=reinstall)
cmd = [ 'sudo', 'emerge' ]
cmd = self.elevate_priv(['emerge'])
if not atoms:
return []
......
......@@ -60,11 +60,11 @@ class ZypperInstaller(PackageManagerInstaller):
def __init__(self):
super(ZypperInstaller, self).__init__(rpm_detect)
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
packages = self.get_packages_to_install(resolved, reinstall=reinstall)
if not packages:
return []
if not interactive:
return [['sudo', 'zypper', 'install', '-yl']+packages]
return [self.elevate_priv(['zypper', 'install', '-yl'])+packages]
else:
return [['sudo', 'zypper', 'install']+packages]
return [self.elevate_priv(['zypper', 'install'])+packages]
This diff is collapsed.
......@@ -74,12 +74,12 @@ class PipInstaller(PackageManagerInstaller):
def __init__(self):
super(PipInstaller, self).__init__(pip_detect, supports_depends=True)
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
if not is_pip_installed():
raise InstallFailed((PIP_INSTALLER, "pip is not installed"))
packages = self.get_packages_to_install(resolved, reinstall=reinstall)
if not packages:
return []
else:
return [['sudo', 'pip', 'install', '-U', p] for p in packages]
return [self.elevate_priv(['pip', 'install', '-U', p]) for p in packages]
......@@ -34,13 +34,18 @@ from rospkg.os_detect import OS_RHEL, OS_FEDORA
from .pip import PIP_INSTALLER
from .source import SOURCE_INSTALLER
from ..installers import PackageManagerInstaller, TYPE_CODENAME
from ..core import rd_debug
from ..installers import PackageManagerInstaller
from ..shell_utils import read_stdout
# dnf package manager key
DNF_INSTALLER='dnf'
# yum package manager key
YUM_INSTALLER='yum'
def register_installers(context):
context.set_installer(DNF_INSTALLER, DnfInstaller())
context.set_installer(YUM_INSTALLER, YumInstaller())
def register_platforms(context):
......@@ -49,10 +54,11 @@ def register_platforms(context):
def register_fedora(context):
context.add_os_installer_key(OS_FEDORA, PIP_INSTALLER)
context.add_os_installer_key(OS_FEDORA, DNF_INSTALLER)
context.add_os_installer_key(OS_FEDORA, YUM_INSTALLER)
context.add_os_installer_key(OS_FEDORA, SOURCE_INSTALLER)
context.set_default_os_installer_key(OS_FEDORA, YUM_INSTALLER)
context.set_os_version_type(OS_FEDORA, TYPE_CODENAME)
context.set_os_version_type(OS_FEDORA, lambda self: self.get_version() if int(self.get_version()) > 20 else self.get_codename())
def register_rhel(context):
context.add_os_installer_key(OS_RHEL, PIP_INSTALLER)
......@@ -60,23 +66,91 @@ def register_rhel(context):
context.add_os_installer_key(OS_RHEL, SOURCE_INSTALLER)
context.set_default_os_installer_key(OS_RHEL, YUM_INSTALLER)
def rpm_detect(packages, exec_fn=None):
def rpm_detect_py(packages):
ret_list = []
import rpm
ts = rpm.TransactionSet()
for raw_req in packages:
req = rpm_expand_py(raw_req)
rpms = ts.dbMatch(rpm.RPMTAG_PROVIDES, req)
if len(rpms) > 0:
ret_list += [raw_req]
return ret_list
def rpm_detect_cmd(raw_packages, exec_fn=None):
ret_list = []
#cmd = ['rpm', '-q', '--qf ""'] # suppress output for installed packages
cmd = ['rpm', '-q', '--whatprovides', '--qf', '[%{PROVIDES}\n]'] # output: "pkg_name" for installed, error text for not installed packages
cmd.extend(packages)
if exec_fn is None:
exec_fn = read_stdout
packages = [rpm_expand_cmd(package, exec_fn) for package in raw_packages]
cmd = ['rpm', '-q', '--whatprovides', '--qf', '[%{PROVIDES}\n]']
cmd.extend(packages)
std_out = exec_fn(cmd)
out_lines = std_out.split('\n')
for line in out_lines:
# if there is no space, it's not an error text -> it's installed
if line and ' ' not in line:
ret_list.append(line)
out_lines = std_out.split()
for index, package in enumerate(packages):
if package in out_lines:
ret_list.append(raw_packages[index])
return ret_list
def rpm_detect(packages, exec_fn=None):
try:
return rpm_detect_py(packages)
except ImportError:
return rpm_detect_cmd(packages, exec_fn)
def rpm_expand_py(macro):
import rpm
expanded = rpm.expandMacro(macro)
rd_debug('Expanded rpm macro in \'%s\' to \'%s\'' % (macro, expanded))
return expanded
def rpm_expand_cmd(macro, exec_fn=None):
cmd = ['rpm', '-E', macro]
if exec_fn is None:
exec_fn = read_stdout
expanded = exec_fn(cmd).strip()
rd_debug('Expanded rpm macro in \'%s\' to \'%s\'' % (macro, expanded))
return expanded
def rpm_expand(package, exec_fn=None):
if not '%' in package:
return package
try:
return rpm_expand_py(package)
except ImportError:
return rpm_expand_cmd(package, exec_fn)
class DnfInstaller(PackageManagerInstaller):
"""
This class provides the functions for installing using dnf
it's methods partially implement the Rosdep OS api to complement
the roslib.OSDetect API.
"""
def __init__(self):
super(DnfInstaller, self).__init__(rpm_detect)
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
raw_packages = self.get_packages_to_install(resolved, reinstall=reinstall)
packages = [rpm_expand(package) for package in raw_packages]
if not packages:
return []
elif not interactive and quiet:
return [['sudo', 'dnf', '--assumeyes', '--quiet', 'install'] + packages]
elif quiet:
return [['sudo', 'dnf', '--quiet', 'install'] + packages]
elif not interactive:
return [['sudo', 'dnf', '--assumeyes', 'install'] + packages]
else:
return [['sudo', 'dnf', 'install'] + packages]
class YumInstaller(PackageManagerInstaller):
"""
This class provides the functions for installing using yum
......@@ -87,12 +161,17 @@ class YumInstaller(PackageManagerInstaller):
def __init__(self):
super(YumInstaller, self).__init__(rpm_detect)
def get_install_command(self, resolved, interactive=True, reinstall=False):
packages = self.get_packages_to_install(resolved, reinstall=reinstall)
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
raw_packages = self.get_packages_to_install(resolved, reinstall=reinstall)
packages = [rpm_expand(package) for package in raw_packages]
if not packages:
return []
elif not interactive and quiet:
return [self.elevate_priv(['yum', '--assumeyes', '--quiet', '--skip-broken', 'install']) + packages]
elif quiet:
return [self.elevate_priv(['yum', '--quiet', '--skip-broken', 'install']) + packages]
elif not interactive:
return [['sudo', 'yum', '-y', '--skip-broken', 'install'] + packages]
return [self.elevate_priv(['yum', '--assumeyes', '--skip-broken', 'install']) + packages]
else:
return [['sudo', 'yum', '--skip-broken', 'install'] + packages]
return [self.elevate_priv(['yum', '--skip-broken', 'install']) + packages]
......@@ -217,7 +217,7 @@ class SourceInstaller(PackageManagerInstaller):
except InvalidRdmanifest as ex:
raise InvalidData(str(ex))
def get_install_command(self, resolved, interactive=True, reinstall=False):
def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
# Instead of attempting to describe the source-install steps
# inside of the rosdep command chain, we shell out to an
# external rosdep-source command. This separation means that
......
......@@ -45,7 +45,7 @@ try:
except ImportError:
import pickle
from .core import InvalidData, DownloadFailure
from .core import InvalidData, DownloadFailure, CachePermissionError
from .gbpdistro_support import get_gbprepo_as_rosdep_data, download_gbpdistro_as_rosdep_data
try:
......@@ -79,6 +79,19 @@ CACHE_INDEX = 'index'
# extension for binary cache
PICKLE_CACHE_EXT = '.pickle'
SOURCE_PATH_ENV = 'ROSDEP_SOURCE_PATH'
def get_sources_list_dirs(source_list_dir):
if SOURCE_PATH_ENV in os.environ:
sdirs = os.environ[SOURCE_PATH_ENV].split(os.pathsep)
else:
sdirs = [source_list_dir]
for p in list(sdirs):
if not os.path.exists(p):
sdirs.remove(p)
return sdirs
def get_sources_list_dir():
# base of where we read config files from
......@@ -88,8 +101,13 @@ def get_sources_list_dir():
etc_ros = rospkg.get_etc_ros_dir()
else:
etc_ros = '/etc/ros'
# compute cache directory
return os.path.join(etc_ros, 'rosdep', SOURCES_LIST_DIR)
# compute default system wide sources directory
sys_sources_list_dir = os.path.join(etc_ros, 'rosdep', SOURCES_LIST_DIR)
sources_list_dirs = get_sources_list_dirs(sys_sources_list_dir)
if sources_list_dirs:
return sources_list_dirs[0]
else:
return sys_sources_list_dir
def get_default_sources_list_file():
return os.path.join(get_sources_list_dir(), '20-default.list')
......@@ -374,14 +392,14 @@ def parse_sources_list(sources_list_dir=None):
"""
if sources_list_dir is None:
sources_list_dir = get_sources_list_dir()
if not os.path.exists(sources_list_dir):
# no sources on this system. this is a valid state.
return []
sources_list_dirs = get_sources_list_dirs(sources_list_dir)
filelist = [f for f in os.listdir(sources_list_dir) if f.endswith('.list')]
filelist = []
for sdir in sources_list_dirs:
filelist += sorted([os.path.join(sdir, f) for f in os.listdir(sdir) if f.endswith('.list')])
sources_list = []
for f in sorted(filelist):
sources_list.extend(parse_sources_file(os.path.join(sources_list_dir, f)))
for f in filelist:
sources_list.extend(parse_sources_file(f))
return sources_list
def update_sources_list(sources_list_dir=None, sources_cache_dir=None,
......@@ -492,7 +510,10 @@ def write_cache_file(source_cache_d, filename_key, rosdep_data):
os.makedirs(source_cache_d)
key_hash = compute_filename_hash(filename_key)
filepath = os.path.join(source_cache_d, key_hash)
try:
write_atomic(filepath + PICKLE_CACHE_EXT, pickle.dumps(rosdep_data, -1), True)
except OSError as e:
raise CachePermissionError("Failed to write cache file: " + str(e))
try:
os.unlink(filepath)
except OSError:
......
[DEFAULT]
Depends: python-rospkg, python-yaml, python-catkin-pkg, python-rosdistro (>= 0.3.0)
Suite: lucid oneiric precise quantal raring saucy trusty wheezy jessie
XS-Python-Version: >= 2.6
Depends3: python3-rospkg, python3-yaml, python3-catkin-pkg, python3-rosdistro (>= 0.3.0)
Suite: oneiric precise quantal raring saucy trusty utopic wheezy jessie
X-Python3-Version: >= 3.2
[DEFAULT]
Package: python3-rosdep
Depends: python3-rospkg, python3-yaml, python3-catkin-pkg, python3-rosdistro (>= 0.3.0)
Suite: precise quantal raring saucy trusty wheezy jessie
X-Python3-Version: >= 3.3
subversion:[{"name":"subversion","homepage":"http://subversion.apache.org/","versions":{"stable":"1.8.8","bottle":true,"devel":null,"head":null},"revision":0,"installed":[{"version":"1.8.8","used_options":[],"built_as_bottle":null,"poured_from_bottle":true}],"linked_keg":"1.8.8","keg_only":false,"dependencies":["pkg-config","autoconf","automake","libtool","sqlite","scons","openssl"],"conflicts_with":[],"caveats":"svntools have been installed to:\n /usr/local/opt/subversion/libexec\n","options":[{"option":"--universal","description":"Build a universal binary"},{"option":"--java","description":"Build Java bindings"},{"option":"--perl","description":"Build Perl bindings"},{"option":"--ruby","description":"Build Ruby bindings"},{"option":"--with-python","description":"Build with python support"}]}]
bazaar:[{"name":"bazaar","homepage":"http://bazaar-vcs.org/","versions":{"stable":"2.6.0","bottle":false,"devel":null,"head":null},"revision":0,"installed":[{"version":"2.6.0","used_options":[],"built_as_bottle":null,"poured_from_bottle":false}],"linked_keg":"2.6.0","keg_only":false,"dependencies":[],"conflicts_with":[],"caveats":null,"options":[]}]
......@@ -32,7 +32,6 @@ import sys
def test_create_default_installer_context():
import rosdep2
from rosdep2.installers import TYPE_CODENAME
# test both constructors
for context in [rosdep2.create_default_installer_context(), rosdep2.create_default_installer_context(verbose=True)]:
......@@ -40,10 +39,10 @@ def test_create_default_installer_context():
assert isinstance(context, rosdep2.InstallerContext)
#this is just tripwire as we actual value will change over time
from rospkg.os_detect import OS_UBUNTU
from rospkg.os_detect import OS_UBUNTU, OsDetect
assert OS_UBUNTU in context.get_os_keys()
assert context.get_installer('apt') is not None
assert 'apt' in context.get_os_installer_keys(OS_UBUNTU)
assert TYPE_CODENAME == context.get_os_version_type(OS_UBUNTU)
assert OsDetect.get_codename == context.get_os_version_type(OS_UBUNTU)
......@@ -80,8 +80,8 @@ def test_InstallerContext_ctor():
assert len(context.get_os_keys()) == 0
def test_InstallerContext_get_os_version_type():
from rospkg.os_detect import OS_UBUNTU
from rosdep2.installers import InstallerContext, TYPE_CODENAME, TYPE_VERSION
from rospkg.os_detect import OS_UBUNTU, OsDetect
from rosdep2.installers import InstallerContext
context = InstallerContext()
try:
......@@ -90,12 +90,13 @@ def test_InstallerContext_get_os_version_type():
except ValueError:
pass
assert TYPE_VERSION == context.get_os_version_type(OS_UBUNTU)
context.set_os_version_type(OS_UBUNTU, TYPE_CODENAME)
assert TYPE_CODENAME == context.get_os_version_type(OS_UBUNTU)
assert OsDetect.get_version == context.get_os_version_type(OS_UBUNTU)
context.set_os_version_type(OS_UBUNTU, OsDetect.get_codename)
assert OsDetect.get_codename == context.get_os_version_type(OS_UBUNTU)
def test_InstallerContext_os_version_and_name():
from rosdep2.installers import InstallerContext, TYPE_CODENAME, TYPE_VERSION
from rospkg.os_detect import OsDetect
from rosdep2.installers import InstallerContext
context = InstallerContext()
context.set_verbose(True)
os_name, os_version = context.get_os_name_and_version()
......@@ -113,12 +114,12 @@ def test_InstallerContext_os_version_and_name():
os_detect_mock.get_version.return_value = 'fakeos-version'
os_detect_mock.get_codename.return_value = 'fakeos-codename'
context = InstallerContext(os_detect_mock)
context.set_os_version_type('fakeos', TYPE_CODENAME)
context.set_os_version_type('fakeos', os_detect_mock.get_codename)
os_name, os_version = context.get_os_name_and_version()
assert os_name == 'fakeos', os_name
assert os_version == 'fakeos-codename', os_version
context.set_os_version_type('fakeos', TYPE_VERSION)
context.set_os_version_type('fakeos', os_detect_mock.get_version)
os_name, os_version = context.get_os_name_and_version()
assert os_name == 'fakeos', os_name
assert os_version == 'fakeos-version', os_version
......
......@@ -37,6 +37,9 @@ import rospkg.os_detect
import unittest
from mock import patch
from mock import DEFAULT
GITHUB_BASE_URL = 'https://github.com/ros/rosdistro/raw/master/rosdep/base.yaml'
GITHUB_PYTHON_URL = 'https://github.com/ros/rosdistro/raw/master/rosdep/python.yaml'
......@@ -51,7 +54,9 @@ def get_cache_dir():
assert os.path.isdir(p)
return p
from rosdep2 import main
from rosdep2.main import rosdep_main
from rosdep2.main import setup_proxy_opener
from contextlib import contextmanager
@contextmanager
......@@ -214,3 +219,15 @@ class TestRosdepMain(unittest.TestCase):
assert False, "system exit should have occurred"
except SystemExit:
pass
@patch('rosdep2.main.install_opener')
@patch('rosdep2.main.build_opener')
@patch('rosdep2.main.HTTPBasicAuthHandler')
@patch('rosdep2.main.ProxyHandler')
def test_proxy_detection(self, proxy, bah, build, install):
with patch.dict('os.environ', {'http_proxy': 'something'}, clear=True):
setup_proxy_opener()
proxy.assert_called_with({'http': 'something'})
with patch.dict('os.environ', {'https_proxy': 'somethings'}, clear=True):
setup_proxy_opener()
proxy.assert_called_with({'https': 'somethings'})
# -*- coding: utf-8 -*-
# Copyright (c) 2011, Willow Garage, Inc.
# All rights reserved.
#
......@@ -29,21 +31,53 @@
import os
import traceback
from mock import Mock, patch
from mock import call
from mock import Mock
from mock import patch
def get_test_dir():
return os.path.abspath(os.path.join(os.path.dirname(__file__), 'osx'))
def is_port_installed_tripwire():
# don't know the correct answer, but make sure this does not throw