Commit a66f315a authored by Michael Fladischer

Import celery_3.1.20.orig.tar.gz

parent ecfab0e5
......@@ -195,3 +195,5 @@ Piotr Maślanka, 2015/08/24
Gerald Manipon, 2015/10/19
Krzysztof Bujniewicz, 2015/10/21
Sukrit Khera, 2015/10/26
Dave Smith, 2015/10/27
Dennis Brakhane, 2015/10/30
......@@ -8,6 +8,112 @@ This document contains change notes for bugfix releases in the 3.1.x series
(Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
new in Celery 3.1.
.. _version-3.1.20:
- **Requirements**
- Now depends on :ref:`Kombu 3.0.33 <kombu:version-3.0.33>`.
- Now depends on :mod:`billiard`.
Includes binary wheels for Microsoft Windows x86 and x86_64!
- **Task**: Error emails now use the ``utf-8`` charset by default (Issue #2737).
- **Task**: Retry now forwards original message headers (Issue #3017).
- **Worker**: Bootsteps can now hook into ``on_node_join``/``leave``/``lost``.
See :ref:`extending-consumer-gossip` for an example.
- **Events**: Fixed handling of DST timezones (Issue #2983).
- **Results**: Redis backend stopped respecting certain settings.
Contributed by Jeremy Llewellyn.
- **Results**: Database backend now properly supports JSON exceptions
(Issue #2441).
- **Results**: Redis ``new_join`` did not properly call task errbacks on chord
error (Issue #2796).
- **Results**: Restores Redis compatibility with redis-py < 2.10.0
(Issue #2903).
- **Results**: Fixed rare issue with chord error handling (Issue #2409).
- **Tasks**: Using queue-name values in :setting:`CELERY_ROUTES` now works
again (Issue #2987).
- **General**: Result backend password now sanitized in report output
(Issue #2812, Issue #2004).
- **Configuration**: Now gives helpful error message when the result backend
configuration points to a module, and not a class (Issue #2945).
- **Results**: Exceptions sent by JSON-serialized workers are now properly
handled by pickle-configured workers.
- **Programs**: ``celery control autoscale`` now works (Issue #2950).
- **Programs**: ``celery beat --detached`` now runs after fork callbacks.
- **General**: Fix for LRU cache implementation on Python 3.5 (Issue #2897).
Contributed by Dennis Brakhane.
Python 3.5's ``OrderedDict`` does not allow mutation while it is being
iterated over. This breaks ``update`` if it is called with a dict
larger than the maximum size.
This commit changes the code to a version that does not iterate over
the dict, and should also be a little bit faster; see the sketch after
this change list.
- **Init scripts**: The beat init script now properly reports service as down
when no pid file can be found.
Fix contributed by Eric Zarowny.
- **Beat**: Added cleaning of corrupted scheduler files for some storage
backend errors (Issue #2985).
Fix contributed by Aleksandr Kuznetsov.
- **Beat**: Now syncs the schedule even if the schedule is empty.
Fix contributed by Colin McIntosh.
- **Supervisord**: Set higher process priority in supervisord example.
Contributed by George Tantiras.
- **Documentation**: Includes improvements by:
Caleb Mingle
Christopher Martin
Dieter Adriaenssens
Jason Veatch
Jeremy Cline
Juan Rossi
Kevin Harvey
Kevin McCarthy
Kirill Pavlov
Marco Buttu
Mher Movsisyan
Michael Floering
Nathaniel Varona
Rudy Attias
Ryan Luckie
Steven Parker
Tadej Janež
Tom S
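The following is a minimal sketch, not the actual Celery/kombu implementation, of the kind of LRU-cache change described above: the cache is trimmed by popping the oldest entries instead of deleting keys while iterating, which Python 3.5's ``OrderedDict`` forbids. The ``SimpleLRUCache`` name and ``limit`` attribute are illustrative only.

.. code-block:: python

    from collections import OrderedDict

    class SimpleLRUCache(object):
        """Illustrative LRU-style mapping; not the Celery implementation."""

        def __init__(self, limit=None):
            self.limit = limit
            self.data = OrderedDict()

        def update(self, *args, **kwargs):
            self.data.update(*args, **kwargs)
            if self.limit is not None:
                # Trim by popping the oldest entries instead of iterating
                # over the dict while mutating it.
                while len(self.data) > self.limit:
                    self.data.popitem(last=False)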
.. _version-3.1.19:
Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual contributors. All Rights Reserved.
Copyright (c) 2015 Ask Solem & contributors. All rights reserved.
Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved.
Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual contributors. All rights reserved.
Celery is licensed under The BSD License (3 Clause, also known as
the new BSD license). The license is an OSI approved Open Source
......@@ -39,9 +40,9 @@ Documentation License
The documentation portion of Celery (the rendered contents of the
"docs" directory of a software distribution or checkout) is supplied
under the Creative Commons Attribution-Noncommercial-Share Alike 3.0
United States License as described by
under the "Creative Commons Attribution-ShareAlike 4.0
International" (CC BY-SA 4.0) License as described by
Metadata-Version: 1.1
Name: celery
Version: 3.1.19
Version: 3.1.20
Summary: Distributed Task Queue
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image::
:Version: 3.1.19 (Cipater)
:Version: 3.1.20 (Cipater)
......@@ -4,7 +4,7 @@
.. image::
:Version: 3.1.19 (Cipater)
:Version: 3.1.20 (Cipater)
Metadata-Version: 1.1
Name: celery
Version: 3.1.19
Version: 3.1.20
Summary: Distributed Task Queue
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image::
:Version: 3.1.19 (Cipater)
:Version: 3.1.20 (Cipater)
# -*- coding: utf-8 -*-
"""Distributed Task Queue"""
# :copyright: (c) 2015 Ask Solem and individual contributors.
# All rights reserved.
# :copyright: (c) 2012-2014 GoPivotal, Inc., All rights reserved.
# :copyright: (c) 2009 - 2012 Ask Solem and individual contributors,
# All rights reserved.
# :copyright: (c) 2012-2014 GoPivotal, Inc., All rights reserved.
# :license: BSD (3 Clause), see LICENSE for more details.
from __future__ import absolute_import
......@@ -17,7 +19,7 @@ version_info_t = namedtuple(
SERIES = 'Cipater'
VERSION = version_info_t(3, 1, 19, '', '')
VERSION = version_info_t(3, 1, 20, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = ''
......@@ -26,7 +26,7 @@ def add_backend_cleanup_task(app):
If the configured backend requires periodic cleanup this task is also
automatically configured to run every day at midnight (requires
automatically configured to run every day at 4am (requires
:program:`celery beat` to be running).
......@@ -282,6 +282,15 @@ class Control(object):
return self.broadcast('pool_shrink', {'n': n}, destination, **kwargs)
def autoscale(self, max, min, destination=None, **kwargs):
"""Change worker(s) autoscale setting.
Supports the same arguments as :meth:`broadcast`.
"""
return self.broadcast(
'autoscale', {'max': max, 'min': min}, destination, **kwargs)
def broadcast(self, command, arguments=None, destination=None,
connection=None, reply=False, timeout=1, limit=None,
callback=None, channel=None, **extra_kwargs):
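A hedged usage sketch for the ``autoscale`` control method added above; the app name, broker URL and worker hostname are assumptions made for illustration.

.. code-block:: python

    from celery import Celery

    app = Celery('proj', broker='amqp://')  # broker URL assumed

    # Ask every worker to autoscale between 3 and 10 pool processes;
    # roughly what `celery control autoscale 10 3` does on the command line.
    replies = app.control.autoscale(10, 3, reply=True)

    # Or target a single node (hostname assumed):
    app.control.autoscale(10, 3, destination=['worker1@example.com'])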
......@@ -31,6 +31,8 @@ class MapRoute(object):
return dict([task])
except KeyError:
except ValueError:
return {'queue':[task]}
class Router(object):
......@@ -527,13 +527,18 @@ class Task(object):
if an error occurs while executing the task.
:keyword producer: :class:`~@amqp.TaskProducer` instance to use.
:keyword add_to_parent: If set to True (default) and the task
is applied while executing another task, then the result
will be appended to the parent task's ``request.children``
attribute. Trailing can also be disabled by default using the
:attr:`trail` attribute.
:keyword publisher: Deprecated alias to ``producer``.
:keyword headers: Message headers to be sent in the
task (a :class:`dict`)
:rtype :class:`celery.result.AsyncResult`: if
:setting:`CELERY_ALWAYS_EAGER` is not set, otherwise
......@@ -575,6 +580,7 @@ class Task(object):
'soft_time_limit': limit_soft,
'time_limit': limit_hard,
'reply_to': request.reply_to,
'headers': request.headers,
{'queue': queue} if queue else (request.delivery_info or {})
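A hedged sketch of how the forwarded headers can be used; the task, header key, and retry condition are made up for illustration, and the example assumes a running broker.

.. code-block:: python

    from celery import Celery

    app = Celery('proj', broker='amqp://')  # broker URL assumed

    @app.task(bind=True, max_retries=3, default_retry_delay=5)
    def process(self, item):
        try:
            if item is None:
                raise ValueError('nothing to do')
            return item.upper()
        except ValueError as exc:
            # With the change above, the original message headers (for
            # example the 'x-request-id' sent below) are forwarded when
            # the task is retried (Issue #3017).
            raise self.retry(exc=exc)

    # Custom headers travel with the task message, and now also on retry.
    process.apply_async(('item-1',), headers={'x-request-id': 'abc123'})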
......@@ -15,6 +15,8 @@ import re
from collections import Mapping
from types import ModuleType
from kombu.utils.url import maybe_sanitize_url
from celery.datastructures import ConfigurationView
from celery.five import items, string_t, values
from celery.platforms import pyimplementation
......@@ -184,9 +186,12 @@ def filter_hidden_settings(conf):
if isinstance(key, string_t):
return mask
if 'BROKER_URL' in key.upper():
elif 'BROKER_URL' in key.upper():
from kombu import Connection
return Connection(value).as_uri(mask=mask)
elif key.upper() in ('CELERY_RESULT_BACKEND', 'CELERY_BACKEND'):
return maybe_sanitize_url(value, mask=mask)
return value
return dict((k, maybe_censor(k, v)) for k, v in items(conf))
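A hedged sketch of what the masking above produces for broker and result backend URLs; the URLs and the ``'**'`` mask string are made up for illustration.

.. code-block:: python

    from kombu import Connection
    from kombu.utils.url import maybe_sanitize_url

    # Passwords in broker URLs are replaced by the mask string:
    print(Connection('redis://user:secret@localhost:6379/0').as_uri(mask='**'))
    # -> something like: redis://user:**@localhost:6379/0

    # Result backend URLs get the same treatment via maybe_sanitize_url:
    print(maybe_sanitize_url('redis://user:secret@localhost:6379/1', mask='**'))
    # -> something like: redis://user:**@localhost:6379/1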
......@@ -216,7 +221,8 @@ def bugreport(app):
results=app.conf.CELERY_RESULT_BACKEND or 'disabled',
app.conf.CELERY_RESULT_BACKEND or 'disabled'),
......@@ -22,6 +22,7 @@ from functools import partial
from billiard import current_process
from kombu.utils.encoding import safe_str
from kombu.utils.url import maybe_sanitize_url
from celery import VERSION_BANNER, platforms, signals
from import trace
......@@ -227,7 +228,9 @@ class Worker(WorkController):
version=VERSION_BANNER,, or 'disabled',
results=maybe_sanitize_url( or 'disabled',
......@@ -9,7 +9,9 @@
from __future__ import absolute_import
import sys
import types
from celery.exceptions import ImproperlyConfigured
from celery.local import Proxy
from celery._state import current_app
from celery.five import reraise
......@@ -44,10 +46,14 @@ def get_backend_cls(backend=None, loader=None):
loader = loader or current_app.loader
aliases = dict(BACKEND_ALIASES, **loader.override_backends)
return symbol_by_name(backend, aliases)
cls = symbol_by_name(backend, aliases)
except ValueError as exc:
reraise(ValueError, ValueError(UNKNOWN_BACKEND.format(
backend, exc)), sys.exc_info()[2])
reraise(ImproperlyConfigured, ImproperlyConfigured(
UNKNOWN_BACKEND.format(backend, exc)), sys.exc_info()[2])
if isinstance(cls, types.ModuleType):
raise ImproperlyConfigured(UNKNOWN_BACKEND.format(
backend, 'is a Python module, not a backend class.'))
return cls
def get_backend_by_url(backend=None, loader=None):
......@@ -116,7 +116,7 @@ class BaseBackend(object):
status=states.SUCCESS, request=request)
def mark_as_failure(self, task_id, exc, traceback=None, request=None):
"""Mark task as executed with failure. Stores the execption."""
"""Mark task as executed with failure. Stores the exception."""
return self.store_result(task_id, exc, status=states.FAILURE,
traceback=traceback, request=request)
......@@ -166,11 +166,11 @@ class BaseBackend(object):
def exception_to_python(self, exc):
"""Convert serialized exception to Python exception."""
if exc:
if self.serializer in EXCEPTION_ABLE_CODECS:
return get_pickled_exception(exc)
elif not isinstance(exc, BaseException):
return create_exception_cls(
if not isinstance(exc, BaseException):
exc = create_exception_cls(
from_utf8(exc['exc_type']), __name__)(exc['exc_message'])
if self.serializer in EXCEPTION_ABLE_CODECS:
exc = get_pickled_exception(exc)
return exc
def prepare_value(self, result):
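A hedged sketch of the behaviour the reordering above restores: a JSON-configured worker stores the exception as a plain dict, and ``exception_to_python`` first rebuilds a real exception object from it before any pickle handling is attempted. The sample dict contents are made up.

.. code-block:: python

    from kombu.utils.encoding import from_utf8

    from celery.utils.serialization import create_exception_cls

    # What a JSON serializer stores for a failed task (values made up):
    serialized = {'exc_type': 'KeyError', 'exc_message': 'missing key'}

    exc = create_exception_cls(
        from_utf8(serialized['exc_type']), __name__)(serialized['exc_message'])
    print(type(exc).__name__, exc)  # a KeyError-like exception instance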
......@@ -241,6 +241,8 @@ class BaseBackend(object):
return self.persistent if p is None else p
def encode_result(self, result, status):
if isinstance(result, ExceptionInfo):
result = result.exception
if status in self.EXCEPTION_STATES and isinstance(result, Exception):
return self.prepare_exception(result)
......@@ -140,7 +140,7 @@ class DatabaseBackend(BaseBackend):
task = Task(task_id)
task.status = states.PENDING
task.result = None
return task.to_dict()
return self.meta_from_decoded(task.to_dict())
def _save_group(self, group_id, result):
......@@ -36,12 +36,6 @@ else: # pragma: no cover
__all__ = ['MongoBackend']
class Bunch(object):
def __init__(self, **kw):
class MongoBackend(BaseBackend):
host = 'localhost'
port = 27017
......@@ -63,6 +63,7 @@ class RedisBackend(KeyValueStoreBackend):
conf =
if self.redis is None:
raise ImproperlyConfigured(REDIS_MISSING)
self._client_capabilities = self._detect_client_capabilities()
# For compatibility with the old REDIS_* configuration keys.
def _get(key):
......@@ -227,31 +228,41 @@ class RedisBackend(KeyValueStoreBackend):
except Exception as exc:
error('Chord callback for %r raised: %r',, exc, exc_info=1)
exc=ChordError('Callback error: {0!r}'.format(exc)),
return self.chord_error_from_stack(
ChordError('Callback error: {0!r}'.format(exc)),
except ChordError as exc:
error('Chord %r raised: %r',, exc, exc_info=1)
app._tasks[callback.task].backend.fail_from_current_stack(, exc=exc,
return self.chord_error_from_stack(callback, exc)
except Exception as exc:
error('Chord %r raised: %r',, exc, exc_info=1)
app._tasks[callback.task].backend.fail_from_current_stack(, exc=ChordError('Join error: {0!r}'.format(exc)),
return self.chord_error_from_stack(
callback, ChordError('Join error: {0!r}'.format(exc)),
def _detect_client_capabilities(self, socket_connect_timeout=False):
if self.redis.VERSION < (2, 4, 4):
raise ImproperlyConfigured(
'Redis backend requires redis-py versions 2.4.4 or later. '
'You have {0.__version__}'.format(redis))
if self.redis.VERSION >= (2, 10):
socket_connect_timeout = True
return {'socket_connect_timeout': socket_connect_timeout}
def _create_client(self, socket_timeout=None, socket_connect_timeout=None,
return self.redis.Redis(
return self._new_redis_client(
socket_timeout=socket_timeout and float(socket_timeout),
socket_connect_timeout=socket_connect_timeout and float(
socket_connect_timeout), **params
def _new_redis_client(self, **params):
if not self._client_capabilities['socket_connect_timeout']:
params.pop('socket_connect_timeout', None)
return self.redis.Redis(connection_pool=self.ConnectionPool(**params))
def ConnectionPool(self):
if self._ConnectionPool is None:
......@@ -374,6 +374,12 @@ class PersistentScheduler(Scheduler):
def setup_schedule(self):
self._store = self._open_schedule()
# In some cases there may be different errors from a storage
# backend for corrupted files. Example - DBPageNotFoundError
# exception from bsddb. In such case the file will be
# successfully opened but the error will be raised on first key
# retrieving.
except Exception as exc:
self._store = self._destroy_open_corrupted_schedule(exc)
......@@ -476,6 +482,8 @@ class Service(object):
debug('beat: Waking up %s.',
humanize_seconds(interval, prefix='in '))
if self.scheduler.should_sync():
except (KeyboardInterrupt, SystemExit):
......@@ -41,7 +41,8 @@ def detach(path, argv, logfile=None, pidfile=None, uid=None,
gid=None, umask=None, working_directory=None, fake=False, app=None,
fake = 1 if C_FAKEFORK else fake
with detached(logfile, pidfile, uid, gid, umask, working_directory, fake):
with detached(logfile, pidfile, uid, gid, umask, working_directory, fake,
if executable is not None:
path = executable
......@@ -404,6 +404,8 @@ class AsynPool(_pool.Pool):
# as processes are recycled, or found lost elsewhere.
self._fileno_to_outq[proc.outqR_fd] = proc
self._fileno_to_synq[proc.synqW_fd] = proc
self.on_soft_timeout = self.on_hard_timeout = None
if self._timeout_handler:
self.on_soft_timeout = self._timeout_handler.on_soft_timeout
self.on_hard_timeout = self._timeout_handler.on_hard_timeout
......@@ -17,7 +17,7 @@ Experimental task class that buffers messages and processes them as a list.
**Simple Example**
A click counter that flushes the buffer every 100 messages, and every
seconds. Does not do anything with the data, but can easily be modified
10 seconds. Does not do anything with the data, but can easily be modified
to store it in a database.
.. code-block:: python
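    # A hedged sketch of the click counter described above; the task name,
    # broker URL and flush settings are illustrative, not taken verbatim
    # from this distribution.
    from collections import Counter

    from celery import Celery
    from celery.contrib.batches import Batches

    app = Celery('clicks', broker='amqp://')  # broker URL assumed

    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def count_click(requests):
        # Each item in ``requests`` is a buffered request; count the
        # ``url`` keyword argument each one was called with.
        count = Counter(request.kwargs['url'] for request in requests)
        for url, n in count.items():
            print('>>> Clicked: {0} -> {1}'.format(url, n))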
......@@ -132,10 +132,20 @@ class Rdb(Pdb):
def say(self, m):
print(m, file=self.out)
def __enter__(self):
return self
def __exit__(self, *exc_info):
def _close_session(self):
self.stdin, self.stdout = sys.stdin, sys.stdout = self._prev_handles
if self._handle is not None:
if self._client is not None:
if self._sock is not None:
self._sock.close() = False
......@@ -224,10 +224,11 @@ class BaseLoader(object):
def mail_admins(self, subject, body, fail_silently=False,
sender=None, to=None, host=None, port=None,
user=None, password=None, timeout=None,
use_ssl=False, use_tls=False):
use_ssl=False, use_tls=False, charset='utf-8'):
message = self.mail.Message(sender=sender, to=to,
mailer = self.mail.Mailer(host=host, port=port,
user=user, password=password,
timeout=timeout, use_ssl=use_ssl,
......@@ -36,6 +36,7 @@ _setproctitle = try_import('setproctitle')
resource = try_import('resource')
pwd = try_import('pwd')
grp = try_import('grp')
mputil = try_import('multiprocessing.util')
'IS_OSX', 'IS_WINDOWS', 'pyimplementation', 'LockFailed',
......@@ -331,7 +332,8 @@ class DaemonContext(object):
_is_open = False
def __init__(self, pidfile=None, workdir=None, umask=None,
fake=False, after_chdir=None, **kwargs):
fake=False, after_chdir=None, after_forkers=True,
if isinstance(umask, string_t):
# octal or decimal, depending on initial zero.
umask = int(umask, 8 if umask.startswith('0') else 10)
......@@ -339,6 +341,7 @@ class DaemonContext(object):
self.umask = umask
self.fake = fake
self.after_chdir = after_chdir
self.after_forkers = after_forkers
self.stdfds = (sys.stdin, sys.stdout, sys.stderr)
def redirect_to_null(self, fd):
......@@ -365,6 +368,8 @@ class DaemonContext(object):
for fd in self.stdfds:
if self.after_forkers and mputil is not None:
self._is_open = True
__enter__ = open
from __future__ import absolute_import
from celery import backends
from celery.exceptions import ImproperlyConfigured
from celery.backends.amqp import AMQPBackend
from celery.backends.cache import CacheBackend
from import AppCase, depends_on_current_app, patch
......@@ -36,5 +37,5 @@ class test_backends(AppCase):
def test_sym_raises_ValuError(self):
with patch('celery.backends.symbol_by_name') as sbn:
sbn.side_effect = ValueError()
with self.assertRaises(ValueError):
with self.assertRaises(ImproperlyConfigured):
......@@ -7,7 +7,7 @@ from pickle import loads, dumps
from celery import states
from celery.backends import mongodb as module
from celery.backends.mongodb import MongoBackend, Bunch, pymongo
from celery.backends.mongodb import MongoBackend, pymongo
from celery.exceptions import ImproperlyConfigured
from import (
AppCase, MagicMock, Mock, SkipTest, ANY,
......@@ -44,11 +44,6 @@ class test_MongoBackend(AppCase):
module.Binary = self._reset['Binary']
datetime.datetime = self._reset['datetime']
def test_Bunch(self):
x = Bunch(foo='foo', bar=2)
self.assertEqual(, 'foo')
self.assertEqual(, 2)
def test_init_no_mongodb(self):
prev, module.pymongo = module.pymongo, None
......@@ -98,6 +98,7 @@ class Redis(MockCallbacks):
class redis(object):
VERSION = (2, 4, 10)
Redis = Redis
class ConnectionPool(object):
......@@ -24,8 +24,10 @@ if not IS_WINDOWS:
detach('/bin/boo', ['a', 'b', 'c'], logfile='/var/log',
detached.assert_called_with('/var/log', '/var/pid', None, None,
None, None, False)
'/var/log', '/var/pid', None, None, None, None, False,
execv.assert_called_with('/bin/boo', ['/bin/boo', 'a', 'b', 'c'])
execv.side_effect = Exception('foo')
......@@ -444,11 +444,11 @@ class AppCase(Case):
assert sys.__stdout__
assert sys.__stderr__
this = self._get_test_name()
if isinstance(sys.stdout, LoggingProxy) or \
isinstance(sys.__stdout__, LoggingProxy):
if isinstance(sys.stdout, (LoggingProxy, Mock)) or \
isinstance(sys.__stdout__, (LoggingProxy, Mock)):
raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stdout'))
if isinstance(sys.stderr, LoggingProxy) or \
isinstance(sys.__stderr__, LoggingProxy):
if isinstance(sys.stderr, (LoggingProxy, Mock)) or \
isinstance(sys.__stderr__, (LoggingProxy, Mock)):
raise RuntimeError(CASE_LOG_REDIRECT_EFFECT.format(this, 'stderr'))
backend ='backend')
if backend is not None:
......@@ -8,14 +8,14 @@ from celery.contrib.rdb import (
from import Case, Mock, WhateverIO, patch, skip_if_pypy
from import AppCase, Mock, WhateverIO, patch, skip_if_pypy
class SockErr(socket.error):
errno = None
class test_Rdb(Case):
class test_Rdb(AppCase):
def test_debugger(self, Rdb):
......@@ -37,7 +37,7 @@ class test_Rdb(Case):
get_avail_port.return_value = (sock, 8000)
sock.accept.return_value = (Mock(), ['helu'])
out = WhateverIO()
rdb = Rdb(out=out)
with Rdb(out=out) as rdb:
self.assertIn('helu', out.getvalue())
......@@ -74,19 +74,23 @@ class test_Rdb(Case):
def test_get_avail_port(self, sock):
out = WhateverIO()
sock.return_value.accept.return_value = (Mock(), ['helu'])
with Rdb(out=out):
with patch('celery.contrib.rdb.current_process') as curproc: = 'PoolWorker-10'
with Rdb(out=out):
err = sock.return_value.bind.side_effect = SockErr()
err.errno = errno.ENOENT
with self.assertRaises(SockErr):