Commit 06ac5758 authored by SVN-Git Migration's avatar SVN-Git Migration

Imported Upstream version 3.1.11

parent 153b8df5
......@@ -157,3 +157,6 @@ Pepijn de Vos, 2014/01/15
Dan McGee, 2014/01/27
Paul Kilgo, 2014/01/28
Martin Davidsson, 2014/02/08
Chris Clark, 2014/02/20
Matthew Duggan, 2014/04/10
Brian Bouterse, 2014/04/10
......@@ -8,6 +8,225 @@ This document contains change notes for bugfix releases in the 3.1.x series
(Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
new in Celery 3.1.
.. _version-3.1.11:
3.1.11
======
:release-date: 2014-04-16 11:00 P.M. UTC
:release-by: Ask Solem
- Now compatible with RabbitMQ 3.3.0
You need to run Celery 3.1.11 or later when using RabbitMQ 3.3,
and if you use the ``librabbitmq`` module you also have to upgrade
to librabbitmq 1.5.0:
.. code-block:: bash
$ pip install -U librabbitmq
- **Requirements**:
- Now depends on :ref:`Kombu 3.0.15 <kombu:version-3.0.15>`.
- Now depends on `billiard 3.3.0.17`_.
- Bundle ``celery[librabbitmq]`` now depends on :mod:`librabbitmq` 1.5.0.
.. _`billiard 3.3.0.17`:
https://github.com/celery/billiard/blob/master/CHANGES.txt
- **Tasks**: The :setting:`CELERY_DEFAULT_DELIVERY_MODE` setting was being
ignored (Issue #1953).
- **Worker**: New :option:`--heartbeat-interval` can be used to change the
time (in seconds) between sending event heartbeats.
Contributed by Matthew Duggan and Craig Northway.
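For illustration, a hypothetical invocation (``-A proj`` is a placeholder application name, and the interval value is arbitrary):

```shell
# Sketch: send event heartbeats every 5 seconds instead of the default.
celery worker -A proj --heartbeat-interval=5
```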
- **App**: Fixed memory leaks occurring when creating lots of temporary
app instances (Issue #1949).
- **MongoDB**: SSL configuration with non-MongoDB transport breaks MongoDB
results backend (Issue #1973).
Fix contributed by Brian Bouterse.
- **Logging**: The color formatter accidentally modified ``record.msg``
(Issue #1939).
- **Results**: Fixed problem with task trails being stored multiple times,
causing ``result.collect()`` to hang (Issue #1936, Issue #1943).
- **Results**: ``ResultSet`` now implements a ``.backend`` attribute for
compatibility with ``AsyncResult``.
- **Results**: ``.forget()`` now also clears the local cache.
- **Results**: Fixed problem with multiple calls to ``result._set_cache``
(Issue #1940).
- **Results**: ``join_native`` populated result cache even if disabled.
- **Results**: The YAML result serializer should now be able to handle storing
exceptions.
- **Worker**: No longer sends task error emails for expected errors (in
``@task(throws=(..., ))``).
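The behaviour can be sketched with a small, self-contained stand-in (this is not Celery's real worker code; ``run_task`` and ``send_error_email`` are illustrative names):

```python
def run_task(fun, args, throws=(), send_error_email=lambda exc: None):
    """Run *fun* and report unexpected errors only.

    Exceptions listed in *throws* are expected failures: the task still
    fails, but no error e-mail is triggered for them.
    """
    try:
        return 'SUCCESS', fun(*args)
    except throws as exc:
        return 'FAILURE', exc          # expected error: stay silent
    except Exception as exc:
        send_error_email(exc)          # unexpected error: alert someone
        return 'FAILURE', exc

emails = []
state, res = run_task(lambda d, k: d[k], ({}, 'x'),
                      throws=(KeyError,), send_error_email=emails.append)
# The task fails, but nothing is mailed, because KeyError was declared expected.
```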
- **Canvas**: Fixed problem with exception deserialization when using
the JSON serializer (Issue #1987).
- **Eventlet**: Fixes crash when ``celery.contrib.batches`` attempted to
cancel a non-existing timer (Issue #1984).
- Can now import ``celery.version_info_t``, and ``celery.five`` (Issue #1968).
.. _version-3.1.10:
3.1.10
======
:release-date: 2014-03-22 09:40 P.M. UTC
:release-by: Ask Solem
- **Requirements**:
- Now depends on :ref:`Kombu 3.0.14 <kombu:version-3.0.14>`.
- **Results**:
Reliability improvements to the SQLAlchemy database backend. Previously the
connection from the MainProcess was improperly shared with the workers.
(Issue #1786)
- **Redis:** Important note about events (Issue #1882).
There is a new transport option for Redis that enables monitors
to filter out unwanted events. Enabling this option in the workers
will increase performance considerably:
.. code-block:: python
BROKER_TRANSPORT_OPTIONS = {'fanout_patterns': True}
Enabling this option means that your workers will not be able to see
workers with the option disabled (or workers running an older version of
Celery), so if you do enable it, make sure to do so on all
nodes.
See :ref:`redis-caveats-fanout-patterns`.
This will be the default in Celery 3.2.
- **Results**: The :class:`@AsyncResult` object now keeps a local cache
of the final state of the task.
This means that the global result cache can finally be disabled,
and you can do so by setting :setting:`CELERY_MAX_CACHED_RESULTS` to
:const:`-1`. The lifetime of the cache will then be bound to the
lifetime of the result object, which will be the default behavior
in Celery 3.2.
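A hypothetical configuration fragment showing how the global cache can be disabled (the module name is a placeholder):

```python
# celeryconfig.py (sketch): disable the global result cache so cached
# task state lives only as long as each AsyncResult object does.
CELERY_MAX_CACHED_RESULTS = -1
```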
- **Events**: The "Substantial drift" warning message is now logged once
per node name only (Issue #1802).
- **Worker**: Ability to use one log file per child process when using the
prefork pool.
This can be enabled by using the new ``%i`` and ``%I`` format specifiers
for the log file name. See :ref:`worker-files-process-index`.
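A hypothetical worker invocation using the new specifiers (the log path is a placeholder):

```shell
# Sketch: %n expands to the node name and %I to the pool process index,
# so each prefork child process writes to its own log file.
celery worker -l info -c 4 --logfile=/var/log/celery/%n%I.log
```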
- **Redis**: New experimental chord join implementation.
This is an optimization for chords when using the Redis result backend,
where the join operation is now considerably faster and uses fewer
resources than the previous strategy.
The new option can be set in the result backend URL:
.. code-block:: python
CELERY_RESULT_BACKEND = 'redis://localhost?new_join=1'
This must be enabled manually as it's incompatible
with workers and clients not using it, so be sure to enable
the option in all clients and workers if you decide to use it.
- **Multi**: With ``-opt:index`` (e.g. :option:`-c:1`) the index now always refers
to the position of a node in the argument list.
This means that referring to a number will work when specifying a list
of node names and not just for a number range:
.. code-block:: bash
celery multi start A B C D -c:1 4 -c:2-4 8
In this example ``1`` refers to node A (as it's the first node in the
list).
- **Signals**: The sender argument to ``Signal.connect`` can now be a proxy
object, which means that it can be used with the task decorator
(Issue #1873).
- **Task**: A regression caused the ``queue`` argument to ``Task.retry`` to be
ignored (Issue #1892).
- **App**: Fixed error message for :meth:`~@Celery.config_from_envvar`.
Fix contributed by Dmitry Malinovsky.
- **Canvas**: Chords can now contain a group of other chords (Issue #1921).
- **Canvas**: Chords can now be combined when using the amqp result backend
(a chord where the callback is also a chord).
- **Canvas**: Calling ``result.get()`` for a chain task will now complete
even if one of the tasks in the chain is ``ignore_result=True``
(Issue #1905).
- **Canvas**: Worker now also logs chord errors.
- **Canvas**: A chord task raising an exception will now cause any
errbacks (``link_error``) attached to the chord callback to be called as well.
- **Results**: Reliability improvements to the SQLAlchemy database backend
(Issue #1786).
Previously the connection from the ``MainProcess`` was improperly
inherited by child processes.
Fix contributed by Ionel Cristian Mărieș.
- **Task**: Task callbacks and errbacks are now called using the group
primitive.
- **Task**: ``Task.apply`` now properly sets ``request.headers``
(Issue #1874).
- **Worker**: Fixed ``UnicodeEncodeError`` occurring when worker is started
by `supervisord`.
Fix contributed by Codeb Fan.
- **Beat**: No longer attempts to upgrade a newly created database file
(Issue #1923).
- **Beat**: New setting :setting:`CELERYBEAT_SYNC_EVERY` can be used
to control file sync by specifying the number of tasks to send between
each sync.
Contributed by Chris Clark.
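A hypothetical configuration fragment (the value 10 is arbitrary, and the module name is a placeholder):

```python
# celeryconfig.py (sketch): sync the beat schedule file to disk after
# every 10 task messages sent, rather than relying on the default policy.
CELERYBEAT_SYNC_EVERY = 10
```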
- **Commands**: :program:`celery inspect memdump` no longer crashes
if the :mod:`psutil` module is not installed (Issue #1914).
- **Worker**: Remote control commands now always accept JSON-serialized
messages (Issue #1870).
- **Worker**: Gossip will now drop any task related events it receives
by mistake (Issue #1882).
.. _version-3.1.9:
3.1.9
......
Metadata-Version: 1.1
Name: celery
Version: 3.1.9
Version: 3.1.11
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
:Version: 3.1.9 (Cipater)
:Version: 3.1.11 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
......@@ -89,8 +89,8 @@ Description: =================================
.. _`Next steps`:
http://docs.celeryproject.org/en/latest/getting-started/next-steps.html
Celery is
==========
Celery is...
============
- **Simple**
......@@ -127,8 +127,8 @@ Description: =================================
Custom pool implementations, serializers, compression schemes, logging,
schedulers, consumers, producers, autoscalers, broker transports and much more.
It supports
============
It supports...
==============
- **Message Transports**
......@@ -136,7 +136,7 @@ Description: =================================
- MongoDB_ (experimental), Amazon SQS (experimental),
- CouchDB_ (experimental), SQLAlchemy_ (experimental),
- Django ORM (experimental), `IronMQ`_
- and more
- and more...
- **Concurrency**
......@@ -445,6 +445,7 @@ Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Programming Language :: Python :: Implementation :: Jython
......
......@@ -4,7 +4,7 @@
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
:Version: 3.1.9 (Cipater)
:Version: 3.1.11 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
......@@ -81,8 +81,8 @@ getting started tutorials:
.. _`Next steps`:
http://docs.celeryproject.org/en/latest/getting-started/next-steps.html
Celery is
==========
Celery is...
============
- **Simple**
......@@ -119,8 +119,8 @@ Celery is…
Custom pool implementations, serializers, compression schemes, logging,
schedulers, consumers, producers, autoscalers, broker transports and much more.
It supports
============
It supports...
==============
- **Message Transports**
......@@ -128,7 +128,7 @@ It supports…
- MongoDB_ (experimental), Amazon SQS (experimental),
- CouchDB_ (experimental), SQLAlchemy_ (experimental),
- Django ORM (experimental), `IronMQ`_
- and more
- and more...
- **Concurrency**
......
Metadata-Version: 1.1
Name: celery
Version: 3.1.9
Version: 3.1.11
Summary: Distributed Task Queue
Home-page: http://celeryproject.org
Author: Ask Solem
......@@ -12,7 +12,7 @@ Description: =================================
.. image:: http://cloud.github.com/downloads/celery/celery/celery_128.png
:Version: 3.1.9 (Cipater)
:Version: 3.1.11 (Cipater)
:Web: http://celeryproject.org/
:Download: http://pypi.python.org/pypi/celery/
:Source: http://github.com/celery/celery/
......@@ -89,8 +89,8 @@ Description: =================================
.. _`Next steps`:
http://docs.celeryproject.org/en/latest/getting-started/next-steps.html
Celery is
==========
Celery is...
============
- **Simple**
......@@ -127,8 +127,8 @@ Description: =================================
Custom pool implementations, serializers, compression schemes, logging,
schedulers, consumers, producers, autoscalers, broker transports and much more.
It supports
============
It supports...
==============
- **Message Transports**
......@@ -136,7 +136,7 @@ Description: =================================
- MongoDB_ (experimental), Amazon SQS (experimental),
- CouchDB_ (experimental), SQLAlchemy_ (experimental),
- Django ORM (experimental), `IronMQ`_
- and more
- and more...
- **Concurrency**
......@@ -445,6 +445,7 @@ Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Programming Language :: Python :: Implementation :: Jython
......
pytz>dev
billiard>=3.3.0.14,<3.4
kombu>=3.0.12,<4.0
billiard>=3.3.0.17,<3.4
kombu>=3.0.15,<4.0
[zookeeper]
kazoo>=1.3.1
......@@ -9,7 +9,7 @@ kazoo>=1.3.1
msgpack-python>=0.3.0
[librabbitmq]
librabbitmq>=1.0.2
librabbitmq>=1.5.0
[slmq]
softlayer_messaging>=1.0.3
......
......@@ -14,7 +14,7 @@ version_info_t = namedtuple(
)
SERIES = 'Cipater'
VERSION = version_info_t(3, 1, 9, '', '')
VERSION = version_info_t(3, 1, 11, '', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'
......@@ -127,9 +127,9 @@ def maybe_patch_concurrency(argv=sys.argv,
concurrency.get_implementation(pool)
# Lazy loading
from .five import recreate_module
from celery import five
old_module, new_module = recreate_module( # pragma: no cover
old_module, new_module = five.recreate_module( # pragma: no cover
__name__,
by_module={
'celery.app': ['Celery', 'bugreport', 'shared_task'],
......@@ -144,8 +144,9 @@ old_module, new_module = recreate_module( # pragma: no cover
__package__='celery', __file__=__file__,
__path__=__path__, __doc__=__doc__, __version__=__version__,
__author__=__author__, __contact__=__contact__,
__homepage__=__homepage__, __docformat__=__docformat__,
__homepage__=__homepage__, __docformat__=__docformat__, five=five,
VERSION=VERSION, SERIES=SERIES, VERSION_BANNER=VERSION_BANNER,
version_info_t=version_info_t,
maybe_patch_concurrency=maybe_patch_concurrency,
_find_option_with_arg=_find_option_with_arg,
)
......@@ -45,7 +45,8 @@ except ImportError: # XXX Py2.6
self._refs.discard(dirty.pop())
__all__ = ['set_default_app', 'get_current_app', 'get_current_task',
'get_current_worker_task', 'current_app', 'current_task']
'get_current_worker_task', 'current_app', 'current_task',
'connect_on_app_finalize']
#: Global default app used when no current app.
default_app = None
......@@ -53,9 +54,25 @@ default_app = None
#: List of all app instances (weakrefs), must not be used directly.
_apps = AppSet()
#: global set of functions to call whenever a new app is finalized
#: E.g. Shared tasks, and builtin tasks are created
#: by adding callbacks here.
_on_app_finalizers = set()
_task_join_will_block = False
def connect_on_app_finalize(callback):
_on_app_finalizers.add(callback)
return callback
def _announce_app_finalized(app):
callbacks = set(_on_app_finalizers)
for callback in callbacks:
callback(app)
def _set_task_join_will_block(blocks):
global _task_join_will_block
_task_join_will_block = blocks
......@@ -92,6 +109,11 @@ def _get_current_app():
))
return _tls.current_app or default_app
def _set_current_app(app):
_tls.current_app = app
C_STRICT_APP = os.environ.get('C_STRICT_APP')
if os.environ.get('C_STRICT_APP'): # pragma: no cover
def get_current_app():
......
......@@ -13,15 +13,12 @@ import os
from celery.local import Proxy
from celery import _state
from celery._state import (
set_default_app,
get_current_app as current_app,
get_current_task as current_task,
_get_active_apps,
_task_stack,
connect_on_app_finalize, set_default_app, _get_active_apps, _task_stack,
)
from celery.utils import gen_task_name
from .builtins import shared_task as _shared_task
from .base import Celery, AppPickler
__all__ = ['Celery', 'AppPickler', 'default_app', 'app_or_default',
......@@ -128,7 +125,9 @@ def shared_task(*args, **kwargs):
name = options.get('name')
# Set as shared task so that unfinalized apps,
# and future apps will load the task.
_shared_task(lambda app: app._task_from_fun(fun, **options))
connect_on_app_finalize(
lambda app: app._task_from_fun(fun, **options)
)
# Force all finalized apps to take this task as well.
for app in _get_active_apps():
......
......@@ -8,6 +8,8 @@
"""
from __future__ import absolute_import
import numbers
from datetime import timedelta
from weakref import WeakValueDictionary
......@@ -200,6 +202,7 @@ class TaskProducer(Producer):
exchange = exchange or self.exchange
self.queues = self.app.amqp.queues # shortcut
self.default_queue = self.app.amqp.default_queue
self._default_mode = self.app.conf.CELERY_DEFAULT_DELIVERY_MODE
super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
def publish_task(self, task_name, task_args=None, task_kwargs=None,
......@@ -235,6 +238,8 @@ class TaskProducer(Producer):
routing_key = routing_key or queue.routing_key
if declare is None and queue and not isinstance(queue, Broadcast):
declare = [queue]
if delivery_mode is None:
delivery_mode = self._default_mode
# merge default and custom policy
retry = self.retry if retry is None else retry
......@@ -252,7 +257,7 @@ class TaskProducer(Producer):
eta = now + timedelta(seconds=countdown)
if self.utc:
eta = to_utc(eta).astimezone(self.app.timezone)
if isinstance(expires, (int, float)):
if isinstance(expires, numbers.Real):
now = now or self.app.now()
expires = now + timedelta(seconds=expires)
if self.utc:
......
......@@ -26,26 +26,28 @@ from kombu.utils import cached_property, uuid
from celery import platforms
from celery import signals
from celery._state import (
_task_stack, _tls, get_current_app, set_default_app,
_register_app, get_current_worker_task,
_task_stack, get_current_app, _set_current_app, set_default_app,
_register_app, get_current_worker_task, connect_on_app_finalize,
_announce_app_finalized,
)
from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
from celery.five import items, values
from celery.loaders import get_loader_cls
from celery.local import PromiseProxy, maybe_evaluate
from celery.utils import shadowsig
from celery.utils.functional import first, maybe_list
from celery.utils.imports import instantiate, symbol_by_name
from celery.utils.objects import mro_lookup
from .annotations import prepare as prepare_annotations
from .builtins import shared_task, load_shared_tasks
from .defaults import DEFAULTS, find_deprecated_settings
from .registry import TaskRegistry
from .utils import (
AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2, appstr,
)
# Load all builtin tasks
from . import builtins # noqa
__all__ = ['Celery']
_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
......@@ -59,6 +61,8 @@ and as such the configuration could not be loaded.
Please set this variable and make it point to
a configuration module."""
_after_fork_registered = False
def app_has_custom(app, attr):
return mro_lookup(app.__class__, attr, stop=(Celery, object),
......@@ -71,6 +75,29 @@ def _unpickle_appattr(reverse_name, args):
return get_current_app()._rgetattr(reverse_name)(*args)
def _global_after_fork():
# Previously every app would call:
# `register_after_fork(app, app._after_fork)`
# but this created a leak as `register_after_fork` stores concrete object
# references and once registered an object cannot be removed without
# touching and iterating over the private afterfork registry list.
#
# See Issue #1949
from celery import _state
from multiprocessing.util import info
for app in _state.apps:
try:
app._after_fork()
except Exception as exc:
info('after forker raised exception: %r' % (exc, ), exc_info=1)
def _ensure_after_fork():
global _after_fork_registered
_after_fork_registered = True
register_after_fork(_global_after_fork, _global_after_fork)
class Celery(object):
#: This is deprecated, use :meth:`reduce_keys` instead
Pickler = AppPickler
......@@ -148,7 +175,7 @@ class Celery(object):
_register_app(self)
def set_current(self):
_tls.current_app = self
_set_current_app(self)
def set_default(self):
set_default_app(self)
......@@ -184,8 +211,8 @@ class Celery(object):
# a different task instance. This makes sure it will always use
# the task instance from the current app.
# Really need a better solution for this :(
from . import shared_task as proxies_to_curapp
return proxies_to_curapp(*args, _force_evaluate=True, **opts)
from . import shared_task
return shared_task(*args, _force_evaluate=True, **opts)
def inner_create_task_cls(shared=True, filter=None, **opts):
_filt = filter # stupid 2to3
......@@ -194,7 +221,7 @@ class Celery(object):
if shared:
cons = lambda app: app._task_from_fun(fun, **opts)
cons.__name__ = fun.__name__
shared_task(cons)
connect_on_app_finalize(cons)
if self.accept_magic_kwargs: # compat mode
task = self._task_from_fun(fun, **opts)
if filter:
......@@ -238,7 +265,6 @@ class Celery(object):
'__doc__': fun.__doc__,
'__module__': fun.__module__,
'__wrapped__': fun}, **options))()
shadowsig(T, fun) # for inspect.getargspec
task = self._tasks[T.name] # return global instance.
return task
......@@ -248,7 +274,7 @@ class Celery(object):
if auto and not self.autofinalize:
raise RuntimeError('Contract breach: app not finalized')
self.finalized = True
load_shared_tasks(self)
_announce_app_finalized(self)
pending = self._pending
while pending:
......@@ -275,7 +301,8 @@ class Celery(object):
if not module_name:
if silent:
return False
raise ImproperlyConfigured(ERR_ENVVAR_NOT_SET.format(module_name))
raise ImproperlyConfigured(
ERR_ENVVAR_NOT_SET.format(variable_name))
return self.config_from_object(module_name, silent=silent, force=force)
def config_from_cmdline(self, argv, namespace='celery'):
......@@ -584,7 +611,7 @@ class Celery(object):
@property
def pool(self):
if self._pool is None:
register_after_fork(self, self._after_fork)
_ensure_after_fork()
limit = self.conf.BROKER_POOL_LIMIT
self._pool = self.connection().Pool(limit=limit)
return self._pool
......
......@@ -11,36 +11,16 @@ from __future__ import absolute_import
from collections import deque
from celery._state import get_current_worker_task
from celery._state import get_current_worker_task, connect_on_app_finalize
from celery.utils import uuid
from celery.utils.log import get_logger
__all__ = ['shared_task', 'load_shared_tasks']
__all__ = []
#: global list of functions defining tasks that should be
#: added to all apps.
_shared_tasks = set()
logger = get_logger(__name__)
def shared_task(constructor):
"""Decorator that specifies a function that generates a built-in task.
The function will then be called for every new app instance created
(lazily, so more exactly when the task registry for that app is needed).
The function must take a single ``app`` argument.
"""
_shared_tasks.add(constructor)
return constructor
def load_shared_tasks(app):
"""Create built-in tasks for an app instance."""
constructors = set(_shared_tasks)
for constructor in constructors:
constructor(app)
@shared_task
@connect_on_app_finalize
def add_backend_cleanup_task(app):
"""The backend cleanup task can be used to clean up the default result
backend.
......@@ -57,7 +37,7 @@ def add_backend_cleanup_task(app):
return backend_cleanup
@shared_task
@connect_on_app_finalize
def add_unlock_chord_task(app):
"""This task is used by result backends without native chord support.
......@@ -105,16 +85,17 @@ def add_unlock_chord_task(app):
)
except StopIteration:
reason = repr(exc)
app._tasks[callback.task].backend.fail_from_current_stack(
callback.id, exc=ChordError(reason),
)
logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
app.backend.chord_error_from_stack(callback,
ChordError(reason))
else: