Commit 153b8df5 authored by SVN-Git Migration

Imported Upstream version 3.1.9

parent ba1c48e2
......@@ -150,4 +150,10 @@ Daniel M Taub, 2013/10/22
Matt Wise, 2013/11/06
Michael Robellard, 2013/11/07
Vsevolod Kulaga, 2013/11/16
Ionel Cristian Mărieș, 2013/12/09
Константин Подшумок, 2013/12/16
Antoine Legrand, 2014/01/09
Pepijn de Vos, 2014/01/15
Dan McGee, 2014/01/27
Paul Kilgo, 2014/01/28
Martin Davidsson, 2014/02/08
Metadata-Version: 1.1
Name: celery
Version: 3.1.6
Version: 3.1.9
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
:Version: 3.1.6 (Cipater)
:Version: 3.1.9 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
......@@ -282,10 +282,11 @@ Description: =================================
for using Redis as a message transport or as a result backend.
:celery[mongodb]:
for using MongoDB as a message transport, or as a result backend.
for using MongoDB as a message transport (*experimental*),
or as a result backend (*supported*).
:celery[sqs]:
for using Amazon SQS as a message transport.
for using Amazon SQS as a message transport (*experimental*).
:celery[memcache]:
for using memcached as a result backend.
......@@ -294,28 +295,29 @@ Description: =================================
for using Apache Cassandra as a result backend.
:celery[couchdb]:
for using CouchDB as a message transport.
for using CouchDB as a message transport (*experimental*).
:celery[couchbase]:
for using CouchBase as a result backend.
:celery[beanstalk]:
for using Beanstalk as a message transport.
for using Beanstalk as a message transport (*experimental*).
:celery[zookeeper]:
for using Zookeeper as a message transport.
:celery[zeromq]:
for using ZeroMQ as a message transport.
for using ZeroMQ as a message transport (*experimental*).
:celery[sqlalchemy]:
for using SQLAlchemy as a message transport, or as a result backend.
for using SQLAlchemy as a message transport (*experimental*),
or as a result backend (*supported*).
:celery[pyro]:
for using the Pyro4 message transport.
for using the Pyro4 message transport (*experimental*).
:celery[slmq]:
for using the SoftLayer Message Queue transport.
for using the SoftLayer Message Queue transport (*experimental*).
.. _celery-installing-from-source:
......
......@@ -4,7 +4,7 @@
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
:Version: 3.1.6 (Cipater)
:Version: 3.1.9 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
......@@ -274,10 +274,11 @@ Transports and Backends
for using Redis as a message transport or as a result backend.
:celery[mongodb]:
for using MongoDB as a message transport, or as a result backend.
for using MongoDB as a message transport (*experimental*),
or as a result backend (*supported*).
:celery[sqs]:
for using Amazon SQS as a message transport.
for using Amazon SQS as a message transport (*experimental*).
:celery[memcache]:
for using memcached as a result backend.
......@@ -286,28 +287,29 @@ Transports and Backends
for using Apache Cassandra as a result backend.
:celery[couchdb]:
for using CouchDB as a message transport.
for using CouchDB as a message transport (*experimental*).
:celery[couchbase]:
for using CouchBase as a result backend.
:celery[beanstalk]:
for using Beanstalk as a message transport.
for using Beanstalk as a message transport (*experimental*).
:celery[zookeeper]:
for using Zookeeper as a message transport.
:celery[zeromq]:
for using ZeroMQ as a message transport.
for using ZeroMQ as a message transport (*experimental*).
:celery[sqlalchemy]:
for using SQLAlchemy as a message transport, or as a result backend.
for using SQLAlchemy as a message transport (*experimental*),
or as a result backend (*supported*).
:celery[pyro]:
for using the Pyro4 message transport.
for using the Pyro4 message transport (*experimental*).
:celery[slmq]:
for using the SoftLayer Message Queue transport.
for using the SoftLayer Message Queue transport (*experimental*).
.. _celery-installing-from-source:
......
Metadata-Version: 1.1
Name: celery
Version: 3.1.6
Version: 3.1.9
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
:Version: 3.1.6 (Cipater)
:Version: 3.1.9 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
......@@ -282,10 +282,11 @@ Description: =================================
for using Redis as a message transport or as a result backend.
:celery[mongodb]:
for using MongoDB as a message transport, or as a result backend.
for using MongoDB as a message transport (*experimental*),
or as a result backend (*supported*).
:celery[sqs]:
for using Amazon SQS as a message transport.
for using Amazon SQS as a message transport (*experimental*).
:celery[memcache]:
for using memcached as a result backend.
......@@ -294,28 +295,29 @@ Description: =================================
for using Apache Cassandra as a result backend.
:celery[couchdb]:
for using CouchDB as a message transport.
for using CouchDB as a message transport (*experimental*).
:celery[couchbase]:
for using CouchBase as a result backend.
:celery[beanstalk]:
for using Beanstalk as a message transport.
for using Beanstalk as a message transport (*experimental*).
:celery[zookeeper]:
for using Zookeeper as a message transport.
:celery[zeromq]:
for using ZeroMQ as a message transport.
for using ZeroMQ as a message transport (*experimental*).
:celery[sqlalchemy]:
for using SQLAlchemy as a message transport, or as a result backend.
for using SQLAlchemy as a message transport (*experimental*),
or as a result backend (*supported*).
:celery[pyro]:
for using the Pyro4 message transport.
for using the Pyro4 message transport (*experimental*).
:celery[slmq]:
for using the SoftLayer Message Queue transport.
for using the SoftLayer Message Queue transport (*experimental*).
.. _celery-installing-from-source:
......
......@@ -80,6 +80,7 @@ celery/contrib/batches.py
celery/contrib/methods.py
celery/contrib/migrate.py
celery/contrib/rdb.py
celery/contrib/sphinx.py
celery/events/__init__.py
celery/events/cursesmon.py
celery/events/dumper.py
......@@ -274,6 +275,7 @@ docs/.templates/sidebarintro.html
docs/.templates/sidebarlogo.html
docs/_ext/applyxrefs.py
docs/_ext/celerydocs.py
docs/_ext/githubsphinx.py
docs/_ext/literals_to_xrefs.py
docs/_theme/celery/theme.conf
docs/_theme/celery/static/celery.css_t
......@@ -403,6 +405,7 @@ docs/reference/celery.contrib.batches.rst
docs/reference/celery.contrib.methods.rst
docs/reference/celery.contrib.migrate.rst
docs/reference/celery.contrib.rdb.rst
docs/reference/celery.contrib.sphinx.rst
docs/reference/celery.events.rst
docs/reference/celery.events.state.rst
docs/reference/celery.exceptions.rst
......
pytz>dev
billiard>=3.3.0.10,<3.4
kombu>=3.0.7,<4.0
billiard>=3.3.0.14,<3.4
kombu>=3.0.12,<4.0
[zookeeper]
kazoo>=1.3.1
......
......@@ -14,7 +14,7 @@ version_info_t = namedtuple(
)
SERIES = 'Cipater'
VERSION = version_info_t(3, 1, 6, '', '')
VERSION = version_info_t(3, 1, 9, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'
......
......@@ -24,7 +24,8 @@ def _warn_deprecated(new):
def main():
maybe_patch_concurrency()
if 'multi' not in sys.argv:
maybe_patch_concurrency()
from celery.bin.celery import main
main()
......@@ -37,7 +38,6 @@ def _compat_worker():
def _compat_multi():
maybe_patch_concurrency()
_warn_deprecated('celery multi')
from celery.bin.multi import main
main()
......
......@@ -58,7 +58,7 @@ _task_join_will_block = False
def _set_task_join_will_block(blocks):
global _task_join_will_block
_task_join_will_block = True
_task_join_will_block = blocks
def task_join_will_block():
......
......@@ -221,6 +221,7 @@ class TaskProducer(Producer):
**kwargs):
"""Send task message."""
retry = self.retry if retry is None else retry
headers = {} if headers is None else headers
qname = queue
if queue is None and exchange is None:
......@@ -379,6 +380,7 @@ class AMQP(object):
producer_cls = TaskProducer
consumer_cls = TaskConsumer
queues_cls = Queues
#: Cached and prepared routing table.
_rtable = None
......@@ -414,7 +416,7 @@ class AMQP(object):
routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
autoexchange = (self.autoexchange if autoexchange is None
else autoexchange)
return Queues(
return self.queues_cls(
queues, self.default_exchange, create_missing,
ha_policy, autoexchange,
)
......
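The change from a hard-coded `Queues(...)` call to `self.queues_cls(...)` routes construction through the class attribute declared just above it, so subclasses can substitute their own queues container. A hypothetical illustration of the pattern (class names here are stand-ins, not the real celery classes):

```python
# Constructing through a class attribute instead of a fixed class
# lets subclasses override the container without copying the method.
class Queues(dict):
    def __init__(self, names):
        super().__init__((name, {'name': name}) for name in names)


class AMQP:
    queues_cls = Queues  # subclasses may override

    def Queues(self, names):
        # Uses the attribute, so overrides take effect here too.
        return self.queues_cls(names)


class LoggingQueues(Queues):
    pass


class CustomAMQP(AMQP):
    queues_cls = LoggingQueues
```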
......@@ -33,6 +33,7 @@ from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
from celery.five import items, values
from celery.loaders import get_loader_cls
from celery.local import PromiseProxy, maybe_evaluate
from celery.utils import shadowsig
from celery.utils.functional import first, maybe_list
from celery.utils.imports import instantiate, symbol_by_name
from celery.utils.objects import mro_lookup
......@@ -235,7 +236,9 @@ class Celery(object):
'run': fun if bind else staticmethod(fun),
'_decorated': True,
'__doc__': fun.__doc__,
'__module__': fun.__module__}, **options))()
'__module__': fun.__module__,
'__wrapped__': fun}, **options))()
shadowsig(T, fun) # for inspect.getargspec
task = self._tasks[T.name] # return global instance.
return task
......@@ -307,7 +310,8 @@ class Celery(object):
conf = self.conf
if conf.CELERY_ALWAYS_EAGER: # pragma: no cover
warnings.warn(AlwaysEagerIgnored(
'CELERY_ALWAYS_EAGER has no effect on send_task'))
'CELERY_ALWAYS_EAGER has no effect on send_task',
), stacklevel=2)
options = router.route(options, name, args, kwargs)
if connection:
producer = self.amqp.TaskProducer(connection)
......@@ -445,8 +449,8 @@ class Celery(object):
if self._pool:
self._pool.force_close_all()
self._pool = None
amqp = self.amqp
if amqp._producer_pool:
amqp = self.__dict__.get('amqp')
if amqp is not None and amqp._producer_pool is not None:
amqp._producer_pool.force_close_all()
amqp._producer_pool = None
......
......@@ -66,7 +66,7 @@ def add_unlock_chord_task(app):
"""
from celery.canvas import signature
from celery.exceptions import ChordError
from celery.result import result_from_tuple
from celery.result import allow_join_result, result_from_tuple
default_propagate = app.conf.CELERY_CHORD_PROPAGATES
......@@ -95,7 +95,8 @@ def add_unlock_chord_task(app):
if deps.ready():
callback = signature(callback, app=app)
try:
ret = j(propagate=propagate)
with allow_join_result():
ret = j(timeout=3.0, propagate=propagate)
except Exception as exc:
try:
culprit = next(deps._failed_join_report())
......@@ -117,8 +118,8 @@ def add_unlock_chord_task(app):
exc=ChordError('Callback error: {0!r}'.format(exc)),
)
else:
return unlock_chord.retry(countdown=interval,
max_retries=max_retries)
raise unlock_chord.retry(countdown=interval,
max_retries=max_retries)
return unlock_chord
......@@ -277,8 +278,6 @@ def add_chain_task(app):
tasks.append(task)
prev_task, prev_res = task, res
print(tasks)
return tasks, results
def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
......@@ -302,7 +301,7 @@ def add_chain_task(app):
if link_error:
for task in tasks:
task.set(link_error=link_error)
tasks[0].apply_async()
tasks[0].apply_async(**options)
return result
def apply(self, args=(), kwargs={}, signature=maybe_signature,
......@@ -356,17 +355,11 @@ def add_chord_task(app):
results = [AsyncResult(prepare_member(task, body, group_id))
for task in header.tasks]
# - fallback implementations schedules the chord_unlock task here
app.backend.on_chord_apply(group_id, body,
interval=interval,
countdown=countdown,
max_retries=max_retries,
propagate=propagate,
result=results)
# - call the header group, returning the GroupResult.
final_res = header(*partial_args, task_id=group_id)
return final_res
return self.backend.apply_chord(
header, partial_args, group_id,
body, interval=interval, countdown=countdown,
max_retries=max_retries, propagate=propagate, result=results,
)
def _prepare_member(self, task, body, group_id):
opts = task.options
......
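The `return unlock_chord.retry(...)` to `raise unlock_chord.retry(...)` change reflects the usual Celery idiom: `retry()` raises a `Retry` exception internally, and raising its value guarantees the task body stops on every code path. A generic sketch of the idiom, with simplified stand-in classes:

```python
# Stand-in for celery's Retry exception and Task.retry behaviour;
# real implementations re-publish the message before raising.
class Retry(Exception):
    pass


class Task:
    def retry(self, countdown=0, max_retries=3):
        raise Retry('retry in {0}s'.format(countdown))


def unlock(task):
    # `raise` rather than `return`: the task exits here even if a
    # retry() implementation were ever to return instead of raise.
    raise task.retry(countdown=1)
```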
......@@ -343,6 +343,8 @@ class Task(object):
'CELERY_STORE_ERRORS_EVEN_IF_IGNORED'),
)
_backend = None # set by backend property.
__bound__ = False
# - Tasks are lazily bound, so that configuration is not set
......@@ -360,7 +362,6 @@ class Task(object):
setattr(self, attr_name, conf[config_name])
if self.accept_magic_kwargs is None:
self.accept_magic_kwargs = app.accept_magic_kwargs
self.backend = app.backend
# decorate with annotations from config.
if not was_bound:
......@@ -651,7 +652,10 @@ class Task(object):
if max_retries is not None and retries > max_retries:
if exc:
# first try to reraise the original exception
maybe_reraise()
# or if not in an except block then raise the custom exc.
raise exc()
raise self.MaxRetriesExceededError(
"Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
self.name, request.id, S.args, S.kwargs))
......@@ -896,6 +900,17 @@ class Task(object):
self._exec_options = extract_exec_options(self)
return self._exec_options
@property
def backend(self):
backend = self._backend
if backend is None:
return self.app.backend
return backend
@backend.setter
def backend(self, value): # noqa
self._backend = value
@property
def __name__(self):
return self.__class__.__name__
......
......@@ -22,6 +22,7 @@ import sys
from warnings import warn
from billiard.einfo import ExceptionInfo
from kombu.exceptions import EncodeError
from kombu.utils import kwdict
from celery import current_app
......@@ -193,7 +194,26 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
from celery import canvas
signature = canvas.maybe_signature # maybe_ does not clone if already
def on_error(request, exc, uuid, state=FAILURE, call_errbacks=True):
if propagate:
raise
I = Info(state, exc)
R = I.handle_error_state(task, eager=eager)
if call_errbacks:
[signature(errback, app=app).apply_async((uuid, ))
for errback in request.errbacks or []]
return I, R, I.state, I.retval
def trace_task(uuid, args, kwargs, request=None):
# R - is the possibly prepared return value.
# I - is the Info object.
# retval - is the always unmodified return value.
# state - is the resulting task state.
# This function is very long because we have unrolled all the calls
# for performance reasons, and because the function is so long
# we want the main variables (I, and R) to stand out visually from
# the rest of the variables, so breaking PEP8 is worth it ;)
R = I = retval = state = None
kwargs = kwdict(kwargs)
try:
......@@ -224,32 +244,30 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
state, retval = I.state, I.retval
except Retry as exc:
I = Info(RETRY, exc)
state, retval = I.state, I.retval
R = I.handle_error_state(task, eager=eager)
I, R, state, retval = on_error(
task_request, exc, uuid, RETRY, call_errbacks=False,
)
except Exception as exc:
if propagate:
raise
I = Info(FAILURE, exc)
state, retval = I.state, I.retval
R = I.handle_error_state(task, eager=eager)
[signature(errback, app=app).apply_async((uuid, ))
for errback in task_request.errbacks or []]
I, R, state, retval = on_error(task_request, exc, uuid)
except BaseException as exc:
raise
else:
# callback tasks must be applied before the result is
# stored, so that result.children is populated.
[signature(callback, app=app).apply_async((retval, ))
for callback in task_request.callbacks or []]
if publish_result:
store_result(
uuid, retval, SUCCESS, request=task_request,
)
if task_on_success:
task_on_success(retval, uuid, args, kwargs)
if success_receivers:
send_success(sender=task, result=retval)
try:
# callback tasks must be applied before the result is
# stored, so that result.children is populated.
[signature(callback, app=app).apply_async((retval, ))
for callback in task_request.callbacks or []]
if publish_result:
store_result(
uuid, retval, SUCCESS, request=task_request,
)
except EncodeError as exc:
I, R, state, retval = on_error(task_request, exc, uuid)
else:
if task_on_success:
task_on_success(retval, uuid, args, kwargs)
if success_receivers:
send_success(sender=task, result=retval)
# -* POST *-
if state not in IGNORE_STATES:
......
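The tracer hunk above consolidates duplicated `except Retry` and `except Exception` bookkeeping into one shared `on_error` closure, which the new `EncodeError` branch can then reuse. A reduced sketch of that refactor (names are stand-ins, not the real trace code):

```python
# One error path shared by every failure branch, instead of three
# copies of the same Info/handle_error_state sequence.
FAILURE, RETRY = 'FAILURE', 'RETRY'


def build_tracer(handle_error):
    def on_error(exc, state=FAILURE):
        # Single place for error bookkeeping.
        return handle_error(state, exc), state

    def trace(fun, *args):
        try:
            return fun(*args), 'SUCCESS'
        except KeyError as exc:       # stand-in for the Retry branch
            return on_error(exc, RETRY)
        except Exception as exc:      # any other task error
            return on_error(exc)
    return trace
```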
......@@ -25,7 +25,9 @@ from kombu.utils.encoding import safe_str
from celery import VERSION_BANNER, platforms, signals
from celery.app import trace
from celery.exceptions import CDeprecationWarning, SystemTerminate
from celery.exceptions import (
CDeprecationWarning, WorkerShutdown, WorkerTerminate,
)
from celery.five import string, string_t
from celery.loaders.app import AppLoader
from celery.platforms import check_privileges
......@@ -91,10 +93,10 @@ BANNER = """\
{platform}
[config]
.> broker: {conninfo}
.> app: {app}
.> transport: {conninfo}
.> results: {results}
.> concurrency: {concurrency}
.> events: {events}
[queues]
{queues}
......@@ -225,6 +227,7 @@ class Worker(WorkController):
hostname=safe_str(self.hostname),
version=VERSION_BANNER,
conninfo=self.app.connection().as_uri(),
results=self.app.conf.CELERY_RESULT_BACKEND or 'disabled',
concurrency=concurrency,
platform=safe_str(_platform.platform()),
events=events,
......@@ -274,7 +277,7 @@ class Worker(WorkController):
def _shutdown_handler(worker, sig='TERM', how='Warm',
exc=SystemExit, callback=None):
exc=WorkerShutdown, callback=None):
def _handle_request(*args):
with in_sighandler():
......@@ -291,11 +294,11 @@ def _shutdown_handler(worker, sig='TERM', how='Warm',
_handle_request.__name__ = str('worker_{0}'.format(how))
platforms.signals[sig] = _handle_request
install_worker_term_handler = partial(
_shutdown_handler, sig='SIGTERM', how='Warm', exc=SystemExit,
_shutdown_handler, sig='SIGTERM', how='Warm', exc=WorkerShutdown,
)
if not is_jython: # pragma: no cover
install_worker_term_hard_handler = partial(
_shutdown_handler, sig='SIGQUIT', how='Cold', exc=SystemTerminate,
_shutdown_handler, sig='SIGQUIT', how='Cold', exc=WorkerTerminate,
)
else: # pragma: no cover
install_worker_term_handler = \
......@@ -314,6 +317,9 @@ else: # pragma: no cover
def _reload_current_worker():
platforms.close_open_fds([
sys.__stdin__, sys.__stdout__, sys.__stderr__,
])
os.execv(sys.executable, [sys.executable] + sys.argv)
......
......@@ -55,6 +55,7 @@ class AMQPBackend(BaseBackend):
BacklogLimitExceeded = BacklogLimitExceeded
persistent = True
supports_autoexpire = True
supports_native_join = True
......
......@@ -26,10 +26,13 @@ from kombu.serialization import (
from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
from celery import states
from celery import current_app, maybe_signature
from celery.app import current_task
from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
from celery.five import items
from celery.result import result_from_tuple, GroupResult
from celery.result import (
GroupResult, ResultBase, allow_join_result, result_from_tuple,
)
from celery.utils import timeutils
from celery.utils.functional import LRUCache
from celery.utils.serialization import (
......@@ -46,7 +49,6 @@ PY3 = sys.version_info >= (3, 0)
def unpickle_backend(cls, args, kwargs):
"""Return an unpickled backend."""
from celery import current_app
return cls(*args, app=current_app._get_current_object(), **kwargs)
......@@ -73,6 +75,13 @@ class BaseBackend(object):
#: Set to true if the backend is persistent by default.
persistent = True
retry_policy = {
'max_retries': 20,
'interval_start': 0,
'interval_step': 1,
'interval_max': 1,
}
def __init__(self, app, serializer=None,
max_cached_results=None, accept=None, **kwargs):
self.app = app
......@@ -138,7 +147,7 @@ class BaseBackend(object):
def prepare_value(self, result):
"""Prepare value for storage."""
if isinstance(result, GroupResult):
if self.serializer != 'pickle' and isinstance(result, ResultBase):
return result.as_tuple()
return result
......@@ -311,7 +320,11 @@ class BaseBackend(object):
self.app.tasks['celery.chord_unlock'].apply_async(
(group_id, body, ), kwargs, countdown=countdown,
)
on_chord_apply = fallback_chord_unlock
def apply_chord(self, header, partial_args, group_id, body, **options):
result = header(*partial_args, task_id=group_id)
self.fallback_chord_unlock(group_id, body, **options)
return result
def current_task_children(self, request=None):
request = request or getattr(current_task(), 'request', None)
......@@ -335,6 +348,8 @@ class KeyValueStoreBackend(BaseBackend):
self.key_t = self.key_t.__func__ # remove binding
self._encode_prefixes()
super(KeyValueStoreBackend, self).__init__(*args, **kwargs)
if self.implements_incr:
self.apply_chord = self._apply_chord_incr
def _encode_prefixes(self):
self.task_keyprefix = self.key_t(self.task_keyprefix)
......@@ -459,17 +474,14 @@ class KeyValueStoreBackend(BaseBackend):
meta['result'] = result_from_tuple(result, self.app)
return meta
def on_chord_apply(self, group_id, body, result=None, **kwargs):
if self.implements_incr:
self.save_group(group_id, self.app.GroupResult(group_id, result))
else:
self.fallback_chord_unlock(group_id, body, result, **kwargs)
def _apply_chord_incr(self, header, partial_args, group_id, body,
result=None, **options):
self.save_group(group_id, self.app.GroupResult(group_id, result))
return header(*partial_args, task_id=group_id)
def on_chord_part_return(self, task, propagate=None):
if not self.implements_incr:
return
from celery import maybe_signature
from celery.result import GroupResult
app = self.app
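
The `KeyValueStoreBackend` constructor above binds `self.apply_chord = self._apply_chord_incr` only when `implements_incr` is set, replacing the old `if/else` inside `on_chord_apply` with a one-time capability check. A minimal sketch of that dispatch pattern (illustrative class names, not the real celery backends):

```python
# Capability-based method binding: backends that can count down
# atomically swap in the faster implementation at construction time.
class BaseBackend:
    implements_incr = False

    def __init__(self):
        if self.implements_incr:
            # Instance attribute shadows the class-level fallback.
            self.apply_chord = self._apply_chord_incr

    def apply_chord(self, *args):
        return 'fallback-unlock'

    def _apply_chord_incr(self, *args):
        return 'incr-counter'


class RedisLikeBackend(BaseBackend):
    implements_incr = True
```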