Commit 8b0731bb authored by Sebastian Ramacher's avatar Sebastian Ramacher

Imported Upstream version 1.3.3

parent fe554734
# Automatically generated by `hgimportsvn`
syntax:glob
.svn
.hgsvn
# These lines are suggested according to the svn:ignore property
# Feel free to enable them by uncommenting them
syntax:glob
*.pyc
*.pyo
*.swp
*.html
*.class
*.orig
build/
dist/
py.egg-info
issue/
3rdparty/
This diff is collapsed.
......@@ -9,9 +9,10 @@ graft contrib
graft bin
graft testing
#exclude *.orig
#exclude *.rej
#exclude .hginore
#exclude *.pyc
#exclude *.orig
exclude *.rej
exclude .hginore
exclude *.pyc
#recursive-exclude testing *.pyc *.orig *.rej *$py.class
#prune .pyc
#prune .svn
......
Metadata-Version: 1.0
Name: py
Version: 1.2.1
Version: 1.3.3
Summary: py.test and pylib: rapid testing and development utils.
Home-page: http://pylib.org
Author: holger krekel, Guido Wesdorp, Carl Friedrich Bolz, Armin Rigo, Maciej Fijalkowski & others
......@@ -14,7 +14,7 @@ Description:
- `py.code`_: dynamic code compile and traceback printing support
Platforms: Linux, Win32, OSX
Interpreters: Python versions 2.4 through to 3.1, Jython 2.5.1.
Interpreters: Python versions 2.4 through to 3.2, Jython 2.5.1 and PyPy
For questions please check out http://pylib.org/contact.html
.. _`py.test`: http://pytest.org
......@@ -38,3 +38,4 @@ Classifier: Topic :: Software Development :: Testing
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Utilities
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
The py lib is a Python development support library featuring
The py lib is a Python development support library featuring
the following tools and modules:
* py.test: tool for distributed automated testing
* py.code: dynamic code generation and introspection
* py.path: uniform local and svn path objects
* py.path: uniform local and svn path objects
For questions and more information please visit http://pylib.org
#!/usr/bin/env python
#!/usr/bin/env python
#
# find and import a version of 'py' that exists in a parent dir
# of the current working directory. fall back to import a
# of the current working directory. fall back to import a
# globally available version
#
import sys
......@@ -31,9 +31,9 @@ def searchpy(current):
if not searchpy(abspath(os.curdir)):
if not searchpy(opd(abspath(sys.argv[0]))):
if not searchpy(opd(__file__)):
pass # let's hope it is just on sys.path
pass # let's hope it is just on sys.path
import py
if __name__ == '__main__':
if __name__ == '__main__':
print ("py lib is at %s" % py.__file__)
......@@ -7,22 +7,43 @@ collect_ignore = ['build', 'doc/_build']
rsyncdirs = ['conftest.py', 'bin', 'py', 'doc', 'testing']
import py
import os, py
pid = os.getpid()
def pytest_addoption(parser):
group = parser.getgroup("pylib", "py lib testing options")
group.addoption('--sshhost',
group.addoption('--sshhost',
action="store", dest="sshhost", default=None,
help=("ssh xspec for ssh functional tests. "))
group.addoption('--runslowtests',
action="store_true", dest="runslowtests", default=False,
help=("run slow tests"))
group.addoption('--lsof',
action="store_true", dest="lsof", default=False,
help=("run FD checks if lsof is available"))
def pytest_configure(config):
if config.getvalue("lsof"):
try:
out = py.process.cmdexec("lsof -p %d" % pid)
except py.process.cmdexec.Error:
pass
else:
config._numfiles = len([x for x in out.split("\n") if "REG" in x])
def pytest_unconfigure(config, __multicall__):
if not hasattr(config, '_numfiles'):
return
__multicall__.execute()
out2 = py.process.cmdexec("lsof -p %d" % pid)
len2 = len([x for x in out2.split("\n") if "REG" in x])
assert len2 < config._numfiles + 7, out2
def pytest_funcarg__sshhost(request):
val = request.config.getvalue("sshhost")
if val:
return val
py.test.skip("need --sshhost option")
py.test.skip("need --sshhost option")
def pytest_generate_tests(metafunc):
multi = getattr(metafunc.function, 'multi', None)
if multi is not None:
......@@ -31,11 +52,11 @@ def pytest_generate_tests(metafunc):
for val in l:
metafunc.addcall(funcargs={name: val})
elif 'anypython' in metafunc.funcargnames:
for name in ('python2.4', 'python2.5', 'python2.6',
for name in ('python2.4', 'python2.5', 'python2.6',
'python2.7', 'python3.1', 'pypy-c', 'jython'):
metafunc.addcall(id=name, param=name)
# XXX copied from execnet's conftest.py - needs to be merged
# XXX copied from execnet's conftest.py - needs to be merged
winpymap = {
'python2.7': r'C:\Python27\python.exe',
'python2.6': r'C:\Python26\python.exe',
......@@ -52,7 +73,7 @@ def getexecutable(name, cache={}):
if executable:
if name == "jython":
import subprocess
popen = subprocess.Popen([str(executable), "--version"],
popen = subprocess.Popen([str(executable), "--version"],
universal_newlines=True, stderr=subprocess.PIPE)
out, err = popen.communicate()
if not err or "2.5" not in err:
......
......@@ -5,4 +5,4 @@ def pytest_runtest_call(item, __multicall__):
try:
return __multicall__.execute()
finally:
outerr = cap.reset()
outerr = cap.reset()
"""XXX in progress: resultdb plugin for database logging of test results.
"""XXX in progress: resultdb plugin for database logging of test results.
Saves test results to a datastore.
XXX this needs to be merged with resultlog plugin
Also mixes in some early ideas about an archive abstraction for test
Also mixes in some early ideas about an archive abstraction for test
results.
"""
"""
import py
py.test.skip("XXX needs to be merged with resultlog")
......@@ -15,15 +15,15 @@ from pytest_resultlog import ResultLog
def pytest_addoption(parser):
group = parser.addgroup("resultdb", "resultdb plugin options")
group.addoption('--resultdb', action="store", dest="resultdb",
group.addoption('--resultdb', action="store", dest="resultdb",
metavar="path",
help="path to the file to store test results.")
group.addoption('--resultdb_format', action="store",
group.addoption('--resultdb_format', action="store",
dest="resultdbformat", default='json',
help="data format (json, sqlite)")
def pytest_configure(config):
# XXX using config.XYZ is not good
# XXX using config.XYZ is not good
if config.getvalue('resultdb'):
if config.option.resultdb:
# local import so missing module won't crash py.test
......@@ -36,14 +36,14 @@ def pytest_configure(config):
except ImportError:
raise config.Error('Could not import simplejson module')
if config.option.resultdbformat.lower() == 'json':
resultdb = ResultDB(JSONResultArchive,
config.option.resultdb)
resultdb = ResultDB(JSONResultArchive,
config.option.resultdb)
elif config.option.resultdbformat.lower() == 'sqlite':
resultdb = ResultDB(SQLiteResultArchive,
config.option.resultdb)
resultdb = ResultDB(SQLiteResultArchive,
config.option.resultdb)
else:
raise config.Error('Unknown --resultdb_format: %s' %
config.option.resultdbformat)
raise config.Error('Unknown --resultdb_format: %s' %
config.option.resultdbformat)
config.pluginmanager.register(resultdb)
......@@ -52,7 +52,7 @@ class JSONResultArchive(object):
self.archive_path = archive_path
import simplejson
self.simplejson = simplejson
def init_db(self):
if os.path.exists(self.archive_path):
data_file = open(self.archive_path)
......@@ -84,7 +84,7 @@ class SQLiteResultArchive(object):
self.archive_path = archive_path
import sqlite3
self.sqlite3 = sqlite3
def init_db(self):
if not os.path.exists(self.archive_path):
conn = self.sqlite3.connect(self.archive_path)
......@@ -153,7 +153,7 @@ class ResultDB(ResultLog):
for item in vars(event).keys():
if item not in event_excludes:
data[item] = getattr(event, item)
# use the locally calculated longrepr & shortrepr
# use the locally calculated longrepr & shortrepr
data['longrepr'] = longrepr
data['shortrepr'] = shortrepr
......@@ -185,10 +185,10 @@ insert into pytest_results (
longrepr,
fspath,
itemname)
values (?, ?, ?, ?, ?, ?, ?, ?, ?);
values (?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
SQL_SELECT_DATA = """
select
select
runid,
name,
passed,
......@@ -204,7 +204,7 @@ from pytest_results;
# ===============================================================================
#
# plugin tests
# plugin tests
#
# ===============================================================================
......@@ -214,7 +214,7 @@ class BaseResultArchiveTests(object):
cls = None
def setup_class(cls):
# XXX refactor setup into a funcarg?
# XXX refactor setup into a funcarg?
cls.tempdb = "test_tempdb"
def test_init_db(self, testdir):
......@@ -243,7 +243,7 @@ class BaseResultArchiveTests(object):
for key, value in data[0].items():
assert value == result[0][key]
assert 'runid' in result[0]
# make sure the data is persisted
tempdb_path = unicode(testdir.tmpdir.join(self.tempdb))
archive = self.cls(tempdb_path)
......@@ -257,7 +257,7 @@ class TestJSONResultArchive(BaseResultArchiveTests):
def setup_method(self, method):
py.test.importorskip("simplejson")
class TestSQLiteResultArchive(BaseResultArchiveTests):
cls = SQLiteResultArchive
......@@ -270,8 +270,8 @@ class TestSQLiteResultArchive(BaseResultArchiveTests):
archive = self.cls(tempdb_path)
archive.init_db()
assert os.path.exists(tempdb_path)
# is table in the database?
# is table in the database?
import sqlite3
conn = sqlite3.connect(tempdb_path)
cursor = conn.cursor()
......@@ -281,7 +281,7 @@ class TestSQLiteResultArchive(BaseResultArchiveTests):
cursor.close()
conn.close()
assert len(tables) == 1
def verify_archive_item_shape(item):
names = ("runid name passed skipped failed shortrepr "
"longrepr fspath itemname").split()
......@@ -299,14 +299,14 @@ class TestWithFunctionIntegration:
archive = SQLiteResultArchive(unicode(resultdb))
archive.init_db()
return archive
def test_collection_report(self, testdir):
py.test.skip("Needs a rewrite for db version.")
ok = testdir.makepyfile(test_collection_ok="")
skip = testdir.makepyfile(test_collection_skip="import py ; py.test.skip('hello')")
fail = testdir.makepyfile(test_collection_fail="XXX")
lines = self.getresultdb(testdir, ok)
lines = self.getresultdb(testdir, ok)
assert not lines
lines = self.getresultdb(testdir, skip)
......@@ -326,7 +326,7 @@ class TestWithFunctionIntegration:
def test_log_test_outcomes(self, testdir):
mod = testdir.makepyfile(test_mod="""
import py
import py
def test_pass(): pass
def test_skip(): py.test.skip("hello")
def test_fail(): raise ValueError("val")
......@@ -349,7 +349,7 @@ class TestWithFunctionIntegration:
raise ValueError
except ValueError:
excinfo = py.code.ExceptionInfo()
reslog = ResultDB(StringIO.StringIO())
reslog = ResultDB(StringIO.StringIO())
reslog.pytest_internalerror(excinfo.getrepr)
entry = reslog.logfile.getvalue()
entry_lines = entry.splitlines()
......@@ -357,7 +357,7 @@ class TestWithFunctionIntegration:
assert entry_lines[0].startswith('! ')
assert os.path.basename(__file__)[:-1] in entry_lines[0] #.py/.pyc
assert entry_lines[-1][0] == ' '
assert 'ValueError' in entry
assert 'ValueError' in entry
def test_generic(testdir):
testdir.makepyfile("""
......@@ -371,4 +371,4 @@ def test_generic(testdir):
""")
testdir.runpytest("--resultdb=result.sqlite")
#testdir.tmpdir.join("result.sqlite")
......@@ -20,7 +20,7 @@ try:
from greenlet import greenlet
except ImportError:
print "Since pylib 1.0 greenlet are removed and separately packaged: " \
"http://pypi.python.org/pypi/greenlet"
"http://pypi.python.org/pypi/greenlet"
sys.exit(10)
......@@ -44,7 +44,7 @@ def _run_twisted(logging=False):
failure.Failure.cleanFailure = lambda *args: None
if logging:
_start_twisted_logging()
def fix_signal_handling():
# see http://twistedmatrix.com/trac/ticket/733
import signal
......@@ -54,7 +54,7 @@ def _run_twisted(logging=False):
def start():
fix_signal_handling()
doit(None)
# recursively called for each test-function/method due done()
def doit(val): # val always None
# switch context to wait that wrapper() passes back to test-method
......@@ -68,7 +68,7 @@ def _run_twisted(logging=False):
def err(res):
reactor.callLater(0.0, doit, res)
# the test-function *may* return a deferred
# here the test-function will actually been called
# done() is finalizing a test-process by assuring recursive invoking
......@@ -92,19 +92,19 @@ def pytest_unconfigure(config):
gr_twisted.switch(None)
def pytest_pyfunc_call(pyfuncitem):
# XXX1 kwargs?
# XXX1 kwargs?
# XXX2 we want to delegate actual call to next plugin
# (which may want to produce test coverage, etc.)
# (which may want to produce test coverage, etc.)
res = gr_twisted.switch(lambda: pyfuncitem.call())
if res:
res.raiseException()
return True # indicates that we performed the function call
return True # indicates that we performed the function call
gr_twisted = greenlet(_run_twisted)
gr_tests = greenlet.getcurrent()
# ===============================================================================
# plugin tests
# plugin tests
# ===============================================================================
def test_generic(testdir):
......@@ -125,7 +125,7 @@ def test_generic(testdir):
def done():
log.msg("test_deferred.done() CALLBACK DONE")
d.callback(None)
reactor.callLater(2.5, done)
log.msg("test_deferred() returning deferred: %r" % (d,))
return d
......@@ -136,7 +136,7 @@ def test_generic(testdir):
def done():
log.msg("test_deferred2.done() CALLBACK DONE")
d.callback(None)
reactor.callLater(2.5, done)
log.msg("test_deferred2() returning deferred: %r" % (d,))
return d
......@@ -157,7 +157,7 @@ def test_generic(testdir):
def done():
log.msg("test_deferred3.done() CALLBACK DONE")
d.callback(None)
reactor.callLater(2.5, done)
log.msg("test_deferred3() returning deferred: %r" % (d,))
return d
......@@ -172,7 +172,7 @@ def test_generic(testdir):
def done():
log.msg("TestTwistedSetupMethod.test_deferred() CALLBACK DONE")
d.callback(None)
reactor.callLater(2.5, done)
log.msg("TestTwistedSetupMethod.test_deferred() returning deferred: %r" % (d,))
return d
......
pygreen: experimental IO and execnet operations through greenlets
pygreen: experimental IO and execnet operations through greenlets
"""
this little helper allows to run tests multiple times
in the same process. useful for running tests from
in the same process. useful for running tests from
a console.
NOTE: since 1.3.1 you can just call py.test.cmdline.main()
multiple times - no special logic needed.
"""
import py, sys
......
......@@ -46,19 +46,21 @@ except ImportError:
args = [quote(arg) for arg in args]
return os.spawnl(os.P_WAIT, sys.executable, *args) == 0
DEFAULT_VERSION = "0.6.6"
DEFAULT_VERSION = "0.6.13"
DEFAULT_URL = "http://pypi.python.org/packages/source/d/distribute/"
SETUPTOOLS_FAKED_VERSION = "0.6c11"
SETUPTOOLS_PKG_INFO = """\
Metadata-Version: 1.0
Name: setuptools
Version: 0.6c9
Version: %s
Summary: xxxx
Home-page: xxx
Author: xxx
Author-email: xxx
License: xxx
Description: xxx
"""
""" % SETUPTOOLS_FAKED_VERSION
def _install(tarball):
......@@ -79,12 +81,14 @@ def _install(tarball):
# installing
log.warn('Installing Distribute')
assert _python_cmd('setup.py', 'install')
if not _python_cmd('setup.py', 'install'):
log.warn('Something went wrong during the installation.')
log.warn('See the error message above.')
finally:
os.chdir(old_wd)
def _build_egg(tarball, to_dir):
def _build_egg(egg, tarball, to_dir):
# extracting the tarball
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
......@@ -104,27 +108,28 @@ def _build_egg(tarball, to_dir):
log.warn('Building a Distribute egg in %s', to_dir)
_python_cmd('setup.py', '-q', 'bdist_egg', '--dist-dir', to_dir)
# returning the result
for file in os.listdir(to_dir):
if fnmatch.fnmatch(file, 'distribute-%s*.egg' % DEFAULT_VERSION):
return os.path.join(to_dir, file)
raise IOError('Could not build the egg.')
finally:
os.chdir(old_wd)
# returning the result
log.warn(egg)
if not os.path.exists(egg):
raise IOError('Could not build the egg.')
def _do_download(version, download_base, to_dir, download_delay):
tarball = download_setuptools(version, download_base,
to_dir, download_delay)
egg = _build_egg(tarball, to_dir)
egg = os.path.join(to_dir, 'distribute-%s-py%d.%d.egg'
% (version, sys.version_info[0], sys.version_info[1]))
if not os.path.exists(egg):
tarball = download_setuptools(version, download_base,
to_dir, download_delay)
_build_egg(egg, tarball, to_dir)
sys.path.insert(0, egg)
import setuptools
setuptools.bootstrap_install_from = egg
def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
to_dir=os.curdir, download_delay=15, no_fake=False):
to_dir=os.curdir, download_delay=15, no_fake=True):
# making sure we use the absolute path
to_dir = os.path.abspath(to_dir)
was_imported = 'pkg_resources' in sys.modules or \
......@@ -134,7 +139,7 @@ def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
import pkg_resources
if not hasattr(pkg_resources, '_distribute'):
if not no_fake:
fake_setuptools()
_fake_setuptools()
raise ImportError
except ImportError:
return _do_download(version, download_base, to_dir, download_delay)
......@@ -159,7 +164,8 @@ def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
return _do_download(version, download_base, to_dir,
download_delay)
finally:
_create_fake_setuptools_pkg_info(to_dir)
if not no_fake:
_create_fake_setuptools_pkg_info(to_dir)
def download_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
to_dir=os.curdir, delay=15):
......@@ -197,6 +203,29 @@ def download_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
dst.close()
return os.path.realpath(saveto)
def _no_sandbox(function):
def __no_sandbox(*args, **kw):
try:
from setuptools.sandbox import DirectorySandbox
if not hasattr(DirectorySandbox, '_old'):
def violation(*args):
pass
DirectorySandbox._old = DirectorySandbox._violation
DirectorySandbox._violation = violation
patched = True
else:
patched = False
except ImportError:
patched = False
try:
return function(*args, **kw)
finally:
if patched:
DirectorySandbox._violation = DirectorySandbox._old
del DirectorySandbox._old
return __no_sandbox
def _patch_file(path, content):
"""Will backup the file then patch it"""
......@@ -214,26 +243,17 @@ def _patch_file(path, content):
f.close()
return True
_patch_file = _no_sandbox(_patch_file)
def _same_content(path, content):
return open(path).read() == content
def _rename_path(path):
new_name = path + '.OLD.%s' % time.time()
log.warn('Renaming %s into %s', path, new_name)
try:
from setuptools.sandbox import DirectorySandbox
def _violation(*args):
pass
DirectorySandbox._violation = _violation
except ImportError:
pass