Commit 1ca0fad3 authored by SVN-Git Migration's avatar SVN-Git Migration

Imported Upstream version 3.1.18

parent ef94a72a
......@@ -160,3 +160,23 @@ Martin Davidsson, 2014/02/08
Chris Clark, 2014/02/20
Matthew Duggan, 2014/04/10
Brian Bouterse, 2014/04/10
Dmitry Malinovsky, 2014/04/28
Luke Pomfrey, 2014/05/06
Alexey Kotlyarov, 2014/05/16
Ross Deane, 2014/07/11
Tadej Janež, 2014/08/08
Akexander Koshelev, 2014/08/19
Davide Quarta, 2014/08/19
John Whitlock, 2014/08/19
Konstantinos Koukopoulos, 2014/08/24
Albert Yee Wang, 2014/08/29
Andrea Rabbaglietti, 2014/10/02
Joe Jevnik, 2014/10/22
Nathan Van Gheem, 2014/10/28
Gino Ledesma, 2014/10/28
Thomas French, 2014/11/10
Michael Permana, 2014/11/6
Bert Vanderbauwhede, 2014/12/18
John Anderson, 2014/12/27
Luke Burden, 2015/01/24
Mickaël Penhard, 2015/02/15
......@@ -8,6 +8,248 @@ This document contains change notes for bugfix releases in the 3.1.x series
(Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
new in Celery 3.1.
.. _version-3.1.18:
:release-date: 2015-04-22 05:30 P.M UTC
:release-by: Ask Solem
- **Requirements**
- Now depends on :ref:`Kombu 3.0.25 <kombu:version-3.0.25>`.
- Now depends on :mod:`billiard`
- **Django**: Now supports Django 1.8 (Issue #2536).
Fix contributed by Bence Tamas and Mickaël Penhard.
- **Results**: MongoDB result backend now compatible with pymongo 3.0.
Fix contributed by Fatih Sucu.
- **Tasks**: Fixed bug only happening when a task has multiple callbacks
(Issue #2515).
Fix contributed by NotSqrt.
- **Commands**: Preload options now support ``--arg value`` syntax.
Fix contributed by John Anderson.
- **Compat**: A typo caused ``celery.log.setup_logging_subsystem`` to be
Fix contributed by Gunnlaugur Thor Briem.
- **init scripts**: The celerybeat generic init script now uses
``/bin/sh`` instead of bash (Issue #2496).
Fix contributed by Jelle Verstraaten.
- **Django**: Fixed a :exc:`TypeError` sometimes occurring in logging
when validating models.
Fix contributed by Alexander.
- **Commands**: Worker now supports new ``--executable`` argument that can
be used with ``--detach``.
Contributed by Bert Vanderbauwhede.
- **Canvas**: Fixed crash in chord unlock fallback task (Issue #2404).
- **Worker**: Fixed rare crash occurring with ``--autoscale`` enabled
(Issue #2411).
- **Django**: Properly recycle worker Django database connections when the
Django ``CONN_MAX_AGE`` setting is enabled (Issue #2453).
Fix contributed by Luke Burden.
.. _version-3.1.17:
:release-date: 2014-11-19 03:30 P.M UTC
:release-by: Ask Solem
.. admonition:: Do not enable the :setting:`CELERYD_FORCE_EXECV` setting!
Please review your configuration and disable this option if you're using the
RabbitMQ or Redis transport.
Keeping this option enabled after 3.1 means the async based prefork pool will
be disabled, which can easily cause instability.
- **Requirements**
- Now depends on :ref:`Kombu 3.0.24 <kombu:version-3.0.24>`.
Includes the new Qpid transport coming in Celery 3.2, backported to
support those who may still require Python 2.6 compatibility.
- Now depends on :mod:`billiard`
- ``celery[librabbitmq]`` now depends on librabbitmq 1.6.1.
- **Task**: The timing of ETA/countdown tasks were off after the example ``LocalTimezone``
implementation in the Python documentation no longer works in Python 3.4.
(Issue #2306).
- **Task**: Raising :exc:`~celery.exceptions.Ignore` no longer sends
``task-failed`` event (Issue #2365).
- **Redis result backend**: Fixed unbound local errors.
Fix contributed by Thomas French.
- **Task**: Callbacks was not called properly if ``link`` was a list of
signatures (Issuse #2350).
- **Canvas**: chain and group now handles json serialized signatures
(Issue #2076).
- **Results**: ``.join_native()`` would accidentally treat the ``STARTED``
state as being ready (Issue #2326).
This could lead to the chord callback being called with invalid arguments
when using chords with the :setting:`CELERY_TRACK_STARTED` setting
- **Canvas**: The ``chord_size`` attribute is now set for all canvas primitives,
making sure more combinations will work with the ``new_join`` optimization
for Redis (Issue #2339).
- **Task**: Fixed problem with app not being properly propagated to
``trace_task`` in all cases.
Fix contributed by kristaps.
- **Worker**: Expires from task message now associated with a timezone.
Fix contributed by Albert Wang.
- **Cassandra result backend**: Fixed problems when using detailed mode.
When using the Cassandra backend in detailed mode, a regression
caused errors when attempting to retrieve results.
Fix contributed by Gino Ledesma.
- **Mongodb Result backend**: Pickling the backend instance will now include
the original url (Issue #2347).
Fix contributed by Sukrit Khera.
- **Task**: Exception info was not properly set for tasks raising
:exc:`~celery.exceptions.Reject` (Issue #2043).
- **Worker**: Duplicates are now removed when loading the set of revoked tasks
from the worker state database (Issue #2336).
- **celery.contrib.rdb**: Fixed problems with ``rdb.set_trace`` calling stop
from the wrong frame.
Fix contributed by llllllllll.
- **Canvas**: ``chain`` and ``chord`` can now be immutable.
- **Canvas**: ``chord.apply_async`` will now keep partial args set in
``self.args`` (Issue #2299).
- **Results**: Small refactoring so that results are decoded the same way in
all result backends.
- **Logging**: The ``processName`` format was introduced in Py2.6.2 so for
compatibility this format is now excluded when using earlier versions
(Issue #1644).
.. _version-3.1.16:
:release-date: 2014-10-03 06:00 P.M UTC
:release-by: Ask Solem
- **Worker**: 3.1.15 broke ``-Ofair`` behavior (Issue #2286).
This regression could result in all tasks executing
in a single child process if ``-Ofair`` was enabled.
- **Canvas**: ``celery.signature`` now properly forwards app argument
in all cases.
- **Task**: ``.retry()`` did not raise the exception correctly
when called without a current exception.
Fix contributed by Andrea Rabbaglietti.
- **Worker**: The ``enable_events`` remote control command
disabled worker-related events by mistake (Issue #2272).
Fix contributed by Konstantinos Koukopoulos.
- **Django**: Adds support for Django 1.7 class names in INSTALLED_APPS
when using ``app.autodiscover_tasks()`` (Issue #2248).
- **Sphinx**: ``celery.contrib.sphinx`` now uses ``getfullargspec``
on Python 3 (Issue #2302).
- **Redis/Cache Backends**: Chords will now run at most once if one or more tasks
in the chord are executed multiple times for some reason.
.. _version-3.1.15:
:release-date: 2014-09-14 11:00 P.M UTC
:release-by: Ask Solem
- **Django**: Now makes sure ``django.setup()`` is called
before importing any task modules (Django 1.7 compatibility, Issue #2227)
- **Results**: ``result.get()`` was misbehaving by calling
``backend.get_task_meta`` in a finally call leading to
AMQP result backend queues not being properly cleaned up (Issue #2245).
.. _version-3.1.14:
:release-date: 2014-09-08 03:00 P.M UTC
:release-by: Ask Solem
- **Requirements**
- Now depends on :ref:`Kombu 3.0.22 <kombu:version-3.0.22>`.
- **Init scripts**: The generic worker init scripts ``status`` command
now gets an accurate pidfile list (Issue #1942).
- **Init scripts**: The generic beat script now implements the ``status``
Contributed by John Whitlock.
- **Commands**: Multi now writes informational output to stdout instead of stderr.
- **Worker**: Now ignores not implemented error for ``pool.restart``
(Issue #2153).
- **Task**: Retry no longer raises retry exception when executed in eager
mode (Issue #2164).
- **AMQP Result backend**: Now ensured ``on_interval`` is called at least
every second for blocking calls to properly propagate parent errors.
- **Django**: Compatibility with Django 1.7 on Windows (Issue #2126).
- **Programs**: `--umask` argument can be now specified in both octal (if starting
with 0) or decimal.
.. _version-3.1.13:
......@@ -299,7 +541,7 @@ News
Celery), so if you do enable it then make sure you do so on all
See :ref:`redis-caveats-fanout-patterns`.
See :ref:`redis-caveats`.
This will be the default in Celery 3.2.
Metadata-Version: 1.1
Name: celery
Version: 3.1.13
Version: 3.1.18
Summary: Distributed Task Queue
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image::
:Version: 3.1.13 (Cipater)
:Version: 3.1.18 (Cipater)
......@@ -191,7 +191,7 @@ Description: =================================
database connections at ``fork``.
.. _`Django`:
.. _`Pylons`:
.. _`Pylons`:
.. _`Flask`:
.. _`web2py`:
.. _`Bottle`:
......@@ -4,7 +4,7 @@
.. image::
:Version: 3.1.13 (Cipater)
:Version: 3.1.18 (Cipater)
......@@ -183,7 +183,7 @@ development easier, and sometimes they add important hooks like closing
database connections at ``fork``.
.. _`Django`:
.. _`Pylons`:
.. _`Pylons`:
.. _`Flask`:
.. _`web2py`:
.. _`Bottle`:
Metadata-Version: 1.1
Name: celery
Version: 3.1.13
Version: 3.1.18
Summary: Distributed Task Queue
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image::
:Version: 3.1.13 (Cipater)
:Version: 3.1.18 (Cipater)
......@@ -191,7 +191,7 @@ Description: =================================
database connections at ``fork``.
.. _`Django`:
.. _`Pylons`:
.. _`Pylons`:
.. _`Flask`:
.. _`web2py`:
.. _`Bottle`:
......@@ -9,7 +9,7 @@ kazoo>=1.3.1
......@@ -14,7 +14,7 @@ version_info_t = namedtuple(
SERIES = 'Cipater'
VERSION = version_info_t(3, 1, 13, '', '')
VERSION = version_info_t(3, 1, 18, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = ''
......@@ -150,7 +150,7 @@ class Celery(object):
if not isinstance(self._tasks, TaskRegistry):
self._tasks = TaskRegistry(self._tasks or {})
# If the class defins a custom __reduce_args__ we need to use
# If the class defines a custom __reduce_args__ we need to use
# the old way of pickling apps, which is pickling a list of
# args instead of the new way that pickles a dict of keywords.
self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
......@@ -538,7 +538,7 @@ class Celery(object):
when unpickling."""
return {
'main': self.main,
'changes': self.conf.changes,
'changes': self.conf.changes if self.configured else self._preconf,
'loader': self.loader_cls,
'backend': self.backend_cls,
'amqp': self.amqp_cls,
......@@ -51,8 +51,9 @@ def add_unlock_chord_task(app):
default_propagate = app.conf.CELERY_CHORD_PROPAGATES
@app.task(name='celery.chord_unlock', max_retries=None, shared=False,
default_retry_delay=1, ignore_result=True, _force_evaluate=True)
def unlock_chord(group_id, callback, interval=None, propagate=None,
default_retry_delay=1, ignore_result=True, _force_evaluate=True,
def unlock_chord(self, group_id, callback, interval=None, propagate=None,
max_retries=None, result=None,
Result=app.AsyncResult, GroupResult=app.GroupResult,
......@@ -63,44 +64,50 @@ def add_unlock_chord_task(app):
# exception set to ChordError.
propagate = default_propagate if propagate is None else propagate
if interval is None:
interval = unlock_chord.default_retry_delay
interval = self.default_retry_delay
# check if the task group is ready, and if so apply the callback.
deps = GroupResult(
[result_from_tuple(r, app=app) for r in result],
j = deps.join_native if deps.supports_native_join else deps.join
if deps.ready():
callback = signature(callback, app=app)
ready = deps.ready()
except Exception as exc:
raise self.retry(
exc=exc, countdown=interval, max_retries=max_retries,
if not ready:
raise self.retry(countdown=interval, max_retries=max_retries)
callback = signature(callback, app=app)
with allow_join_result():
ret = j(timeout=3.0, propagate=propagate)
except Exception as exc:
with allow_join_result():
ret = j(timeout=3.0, propagate=propagate)
culprit = next(deps._failed_join_report())
reason = 'Dependency {} raised {1!r}'.format(
culprit, exc,
except StopIteration:
reason = repr(exc)
logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
except Exception as exc:
culprit = next(deps._failed_join_report())
reason = 'Dependency {} raised {1!r}'.format(
culprit, exc,
except StopIteration:
reason = repr(exc)
logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
except Exception as exc:
logger.error('Chord %r raised: %r', group_id, exc,
exc=ChordError('Callback error: {0!r}'.format(exc)),
raise unlock_chord.retry(countdown=interval,
exc=ChordError('Callback error: {0!r}'.format(exc)),
return unlock_chord
......@@ -332,7 +339,7 @@ def add_chord_task(app):
if eager:
return header.apply(args=partial_args, task_id=group_id)
body.setdefault('chord_size', len(header.tasks))
body['chord_size'] = len(header.tasks)
results = header.freeze(group_id=group_id, chord=body).results
return self.backend.apply_chord(
......@@ -47,6 +47,11 @@ _REDIS_OLD = {'deprecate_by': '2.5', 'remove_by': '4.0',
searchresult = namedtuple('searchresult', ('namespace', 'key', 'type'))
# logging: processName first introduced in Py 2.6.2 (Issue #1644).
if sys.version_info < (2, 6, 2):
class Option(object):
alt = None
deprecate_by = None
......@@ -533,6 +533,10 @@ class Task(object):
:attr:`trail` attribute
:keyword publisher: Deprecated alias to ``producer``.
:rtype :class:`celery.result.AsyncResult`: if
:setting:`CELERY_ALWAYS_EAGER` is not set, otherwise
Also supports all keyword arguments supported by
......@@ -659,20 +663,23 @@ class Task(object):
# first try to reraise the original exception
# or if not in an except block then raise the custom exc.
raise exc()
raise exc
raise self.MaxRetriesExceededError(
"Can't retry {0}[{1}] args:{2} kwargs:{3}".format(,, S.args, S.kwargs))
# If task was executed eagerly using apply(),
# then the retry must also be executed eagerly.
ret = Retry(exc=exc, when=eta or countdown)
if is_eager:
# if task was executed eagerly using apply(),
# then the retry must also be executed eagerly.
return ret
S.apply().get() if is_eager else S.apply_async()
except Exception as exc:
if is_eager:
raise Reject(exc, requeue=False)
ret = Retry(exc=exc, when=eta or countdown)
if throw:
raise ret
return ret
......@@ -273,9 +273,9 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
for group_ in groups:
group.apply_async((retval, ))
group_.apply_async((retval, ))
if sigs:
group(sigs).apply_async(retval, )
group(sigs).apply_async((retval, ))
signature(callbacks[0], app=app).delay(retval)
if publish_result:
......@@ -337,7 +337,8 @@ def trace_task(task, uuid, args, kwargs, request={}, **opts):
def _trace_task_ret(name, uuid, args, kwargs, request={}, app=None, **opts):
return trace_task((app or current_app).tasks[name],
app = app or current_app
return trace_task(app.tasks[name],
uuid, args, kwargs, request, app=app, **opts)
trace_task_ret = _trace_task_ret
......@@ -49,7 +49,13 @@ def appstr(app):
class Settings(ConfigurationView):
"""Celery settings object."""
"""Celery settings object.
.. seealso:
:ref:`configuration` for a full list of configuration keys.
......@@ -140,7 +140,7 @@ class AMQPBackend(BaseBackend):
def on_reply_declare(self, task_id):
return [self._create_binding(task_id)]
def wait_for(self, task_id, timeout=None, cache=True, propagate=True,
def wait_for(self, task_id, timeout=None, cache=True,
no_ack=True, on_interval=None,
......@@ -148,19 +148,14 @@ class AMQPBackend(BaseBackend):
cached_meta = self._cache.get(task_id)
if cache and cached_meta and \
cached_meta['status'] in READY_STATES:
meta = cached_meta
return cached_meta
meta = self.consume(task_id, timeout=timeout, no_ack=no_ack,
return self.consume(task_id, timeout=timeout, no_ack=no_ack,
except socket.timeout:
raise TimeoutError('The operation timed out.')
if meta['status'] in PROPAGATE_STATES and propagate:
raise self.exception_to_python(meta['result'])
# consume() always returns READY_STATE.
return meta['result']
def get_task_meta(self, task_id, backlog_limit=1000):
# Polling and using basic_get
with as (_, channel):
......@@ -213,7 +208,10 @@ class AMQPBackend(BaseBackend):
# Total time spent may exceed a single call to wait()
if timeout and now() - time_start >= timeout:
raise socket.timeout()
except socket.timeout:
if on_interval:
if results: # got event on the wanted channel.
......@@ -258,14 +256,12 @@ class AMQPBackend(BaseBackend):
results = deque()
push_result = results.append
push_cache = self._cache.__setitem__
to_exception = self.exception_to_python
decode_result = self.meta_from_decoded
def on_message(message):
body = message.decode()
body = decode_result(message.decode())
state, uid = getfields(body)
if state in READY_STATES:
body['result'] = to_exception(body['result'])
push_result(body) \
if uid in task_ids else push_cache(uid, body)
......@@ -182,6 +182,14 @@ class BaseBackend(object):
_, _, payload = dumps(data, serializer=self.serializer)
return payload
def meta_from_decoded(self, meta):
if meta['status'] in self.EXCEPTION_STATES:
meta['result'] = self.exception_to_python(meta['result'])
return meta
def decode_result(self, payload):
return self.meta_from_decoded(self.decode(payload))
def decode(self, payload):
payload = PY3 and payload or str(payload)
return loads(payload,
......@@ -190,8 +198,7 @@ class BaseBackend(object):
def wait_for(self, task_id,
timeout=None, propagate=True, interval=0.5, no_ack=True,
timeout=None, interval=0.5, no_ack=True, on_interval=None):
"""Wait for task and return its result.
If the task raises an exception, this exception
......@@ -206,14 +213,9 @@ class BaseBackend(object):
time_elapsed = 0.0
while 1:
status = self.get_status(task_id)
if status == states.SUCCESS:
return self.get_result(task_id)
elif status in states.PROPAGATE_STATES:
result = self.get_result(task_id)
if propagate:
raise result
return result
meta = self.get_task_meta(task_id)
if meta['status'] in states.READY_STATES:
return meta
if on_interval:
# avoid hammering the CPU checking status.
......@@ -271,11 +273,7 @@ class BaseBackend(object):
def get_result(self, task_id):
"""Get the result of a task."""
meta = self.get_task_meta(task_id)
if meta['status'] in self.EXCEPTION_STATES:
return self.exception_to_python(meta['result'])
return meta['result']
return self.get_task_meta(task_id).get('result')
def get_children(self, task_id):
"""Get the list of subtasks sent by a task."""
......@@ -434,17 +432,22 @@ class KeyValueStoreBackend(BaseBackend):
return bytes_to_str(key[len(prefix):])
return bytes_to_str(key)
def _filter_ready(self, values, READY_STATES=states.READY_STATES):
for k, v in values:
if v is not None:
v = self.decode_result(v)
if v['status'] in READY_STATES:
yield k, v
def _mget_to_results(self, values, keys):
if hasattr(values, 'items'):
# client returns dict so mapping preserved.
return dict((self._strip_prefix(k), self.decode(v))
for k, v in items(values)
if v is not None)
return dict((self._strip_prefix(k), v)
for k, v in self._filter_ready(items(values)))
# client returns list so need to recreate mapping.
return dict((bytes_to_str(keys[i]), self.decode(value))
for i, value in enumerate(values)
if value is not None)
return dict((bytes_to_str(keys[i]), v)