Commit ecfab0e5 authored by Brian May's avatar Brian May

Import celery_3.1.19.orig.tar.gz

parent 1ca0fad3
......@@ -180,3 +180,18 @@ Bert Vanderbauwhede, 2014/12/18
John Anderson, 2014/12/27
Luke Burden, 2015/01/24
Mickaël Penhard, 2015/02/15
Mark Parncutt, 2015/02/16
Samuel Jaillet, 2015/03/24
Ilya Georgievsky, 2015/03/31
Fatih Sucu, 2015/04/17
James Pulec, 2015/04/19
Alexander Lebedev, 2015/04/25
Frantisek Holop, 2015/05/21
Feanil Patel, 2015/05/21
Jocelyn Delalande, 2015/06/03
Justin Patrin, 2015/08/06
Juan Rossi, 2015/08/10
Piotr Maślanka, 2015/08/24
Gerald Manipon, 2015/10/19
Krzysztof Bujniewicz, 2015/10/21
Sukrit Khera, 2015/10/26
......@@ -8,6 +8,87 @@ This document contains change notes for bugfix releases in the 3.1.x series
(Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
new in Celery 3.1.
.. _version-3.1.19:
3.1.19
======
:release-date: 2015-10-26 01:00 p.m. UTC
:release-by: Ask Solem
- **Requirements**
- Now depends on :ref:`Kombu 3.0.29 <kombu:version-3.0.29>`.
- Now depends on :mod:`billiard` 3.3.0.21.
- **Results**: Fixed MongoDB result backend URL parsing problem
(Issue celery/kombu#375).
- **Worker**: Task request now properly sets ``priority`` in delivery_info.
Fix contributed by Gerald Manipon.
- **Beat**: PyPy shelve may raise ``KeyError`` when setting keys
(Issue #2862).
- **Programs**: :program:`celery beat --detach` now working on PyPy.
Fix contributed by Krzysztof Bujniewicz.
- **Results**: Redis result backend now ensures all pipelines are cleaned up.
Contributed by Justin Patrin.
- **Results**: Redis result backend now allows for timeout to be set in the
query portion of the result backend URL.
E.g. ``CELERY_RESULT_BACKEND = 'redis://?timeout=10'``
Contributed by Justin Patrin.
- **Results**: ``result.get`` now properly handles failures where the
exception value is set to :const:`None` (Issue #2560).
- **Prefork pool**: Fixed attribute error ``proc.dead``.
- **Worker**: Fixed worker hanging when gossip/heartbeat disabled
(Issue #1847).
Fix contributed by Aaron Webber and Bryan Helmig.
- **Results**: MongoDB result backend now supports pymongo 3.x
(Issue #2744).
Fix contributed by Sukrit Khera.
- **Results**: RPC/amqp backends did not deserialize exceptions properly
(Issue #2691).
Fix contributed by Sukrit Khera.
- **Programs**: Fixed problem with :program:`celery amqp`'s
``basic_publish`` (Issue #2013).
- **Worker**: Embedded beat now properly sets app for thread/process
(Issue #2594).
- **Documentation**: Many improvements and typos fixed.
Contributions by:
Carlos Garcia-Dubus
D. Yu
jerry
Jocelyn Delalande
Josh Kupershmidt
Juan Rossi
kanemra
Paul Pearce
Pavel Savchenko
Sean Wang
Seungha Kim
Zhaorong Ma
.. _version-3.1.18:
3.1.18
......
Metadata-Version: 1.1
Name: celery
Version: 3.1.18
Version: 3.1.19
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
:Version: 3.1.18 (Cipater)
:Version: 3.1.19 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
......
......@@ -4,7 +4,7 @@
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
:Version: 3.1.18 (Cipater)
:Version: 3.1.19 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
......
Metadata-Version: 1.1
Name: celery
Version: 3.1.18
Version: 3.1.19
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
:Version: 3.1.18 (Cipater)
:Version: 3.1.19 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
......
pytz>dev
billiard>=3.3.0.20,<3.4
kombu>=3.0.25,<3.1
billiard>=3.3.0.21,<3.4
kombu>=3.0.29,<3.1
[zookeeper]
kazoo>=1.3.1
......
......@@ -7,6 +7,9 @@
from __future__ import absolute_import
import os
import sys
from collections import namedtuple
version_info_t = namedtuple(
......@@ -14,7 +17,7 @@ version_info_t = namedtuple(
)
SERIES = 'Cipater'
VERSION = version_info_t(3, 1, 18, '', '')
VERSION = version_info_t(3, 1, 19, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'
......@@ -30,8 +33,6 @@ VERSION_BANNER = '{0} ({1})'.format(__version__, SERIES)
# -eof meta-
import os
import sys
if os.environ.get('C_IMPDEBUG'): # pragma: no cover
from .five import builtins
real_import = builtins.__import__
......@@ -127,7 +128,7 @@ def maybe_patch_concurrency(argv=sys.argv,
concurrency.get_implementation(pool)
# Lazy loading
from celery import five
from celery import five # noqa
old_module, new_module = five.recreate_module( # pragma: no cover
__name__,
......
......@@ -221,7 +221,8 @@ class Celery(object):
def _create_task_cls(fun):
if shared:
cons = lambda app: app._task_from_fun(fun, **opts)
def cons(app):
return app._task_from_fun(fun, **opts)
cons.__name__ = fun.__name__
connect_on_app_finalize(cons)
if self.accept_magic_kwargs: # compat mode
......
......@@ -263,7 +263,7 @@ class Control(object):
return self.broadcast('enable_events', {}, destination, **kwargs)
def disable_events(self, destination=None, **kwargs):
"""Tell all (or specific) workers to enable events."""
"""Tell all (or specific) workers to disable events."""
return self.broadcast('disable_events', {}, destination, **kwargs)
def pool_grow(self, n=1, destination=None, **kwargs):
......
......@@ -483,11 +483,12 @@ class Task(object):
:keyword retry: If enabled sending of the task message will be retried
in the event of connection loss or failure. Default
is taken from the :setting:`CELERY_TASK_PUBLISH_RETRY`
setting. Note you need to handle the
setting. Note that you need to handle the
producer/connection manually for this to work.
:keyword retry_policy: Override the retry policy used. See the
:setting:`CELERY_TASK_PUBLISH_RETRY` setting.
:setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`
setting.
:keyword routing_key: Custom routing key used to route the task to a
worker server. If in combination with a
......@@ -599,7 +600,12 @@ class Task(object):
:keyword countdown: Time in seconds to delay the retry for.
:keyword eta: Explicit time and date to run the retry at
(must be a :class:`~datetime.datetime` instance).
:keyword max_retries: If set, overrides the default retry limit.
:keyword max_retries: If set, overrides the default retry limit for
this execution. Changes to this parameter do not propagate to
subsequent task retry attempts. A value of :const:`None` means
"use the default", so if you want infinite retries you would
have to set the :attr:`max_retries` attribute of the task to
:const:`None` first.
:keyword time_limit: If set, overrides the default time limit.
:keyword soft_time_limit: If set, overrides the default soft
time limit.
......@@ -624,14 +630,14 @@ class Task(object):
>>> from imaginary_twitter_lib import Twitter
>>> from proj.celery import app
>>> @app.task()
... def tweet(auth, message):
>>> @app.task(bind=True)
... def tweet(self, auth, message):
... twitter = Twitter(oauth=auth)
... try:
... twitter.post_status_update(message)
... except twitter.FailWhale as exc:
... # Retry in 5 minutes.
... raise tweet.retry(countdown=60 * 5, exc=exc)
... raise self.retry(countdown=60 * 5, exc=exc)
Although the task will never return above as `retry` raises an
exception to notify the worker, we use `raise` in front of the retry
......@@ -863,9 +869,8 @@ class Task(object):
:param status: Current task state.
:param retval: Task return value/exception.
:param task_id: Unique id of the task.
:param args: Original arguments for the task that failed.
:param kwargs: Original keyword arguments for the task
that failed.
:param args: Original arguments for the task.
:param kwargs: Original keyword arguments for the task.
:keyword einfo: :class:`~billiard.einfo.ExceptionInfo`
instance, containing the traceback (if any).
......
......@@ -313,7 +313,8 @@ if not is_jython: # pragma: no cover
_shutdown_handler, sig='SIGINT', callback=on_SIGINT
)
else: # pragma: no cover
install_worker_int_handler = lambda *a, **kw: None
def install_worker_int_handler(*a, **kw):
pass
def _reload_current_worker():
......
......@@ -10,8 +10,6 @@ from __future__ import absolute_import
import sys
from kombu.utils.url import _parse_url
from celery.local import Proxy
from celery._state import current_app
from celery.five import reraise
......@@ -56,8 +54,9 @@ def get_backend_by_url(backend=None, loader=None):
url = None
if backend and '://' in backend:
url = backend
if '+' in url[:url.index('://')]:
scheme, _, _ = url.partition('://')
if '+' in scheme:
backend, url = url.split('+', 1)
else:
backend, _, _, _, _, _, _ = _parse_url(url)
backend = scheme
return get_backend_cls(backend, loader), url
......@@ -180,7 +180,8 @@ class AMQPBackend(BaseBackend):
raise self.BacklogLimitExceeded(task_id)
if latest:
payload = self._cache[task_id] = latest.payload
payload = self._cache[task_id] = \
self.meta_from_decoded(latest.payload)
latest.requeue()
return payload
else:
......
......@@ -165,11 +165,12 @@ class BaseBackend(object):
def exception_to_python(self, exc):
"""Convert serialized exception to Python exception."""
if self.serializer in EXCEPTION_ABLE_CODECS:
return get_pickled_exception(exc)
elif not isinstance(exc, BaseException):
return create_exception_cls(
from_utf8(exc['exc_type']), __name__)(exc['exc_message'])
if exc:
if self.serializer in EXCEPTION_ABLE_CODECS:
return get_pickled_exception(exc)
elif not isinstance(exc, BaseException):
return create_exception_cls(
from_utf8(exc['exc_type']), __name__)(exc['exc_message'])
return exc
def prepare_value(self, result):
......
......@@ -37,8 +37,8 @@ def _sqlalchemy_installed():
return sqlalchemy
_sqlalchemy_installed()
from sqlalchemy.exc import DatabaseError, InvalidRequestError
from sqlalchemy.orm.exc import StaleDataError
from sqlalchemy.exc import DatabaseError, InvalidRequestError # noqa
from sqlalchemy.orm.exc import StaleDataError # noqa
@contextmanager
......
......@@ -10,6 +10,16 @@ from __future__ import absolute_import
from datetime import datetime
from kombu.syn import detect_environment
from kombu.utils import cached_property
from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.five import items, string_t
from celery.utils.timeutils import maybe_timedelta
from .base import BaseBackend
try:
import pymongo
except ImportError: # pragma: no cover
......@@ -23,16 +33,6 @@ if pymongo:
else: # pragma: no cover
Binary = None # noqa
from kombu.syn import detect_environment
from kombu.utils import cached_property
from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.five import string_t
from celery.utils.timeutils import maybe_timedelta
from .base import BaseBackend
__all__ = ['MongoBackend']
......@@ -92,17 +92,26 @@ class MongoBackend(BaseBackend):
self.options = dict(config, **config.pop('options', None) or {})
# Set option defaults
if pymongo.version_tuple >= (3, ):
self.options.setdefault('maxPoolSize', self.max_pool_size)
else:
self.options.setdefault('max_pool_size', self.max_pool_size)
self.options.setdefault('auto_start_request', False)
for key, value in items(self._prepare_client_options()):
self.options.setdefault(key, value)
self.url = url
if self.url:
# Specifying backend as an URL
self.host = self.url
def _prepare_client_options(self):
if pymongo.version_tuple >= (3, ):
return {'maxPoolSize': self.max_pool_size}
else: # pragma: no cover
options = {
'max_pool_size': self.max_pool_size,
'auto_start_request': False
}
if detect_environment() != 'default':
options['use_greenlets'] = True
return options
def _get_connection(self):
"""Connect to the MongoDB server."""
if self._connection is None:
......@@ -120,8 +129,6 @@ class MongoBackend(BaseBackend):
url = 'mongodb://{0}:{1}'.format(url, self.port)
if url == 'mongodb://':
url = url + 'localhost'
if detect_environment() != 'default':
self.options['use_greenlets'] = True
self._connection = MongoClient(host=url, **self.options)
return self._connection
......
......@@ -160,13 +160,13 @@ class RedisBackend(KeyValueStoreBackend):
return self.ensure(self._set, (key, value), **retry_policy)
def _set(self, key, value):
pipe = self.client.pipeline()
if self.expires:
pipe.setex(key, value, self.expires)
else:
pipe.set(key, value)
pipe.publish(key, value)
pipe.execute()
with self.client.pipeline() as pipe:
if self.expires:
pipe.setex(key, value, self.expires)
else:
pipe.set(key, value)
pipe.publish(key, value)
pipe.execute()
def delete(self, key):
self.client.delete(key)
......@@ -205,21 +205,23 @@ class RedisBackend(KeyValueStoreBackend):
client = self.client
jkey = self.get_key_for_group(gid, '.j')
result = self.encode_result(result, state)
_, readycount, _ = client.pipeline() \
.rpush(jkey, self.encode([1, tid, state, result])) \
.llen(jkey) \
.expire(jkey, 86400) \
.execute()
with client.pipeline() as pipe:
_, readycount, _ = pipe \
.rpush(jkey, self.encode([1, tid, state, result])) \
.llen(jkey) \
.expire(jkey, 86400) \
.execute()
try:
callback = maybe_signature(request.chord, app=app)
total = callback['chord_size']
if readycount == total:
decode, unpack = self.decode, self._unpack_chord_result
resl, _ = client.pipeline() \
.lrange(jkey, 0, total) \
.delete(jkey) \
.execute()
with client.pipeline() as pipe:
resl, _, = pipe \
.lrange(jkey, 0, total) \
.delete(jkey) \
.execute()
try:
callback.delay([unpack(tup, decode) for tup in resl])
except Exception as exc:
......@@ -240,6 +242,16 @@ class RedisBackend(KeyValueStoreBackend):
callback.id, exc=ChordError('Join error: {0!r}'.format(exc)),
)
def _create_client(self, socket_timeout=None, socket_connect_timeout=None,
**params):
return self.redis.Redis(
connection_pool=self.ConnectionPool(
socket_timeout=socket_timeout and float(socket_timeout),
socket_connect_timeout=socket_connect_timeout and float(
socket_connect_timeout),
**params),
)
@property
def ConnectionPool(self):
if self._ConnectionPool is None:
......@@ -248,9 +260,7 @@ class RedisBackend(KeyValueStoreBackend):
@cached_property
def client(self):
return self.redis.Redis(
connection_pool=self.ConnectionPool(**self.connparams),
)
return self._create_client(**self.connparams)
def __reduce__(self, args=(), kwargs={}):
return super(RedisBackend, self).__reduce__(
......
......@@ -174,9 +174,9 @@ class Scheduler(object):
Publisher=None, lazy=False, sync_every_tasks=None, **kwargs):
self.app = app
self.data = maybe_evaluate({} if schedule is None else schedule)
self.max_interval = (max_interval
or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
or self.max_interval)
self.max_interval = (max_interval or
app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or
self.max_interval)
self.sync_every_tasks = (
app.conf.CELERYBEAT_SYNC_EVERY if sync_every_tasks is None
else sync_every_tasks)
......@@ -362,22 +362,31 @@ class PersistentScheduler(Scheduler):
with platforms.ignore_errno(errno.ENOENT):
os.remove(self.schedule_filename + suffix)
def _open_schedule(self):
return self.persistence.open(self.schedule_filename, writeback=True)
def _destroy_open_corrupted_schedule(self, exc):
error('Removing corrupted schedule file %r: %r',
self.schedule_filename, exc, exc_info=True)
self._remove_db()
return self._open_schedule()
def setup_schedule(self):
try:
self._store = self.persistence.open(self.schedule_filename,
writeback=True)
self._store = self._open_schedule()
except Exception as exc:
error('Removing corrupted schedule file %r: %r',
self.schedule_filename, exc, exc_info=True)
self._remove_db()
self._store = self.persistence.open(self.schedule_filename,
writeback=True)
else:
self._store = self._destroy_open_corrupted_schedule(exc)
for _ in (1, 2):
try:
self._store['entries']
except KeyError:
# new schedule db
self._store['entries'] = {}
try:
self._store['entries'] = {}
except KeyError as exc:
self._store = self._destroy_open_corrupted_schedule(exc)
continue
else:
if '__version__' not in self._store:
warning('DB Reset: Account for new __version__ field')
......@@ -388,6 +397,7 @@ class PersistentScheduler(Scheduler):
elif 'utc_enabled' not in self._store:
warning('DB Reset: Account for new utc_enabled field')
self._store.clear() # remove schedule at 3.0.9 upgrade
break
tz = self.app.conf.CELERY_TIMEZONE
stored_tz = self._store.get('tz')
......@@ -435,8 +445,8 @@ class Service(object):
def __init__(self, app, max_interval=None, schedule_filename=None,
scheduler_cls=None):
self.app = app
self.max_interval = (max_interval
or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
self.max_interval = (max_interval or
app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
self.scheduler_cls = scheduler_cls or self.scheduler_cls
self.schedule_filename = (
schedule_filename or app.conf.CELERYBEAT_SCHEDULE_FILENAME)
......@@ -497,13 +507,15 @@ class Service(object):
class _Threaded(Thread):
"""Embedded task scheduler using threading."""
def __init__(self, *args, **kwargs):
def __init__(self, app, **kwargs):
super(_Threaded, self).__init__()
self.service = Service(*args, **kwargs)
self.app = app
self.service = Service(app, **kwargs)
self.daemon = True
self.name = 'Beat'
def run(self):
self.app.set_current()
self.service.start()
def stop(self):
......@@ -517,9 +529,10 @@ except NotImplementedError: # pragma: no cover
else:
class _Process(Process): # noqa
def __init__(self, *args, **kwargs):
def __init__(self, app, **kwargs):
super(_Process, self).__init__()
self.service = Service(*args, **kwargs)
self.app = app
self.service = Service(app, **kwargs)
self.name = 'Beat'
def run(self):
......@@ -527,6 +540,8 @@ else:
platforms.close_open_fds([
sys.__stdin__, sys.__stdout__, sys.__stderr__,
] + list(iter_open_logger_fds()))
self.app.set_default()
self.app.set_current()
self.service.start(embedded_process=True)
def stop(self):
......@@ -534,7 +549,7 @@ else:
self.terminate()
def EmbeddedService(*args, **kwargs):
def EmbeddedService(app, max_interval=None, **kwargs):
"""Return embedded clock service.
:keyword thread: Run threaded instead of as a separate process.
......@@ -544,6 +559,5 @@ def EmbeddedService(*args, **kwargs):
if kwargs.pop('thread', False) or _Process is None:
# Need short max interval to be able to stop thread
# in reasonable time.
kwargs.setdefault('max_interval', 1)
return _Threaded(*args, **kwargs)
return _Process(*args, **kwargs)
return _Threaded(app, max_interval=1, **kwargs)
return _Process(app, max_interval=max_interval, **kwargs)
......@@ -182,6 +182,16 @@ class AMQShell(cmd.Cmd):
'basic.ack': Spec(('delivery_tag', int)),
}
def _prepare_spec(self, conn):
# XXX Hack to fix Issue #2013
from amqp import Connection, Message
if isinstance(conn.connection, Connection):
self.amqp['basic.publish'] = Spec(('msg', Message),
('exchange', str),
('routing_key', str),
('mandatory', bool, 'no'),
('immediate', bool, 'no'))
def __init__(self, *args, **kwargs):
self.connect = kwargs.pop('connect')
self.silent = kwargs.pop('silent', False)
......@@ -296,6 +306,7 @@ class AMQShell(cmd.Cmd):
def _reconnect(self):
"""Re-establish connection to the AMQP server."""
self.conn = self.connect(self.conn)
self._prepare_spec(self.conn)
self.chan = self.conn.default_channel
self.needs_reconnect = False
......
......@@ -87,9 +87,9 @@ class beat(Command):
default=c.CELERYBEAT_SCHEDULE_FILENAME),
Option('--max-interval', type='float'),
Option('-S', '--scheduler', dest='scheduler_cls'),
Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL))
+ daemon_options(default_pidfile='celerybeat.pid')
+ tuple(self.app.user_options['beat'])
Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL)) +
daemon_options(default_pidfile='celerybeat.pid') +
tuple(self.app.user_options['beat'])
)
......
......@@ -115,7 +115,8 @@ class list_(Command):
except NotImplementedError:
raise self.Error('Your transport cannot list bindings.')
fmt = lambda q, e, r: self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
def fmt(q, e, r):
return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
fmt('Queue', 'Exchange', 'Routing Key')
fmt('-' * 16, '-' * 16, '-' * 16)
for b in bindings:
......
......@@ -125,9 +125,9 @@ class events(Command):
Option('-F', '--frequency', '--freq',
type='float', default=1.0),
Option('-r', '--maxrate'),
Option('-l', '--loglevel', default='INFO'))
+ daemon_options(default_pidfile='celeryev.pid')
+ tuple(self.app.user_options['events'])
Option('-l', '--loglevel', default='INFO')) +
daemon_options(default_pidfile='celeryev.pid') +
tuple(self.app.user_options['events'])
)
......
......@@ -576,7 +576,7 @@ class AsynPool(_pool.Pool):
def on_process_down(proc):
"""Called when a worker process exits."""
if proc.dead:
if getattr(proc, 'dead', None):
return
process_flush_queues(proc)
_remove_from_index(
......
......@@ -29,10 +29,10 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
warnings.warn(RuntimeWarning(W_RACE % side))
from celery import signals
from celery.utils import timer2
from celery import signals # noqa
from celery.utils import timer2 # noqa
from . import base
from . import base # noqa