Commit 37ae472c authored by Michael Fladischer's avatar Michael Fladischer

New upstream version 4.2.1

parent 77badf7f
......@@ -260,3 +260,4 @@ Igor Kasianov, 2018/01/20
Derek Harland, 2018/02/15
Chris Mitchell, 2018/02/27
Josue Balandrano Coronel, 2018/05/24
Tom Booth, 2018/07/06
......@@ -8,8 +8,47 @@ This document contains change notes for bugfix releases in
the 4.x series, please see :ref:`whatsnew-4.2` for
an overview of what's new in Celery 4.2.

4.2.1
=====
:release-date: 2018-07-18 11:00 AM IST
:release-by: Omer Katz

- **Result Backend**: Fix deserialization of exceptions that are present in the producer codebase but not in the consumer codebase.

  Contributed by **John Arnold**

- **Message Protocol Compatibility**: Fix error caused by an invalid (None) timelimit value in the message headers when migrating messages from 3.x to 4.x.

  Contributed by **Robert Kopaczewski**

- **Result Backend**: Fix serialization of exception arguments when exception arguments are not JSON serializable by default.

  Contributed by **Tom Booth**

- **Worker**: Fixed multiple issues with rate limited tasks

  Maintain scheduling order.

  Fix possible scheduling of a :class:`celery.worker.request.Request` with the wrong :class:`kombu.utils.limits.TokenBucket` which could cause tasks' rate limit to behave incorrectly.

  Fix possible duplicated execution of tasks that were rate limited or if ETA/Countdown was provided for them.

  Contributed by :github_user:`ideascf`

- **Worker**: Defensively handle invalid timelimit header values in requests.

  Contributed by **Omer Katz**

Documentation fixes:

- **Matt Wiens**
- **Seunghun Lee**
- **Lewis M. Kabui**
- **Prathamesh Salunkhe**

4.2.0
=====
:release-date: 2018-06-10 21:30 PM IST
:release-by: Omer Katz

- **Task**: Add ``ignore_result`` as task execution option (#4709, #3834)
......
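The two result-backend entries above are easier to follow with a small illustration. This is a sketch of the failure mode only, not code from the commit: it assumes a task failing with an argument that ``json.dumps`` cannot encode, and shows the kind of safe representation 4.2.1 stores instead, using ``safe_repr`` (the same kombu helper the patched code relies on)::

    import json
    from kombu.utils.encoding import safe_repr

    # An exception argument that json.dumps() cannot handle:
    exc = ValueError('bad input', object())

    try:
        json.dumps(exc.args)      # 4.2.0: this is roughly what failed in a JSON backend
    except TypeError as e:
        print('cannot store as-is:', e)

    # 4.2.1 behaviour (roughly): keep each arg if it serializes, else its repr.
    safe_args = []
    for arg in exc.args:
        try:
            json.dumps(arg)
            safe_args.append(arg)
        except Exception:
            safe_args.append(safe_repr(arg))

    print(tuple(safe_args))       # ('bad input', '<object object at 0x...>')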
Metadata-Version: 2.1
Name: celery
Version: 4.2.0
Version: 4.2.1
Summary: Distributed Task Queue.
Home-page: http://celeryproject.org
Author: Ask Solem
......@@ -10,7 +10,7 @@ Description: .. image:: http://docs.celeryproject.org/en/latest/_images/celery-b
|build-status| |coverage| |license| |wheel| |pyversion| |pyimp| |ocbackerbadge| |ocsponsorbadge|
:Version: 4.2.0 (latentcall)
:Version: 4.2.1 (latentcall)
:Web: http://celeryproject.org/
:Download: https://pypi.org/project/celery/
:Source: https://github.com/celery/celery/
......@@ -48,7 +48,7 @@ Description: .. image:: http://docs.celeryproject.org/en/latest/_images/celery-b
What do I need?
===============
Celery version 4.1 runs on,
Celery version 4.2 runs on,
- Python (2.7, 3.4, 3.5, 3.6)
- PyPy (5.8)
......@@ -80,7 +80,7 @@ Description: .. image:: http://docs.celeryproject.org/en/latest/_images/celery-b
===========
If this is the first time you're trying to use Celery, or you're
new to Celery 4.1 coming from previous versions then you should read our
new to Celery 4.2 coming from previous versions then you should read our
getting started tutorials:
- `First steps with Celery`_
......@@ -438,7 +438,7 @@ Description: .. image:: http://docs.celeryproject.org/en/latest/_images/celery-b
|oc-contributors|
.. |oc-contributors| image:: https://opencollective.com/celery/contributors.svg?width=890&button=false
:target: graphs/contributors
:target: https://github.com/celery/celery/graphs/contributors
Backers
-------
......@@ -523,28 +523,28 @@ Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Operating System :: OS Independent
Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*
Provides-Extra: django
Provides-Extra: consul
Provides-Extra: elasticsearch
Provides-Extra: pymemcache
Provides-Extra: sqlalchemy
Provides-Extra: riak
Provides-Extra: pyro
Provides-Extra: mongodb
Provides-Extra: memcache
Provides-Extra: gevent
Provides-Extra: librabbitmq
Provides-Extra: msgpack
Provides-Extra: redis
Provides-Extra: solar
Provides-Extra: tblib
Provides-Extra: cassandra
Provides-Extra: pymemcache
Provides-Extra: slmq
Provides-Extra: sqs
Provides-Extra: elasticsearch
Provides-Extra: dynamodb
Provides-Extra: yaml
Provides-Extra: zookeeper
Provides-Extra: redis
Provides-Extra: librabbitmq
Provides-Extra: couchbase
Provides-Extra: couchdb
Provides-Extra: dynamodb
Provides-Extra: tblib
Provides-Extra: auth
Provides-Extra: riak
Provides-Extra: mongodb
Provides-Extra: eventlet
Provides-Extra: couchbase
Provides-Extra: sqlalchemy
Provides-Extra: solar
Provides-Extra: yaml
Provides-Extra: consul
Provides-Extra: gevent
Provides-Extra: msgpack
Provides-Extra: slmq
Provides-Extra: memcache
Provides-Extra: django
Provides-Extra: auth
Provides-Extra: sqs
......@@ -2,7 +2,7 @@
|build-status| |coverage| |license| |wheel| |pyversion| |pyimp| |ocbackerbadge| |ocsponsorbadge|
:Version: 4.2.0 (latentcall)
:Version: 4.2.1 (latentcall)
:Web: http://celeryproject.org/
:Download: https://pypi.org/project/celery/
:Source: https://github.com/celery/celery/
......@@ -40,7 +40,7 @@ in such a way that the client enqueues an URL to be requested by a worker.
What do I need?
===============
Celery version 4.1 runs on,
Celery version 4.2 runs on,
- Python (2.7, 3.4, 3.5, 3.6)
- PyPy (5.8)
......@@ -72,7 +72,7 @@ Get Started
===========
If this is the first time you're trying to use Celery, or you're
new to Celery 4.1 coming from previous versions then you should read our
new to Celery 4.2 coming from previous versions then you should read our
getting started tutorials:
- `First steps with Celery`_
......@@ -430,7 +430,7 @@ documentation.
|oc-contributors|
.. |oc-contributors| image:: https://opencollective.com/celery/contributors.svg?width=890&button=false
:target: graphs/contributors
:target: https://github.com/celery/celery/graphs/contributors
Backers
-------
......
Metadata-Version: 2.1
Name: celery
Version: 4.2.0
Version: 4.2.1
Summary: Distributed Task Queue.
Home-page: http://celeryproject.org
Author: Ask Solem
......@@ -10,7 +10,7 @@ Description: .. image:: http://docs.celeryproject.org/en/latest/_images/celery-b
|build-status| |coverage| |license| |wheel| |pyversion| |pyimp| |ocbackerbadge| |ocsponsorbadge|
:Version: 4.2.0 (latentcall)
:Version: 4.2.1 (latentcall)
:Web: http://celeryproject.org/
:Download: https://pypi.org/project/celery/
:Source: https://github.com/celery/celery/
......@@ -48,7 +48,7 @@ Description: .. image:: http://docs.celeryproject.org/en/latest/_images/celery-b
What do I need?
===============
Celery version 4.1 runs on,
Celery version 4.2 runs on,
- Python (2.7, 3.4, 3.5, 3.6)
- PyPy (5.8)
......@@ -80,7 +80,7 @@ Description: .. image:: http://docs.celeryproject.org/en/latest/_images/celery-b
===========
If this is the first time you're trying to use Celery, or you're
new to Celery 4.1 coming from previous versions then you should read our
new to Celery 4.2 coming from previous versions then you should read our
getting started tutorials:
- `First steps with Celery`_
......@@ -438,7 +438,7 @@ Description: .. image:: http://docs.celeryproject.org/en/latest/_images/celery-b
|oc-contributors|
.. |oc-contributors| image:: https://opencollective.com/celery/contributors.svg?width=890&button=false
:target: graphs/contributors
:target: https://github.com/celery/celery/graphs/contributors
Backers
-------
......@@ -523,28 +523,28 @@ Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Operating System :: OS Independent
Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*
Provides-Extra: django
Provides-Extra: consul
Provides-Extra: elasticsearch
Provides-Extra: pymemcache
Provides-Extra: sqlalchemy
Provides-Extra: riak
Provides-Extra: pyro
Provides-Extra: mongodb
Provides-Extra: memcache
Provides-Extra: gevent
Provides-Extra: librabbitmq
Provides-Extra: msgpack
Provides-Extra: redis
Provides-Extra: solar
Provides-Extra: tblib
Provides-Extra: cassandra
Provides-Extra: pymemcache
Provides-Extra: slmq
Provides-Extra: sqs
Provides-Extra: elasticsearch
Provides-Extra: dynamodb
Provides-Extra: yaml
Provides-Extra: zookeeper
Provides-Extra: redis
Provides-Extra: librabbitmq
Provides-Extra: couchbase
Provides-Extra: couchdb
Provides-Extra: dynamodb
Provides-Extra: tblib
Provides-Extra: auth
Provides-Extra: riak
Provides-Extra: mongodb
Provides-Extra: eventlet
Provides-Extra: couchbase
Provides-Extra: sqlalchemy
Provides-Extra: solar
Provides-Extra: yaml
Provides-Extra: consul
Provides-Extra: gevent
Provides-Extra: msgpack
Provides-Extra: slmq
Provides-Extra: memcache
Provides-Extra: django
Provides-Extra: auth
Provides-Extra: sqs
......@@ -14,7 +14,7 @@ from collections import namedtuple
SERIES = 'windowlicker'
__version__ = '4.2.0'
__version__ = '4.2.1'
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'
__homepage__ = 'http://celeryproject.org'
......
......@@ -21,6 +21,7 @@ from kombu.serialization import registry as serializer_registry
from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
from kombu.utils.url import maybe_sanitize_url
import celery.exceptions
from celery import current_app, group, maybe_signature, states
from celery._state import get_current_task
from celery.exceptions import (ChordError, ImproperlyConfigured,
......@@ -32,6 +33,7 @@ from celery.utils.collections import BufferMap
from celery.utils.functional import LRUCache, arity_greater
from celery.utils.log import get_logger
from celery.utils.serialization import (create_exception_cls,
                                        ensure_serializable,
                                        get_pickleable_exception,
                                        get_pickled_exception)
......@@ -235,7 +237,7 @@ class Backend(object):
        if serializer in EXCEPTION_ABLE_CODECS:
            return get_pickleable_exception(exc)
        return {'exc_type': type(exc).__name__,
                'exc_message': exc.args,
                'exc_message': ensure_serializable(exc.args, self.encode),
                'exc_module': type(exc).__module__}

    def exception_to_python(self, exc):
......@@ -249,7 +251,11 @@ class Backend(object):
                else:
                    exc_module = from_utf8(exc_module)
                    exc_type = from_utf8(exc['exc_type'])
                    cls = getattr(sys.modules[exc_module], exc_type)
                    try:
                        cls = getattr(sys.modules[exc_module], exc_type)
                    except KeyError:
                        cls = create_exception_cls(exc_type,
                                                   celery.exceptions.__name__)
                exc_msg = exc['exc_message']
                exc = cls(*exc_msg if isinstance(exc_msg, tuple) else exc_msg)
                if self.serializer in EXCEPTION_ABLE_CODECS:
......
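A hedged sketch of what the ``exception_to_python`` change buys on the consuming side: if the module recorded in the result metadata was never imported there, ``sys.modules[exc_module]`` raises ``KeyError``, and the backend now falls back to a generated stand-in class instead of failing. The module and class names below are made up for illustration::

    import sys

    import celery.exceptions
    from celery.utils.serialization import create_exception_cls

    # Metadata as a JSON result backend would store it (names are hypothetical):
    meta = {'exc_type': 'BillingError',
            'exc_module': 'shop.errors',
            'exc_message': ('card declined',)}

    try:
        # 4.2.0 did only this, and raised KeyError when the module was absent.
        cls = getattr(sys.modules[meta['exc_module']], meta['exc_type'])
    except KeyError:
        # 4.2.1 fallback: a generated class registered under celery.exceptions.
        cls = create_exception_cls(meta['exc_type'], celery.exceptions.__name__)

    exc = cls(*meta['exc_message'])
    print(type(exc).__name__, exc.args)   # -> BillingError ('card declined',)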
......@@ -361,7 +361,7 @@ class crontab(BaseSchedule):
- A (list of) integers from 1-31 that represents the days of the
month that execution should occur.
- A string representing a Crontab pattern. This may get pretty
advanced, such as ``day_of_month='2-30/3'`` (for every even
advanced, such as ``day_of_month='2-30/2'`` (for every even
numbered day) or ``day_of_month='1-7,15-21'`` (for the first and
third weeks of the month).
......
......@@ -56,6 +56,8 @@ def find_pickleable_exception(exc, loads=pickle.loads,
    Arguments:
        exc (BaseException): An exception instance.
        loads: decoder to use.
        dumps: encoder to use

    Returns:
        Exception: Nearest pickleable parent exception class
......@@ -84,6 +86,26 @@ def create_exception_cls(name, module, parent=None):
    return subclass_exception(name, parent, module)


def ensure_serializable(items, encoder):
    """Ensure items will serialize.

    For a given list of arbitrary objects, return the object
    or a string representation, safe for serialization.

    Arguments:
        items (Iterable[Any]): Objects to serialize.
        encoder (Callable): Callable function to serialize with.
    """
    safe_exc_args = []
    for arg in items:
        try:
            encoder(arg)
            safe_exc_args.append(arg)
        except Exception:  # pylint: disable=broad-except
            safe_exc_args.append(safe_repr(arg))
    return tuple(safe_exc_args)


@python_2_unicode_compatible
class UnpickleableExceptionWrapper(Exception):
    """Wraps unpickleable exceptions.
......@@ -116,13 +138,7 @@ class UnpickleableExceptionWrapper(Exception):
    exc_args = None

    def __init__(self, exc_module, exc_cls_name, exc_args, text=None):
        safe_exc_args = []
        for arg in exc_args:
            try:
                pickle.dumps(arg)
                safe_exc_args.append(arg)
            except Exception:  # pylint: disable=broad-except
                safe_exc_args.append(safe_repr(arg))
        safe_exc_args = ensure_serializable(exc_args, pickle.dumps)
        self.exc_module = exc_module
        self.exc_cls_name = exc_cls_name
        self.exc_args = safe_exc_args
......
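The new helper can also be exercised on its own; the commit's tests (further down in this diff) do essentially the following. A minimal usage sketch::

    import json
    import pickle

    from celery.utils.serialization import ensure_serializable

    # json.dumps() cannot encode the `object` type, so it is replaced by its repr:
    print(ensure_serializable([1, object], encoder=json.dumps))
    # -> (1, "<class 'object'>") on Python 3

    # pickle.dumps() can encode it, so the original value is kept:
    print(ensure_serializable((1, object), encoder=pickle.dumps))
    # -> (1, <class 'object'>)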
......@@ -269,43 +269,38 @@ class Consumer(object):
        task_reserved(request)
        self.on_task_request(request)

    def _on_bucket_wakeup(self, bucket, tokens):
        try:
            request = bucket.pop()
        except IndexError:
            pass
        else:
            self._limit_move_to_pool(request)
            self._schedule_oldest_bucket_request(bucket, tokens)

    def _schedule_oldest_bucket_request(self, bucket, tokens):
        try:
            request = bucket.pop()
        except IndexError:
            pass
        else:
            return self._schedule_bucket_request(request, bucket, tokens)

    def _schedule_bucket_request(self, request, bucket, tokens):
        bucket.can_consume(tokens)
        bucket.add(request)
        pri = self._limit_order = (self._limit_order + 1) % 10
        hold = bucket.expected_time(tokens)
        self.timer.call_after(
            hold, self._on_bucket_wakeup, (bucket, tokens),
            priority=pri,
        )

    def _schedule_bucket_request(self, bucket):
        while True:
            try:
                request, tokens = bucket.pop()
            except IndexError:
                # no request, break
                break

            if bucket.can_consume(tokens):
                self._limit_move_to_pool(request)
                continue
            else:
                # requeue to head, keep the order.
                bucket.contents.appendleft((request, tokens))
                pri = self._limit_order = (self._limit_order + 1) % 10
                hold = bucket.expected_time(tokens)
                self.timer.call_after(
                    hold, self._schedule_bucket_request, (bucket,),
                    priority=pri,
                )
                # no tokens, break
                break

    def _limit_task(self, request, bucket, tokens):
        if bucket.contents:
            return bucket.add(request)
        return self._schedule_bucket_request(request, bucket, tokens)
        bucket.add((request, tokens))
        return self._schedule_bucket_request(bucket)

    def _limit_post_eta(self, request, bucket, tokens):
        self.qos.decrement_eventually()
        if bucket.contents:
            return bucket.add(request)
        return self._schedule_bucket_request(request, bucket, tokens)
        bucket.add((request, tokens))
        return self._schedule_bucket_request(bucket)

    def start(self):
        blueprint = self.blueprint
......
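The reworked ``_schedule_bucket_request`` drains a single bucket of ``(request, tokens)`` pairs in FIFO order, requeues an item at the head as soon as the bucket runs out of tokens, and re-schedules itself after ``expected_time(tokens)``. Outside the worker the same pattern can be sketched against kombu's token bucket directly, using a blocking sleep where the worker uses its timer; this is an illustration, not worker code::

    import time
    from kombu.utils.limits import TokenBucket

    bucket = TokenBucket(fill_rate=2, capacity=1)   # roughly 2 "requests" per second

    # Queue a few pretend requests, each costing one token, oldest first.
    for name in ('req-1', 'req-2', 'req-3'):
        bucket.add((name, 1))

    while True:
        try:
            request, tokens = bucket.pop()          # oldest item
        except IndexError:
            break                                   # nothing left to schedule
        if bucket.can_consume(tokens):
            print('moving to pool:', request)
        else:
            # No tokens: put it back at the head so ordering is preserved,
            # then wait roughly as long as the worker's timer would.
            bucket.contents.appendleft((request, tokens))
            time.sleep(bucket.expected_time(tokens))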
......@@ -116,8 +116,9 @@ class Request(object):
        self.parent_id = headers.get('parent_id')
        if 'shadow' in headers:
            self.name = headers['shadow'] or self.name
        if 'timelimit' in headers:
            self.time_limits = headers['timelimit']
        timelimit = headers.get('timelimit', None)
        if timelimit:
            self.time_limits = timelimit
        self.argsrepr = headers.get('argsrepr', '')
        self.kwargsrepr = headers.get('kwargsrepr', '')
        self.on_ack = on_ack
......
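The request change is a defensive-access pattern: a ``timelimit`` header that is present but ``None`` (as messages migrated from 3.x could carry) must not overwrite the task's own limits. A small sketch of the difference, with made-up default limits::

    headers = {'timelimit': None}      # as seen on some migrated 3.x messages
    time_limits = (30, 60)             # hypothetical (soft, hard) task defaults

    # 4.2.0 logic: a plain membership test, so the defaults get clobbered by None.
    if 'timelimit' in headers:
        time_limits = headers['timelimit']     # -> None

    # 4.2.1 logic: a falsy header value leaves the defaults alone.
    time_limits = (30, 60)
    timelimit = headers.get('timelimit', None)
    if timelimit:
        time_limits = timelimit
    print(time_limits)                 # -> (30, 60)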
......@@ -48,7 +48,7 @@ def hybrid_to_proto2(message, body):
        'eta': body.get('eta'),
        'expires': body.get('expires'),
        'retries': body.get('retries'),
        'timelimit': body.get('timelimit'),
        'timelimit': body.get('timelimit', (None, None)),
        'argsrepr': body.get('argsrepr'),
        'kwargsrepr': body.get('kwargsrepr'),
        'origin': body.get('origin'),
......
:Version: 4.2.0 (latentcall)
:Version: 4.2.1 (latentcall)
:Web: http://celeryproject.org/
:Download: https://pypi.org/project/celery/
:Source: https://github.com/celery/celery/
......
......@@ -265,7 +265,7 @@ Some examples:
| | |
+-----------------------------------------+--------------------------------------------+
| ``crontab(0, 0,`` | Execute on every even numbered day. |
| ``day_of_month='2-30/3')`` | |
| ``day_of_month='2-30/2')`` | |
+-----------------------------------------+--------------------------------------------+
| ``crontab(0, 0,`` | Execute on the first and third weeks of |
| ``day_of_month='1-7,15-21')`` | the month. |
......@@ -273,7 +273,7 @@ Some examples:
| ``crontab(0, 0, day_of_month='11',`` | Execute on the eleventh of May every year. |
| ``month_of_year='5')`` | |
+-----------------------------------------+--------------------------------------------+
| ``crontab(0, 0,`` | Execute every day on the first month |
| ``crontab(0, 0,`` | Execute every day on the first month |
| ``month_of_year='*/3')`` | of every quarter. |
+-----------------------------------------+--------------------------------------------+
......
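The corrected ``'2-30/2'`` pattern drops straight into a beat schedule. A sketch with a hypothetical task name, mirroring the table entry above::

    from celery.schedules import crontab

    # Run a (hypothetical) cleanup task at midnight on every even-numbered day:
    # '2-30/2' expands to 2, 4, 6, ..., 30.
    beat_schedule = {
        'cleanup-on-even-days': {
            'task': 'proj.tasks.cleanup',   # hypothetical task name
            'schedule': crontab(minute=0, hour=0, day_of_month='2-30/2'),
        },
    }
    # Normally wired up as: app.conf.beat_schedule = beat_schedule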
......@@ -330,7 +330,7 @@ Changing the automatic naming behavior
.. versionadded:: 4.0
There are some cases when the default automatic naming isn't suitable.
Consider you have many tasks within many different modules::
Consider having many tasks within many different modules::
project/
/__init__.py
......@@ -1018,7 +1018,7 @@ different strengths and weaknesses (see :ref:`task-result-backends`).
During its lifetime a task will transition through several possible states,
and each state may have arbitrary meta-data attached to it. When a task
moves into a new state the previous state is
forgotten about, but some transitions can be deducted, (e.g., a task now
forgotten about, but some transitions can be deduced, (e.g., a task now
in the :state:`FAILED` state, is implied to have been in the
:state:`STARTED` state at some point).
......@@ -1594,7 +1594,7 @@ yourself:
'celery.chord':
<@task: celery.chord>}
This is the list of tasks built-in to Celery. Note that tasks
This is the list of tasks built into Celery. Note that tasks
will only be registered when the module they're defined in is imported.
The default loader imports any modules listed in the
......@@ -1728,7 +1728,7 @@ different :func:`~celery.signature`'s.
You can read about chains and other powerful constructs
at :ref:`designing-workflows`.
By default celery will not enable you to run tasks within task synchronously
By default Celery will not enable you to run tasks within task synchronously
in rare or extreme cases you might have to do so.
**WARNING**:
enabling subtasks run synchronously is not recommended!
......@@ -1816,7 +1816,7 @@ system, like `memcached`_.
State
-----
Since celery is a distributed system, you can't know which process, or
Since Celery is a distributed system, you can't know which process, or
on what machine the task will be executed. You can't even know if the task will
run in a timely manner.
......@@ -1903,7 +1903,7 @@ There's a race condition if the task starts executing
before the transaction has been committed; The database object doesn't exist
yet!
The solution is to use the ``on_commit`` callback to launch your celery task
The solution is to use the ``on_commit`` callback to launch your Celery task
once all transactions have been committed successfully.
.. code-block:: python
......
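The code block following that paragraph is truncated in this diff view; the pattern it refers to is Django's ``transaction.on_commit`` hook. A hedged sketch, assuming a hypothetical ``send_welcome_email`` Celery task and a plain Django view::

    from django.contrib.auth.models import User
    from django.db import transaction
    from django.http import HttpResponseRedirect

    from .tasks import send_welcome_email   # hypothetical Celery task


    def create_user(request):
        with transaction.atomic():
            user = User.objects.create(username=request.POST['username'])
            # Enqueue only after the surrounding transaction commits, so the
            # worker cannot race ahead of the INSERT.
            transaction.on_commit(lambda: send_welcome_email.delay(user.pk))
        return HttpResponseRedirect('/done/')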
......@@ -145,6 +145,17 @@ class test_prepare_exception:
        y = self.b.exception_to_python(x)
        assert isinstance(y, KeyError)

    def test_json_exception_arguments(self):
        self.b.serializer = 'json'
        x = self.b.prepare_exception(Exception(object))
        assert x == {
            'exc_message': serialization.ensure_serializable(
                (object,), self.b.encode),
            'exc_type': Exception.__name__,
            'exc_module': Exception.__module__}
        y = self.b.exception_to_python(x)
        assert isinstance(y, Exception)

    def test_impossible(self):
        self.b.serializer = 'pickle'
        x = self.b.prepare_exception(Impossible())
......
from __future__ import absolute_import, unicode_literals

import json
import pickle
import sys
from datetime import date, datetime, time, timedelta

import pytest
import pytz
from case import Mock, mock
from case import Mock, mock, skip
from kombu import Queue

from celery.utils.serialization import (UnpickleableExceptionWrapper,
                                        ensure_serializable,
                                        get_pickleable_etype, jsonify)
......@@ -25,6 +28,23 @@ class test_AAPickle:
        sys.modules['celery.utils.serialization'] = prev


class test_ensure_serializable:

    @skip.unless_python3()
    def test_json_py3(self):
        assert (1, "<class 'object'>") == \
            ensure_serializable([1, object], encoder=json.dumps)

    @skip.if_python3()
    def test_json_py2(self):
        assert (1, "<type 'object'>") == \
            ensure_serializable([1, object], encoder=json.dumps)

    def test_pickle(self):
        assert (1, object) == \
            ensure_serializable((1, object), encoder=pickle.dumps)


class test_UnpickleExceptionWrapper:

    def test_init(self):
......
......@@ -103,34 +103,69 @@ class test_Consumer:
        c.on_send_event_buffered()
        c.hub._ready.add.assert_called_with(c._flush_events)

    def test_limit_task(self):
    def test_schedule_bucket_request(self):
        c = self.get_consumer()
        c.timer = Mock()
        bucket = Mock()
        request = Mock()
        bucket.pop = lambda: bucket.contents.popleft()
        bucket.can_consume.return_value = True
        bucket.contents = deque()
        c._limit_task(request, bucket, 3)
        bucket.can_consume.assert_called_with(3)
        bucket.expected_time.assert_called_with(3)
        c.timer.call_after.assert_called_with(
            bucket.expected_time(), c._on_bucket_wakeup, (bucket, 3),
            priority=c._limit_order,
        )
        with patch(
            'celery.worker.consumer.consumer.Consumer._limit_move_to_pool'
        ) as reserv:
            bucket.contents.append((request, 3))
            c._schedule_bucket_request(bucket)
            bucket.can_consume.assert_called_with(3)
            reserv.assert_called_with(request)

        bucket.can_consume.return_value = False
        bucket.contents = deque()
        bucket.expected_time.return_value = 3.33
        bucket.contents.append((request, 4))
        limit_order = c._limit_order
        c._limit_task(request, bucket, 4)
        c._schedule_bucket_request(bucket)
        assert c._limit_order == limit_order + 1
        bucket.can_consume.assert_called_with(4)
        c.timer.call_after.assert_called_with(
            3.33, c._on_bucket_wakeup, (bucket, 4),
            3.33, c._schedule_bucket_request, (bucket,),
            priority=c._limit_order,
        )
        bucket.expected_time.assert_called_with(4)
        assert bucket.pop() == (request, 4)
        bucket.contents = deque()
        bucket.can_consume.reset_mock()
        c._schedule_bucket_request(bucket)
        bucket.can_consume.assert_not_called()

    def test_limit_task(self):
        c = self.get_consumer()
        bucket = Mock()
        request = Mock()
        with patch(
            'celery.worker.consumer.consumer.Consumer._schedule_bucket_request'
        ) as reserv:
            c._limit_task(request, bucket, 1)
            bucket.add.assert_called_with((request, 1))
            reserv.assert_called_with(bucket)

    def test_post_eta(self):
        c = self.get_consumer()
        c.qos = Mock()
        bucket = Mock()
        request = Mock()
        with patch(
            'celery.worker.consumer.consumer.Consumer._schedule_bucket_request'
        ) as reserv:
            c._limit_post_eta(request, bucket, 1)
            c.qos.decrement_eventually.assert_called_with()
            bucket.add.assert_called_with((request, 1))
            reserv.assert_called_with(bucket)

    def test_start_blueprint_raises_EMFILE(self):
        c = self.get_consumer()
......
......@@ -25,6 +25,7 @@ from celery.exceptions import (Ignore, InvalidTaskError, Reject, Retry,
from celery.five import monotonic
from celery.signals import task_revoked
from celery.worker import request as module
from celery.worker import strategy
from celery.worker.request import Request, create_request_cls
from celery.worker.request import logger as req_logger
from celery.worker.state import revoked
......@@ -1006,3 +1007,37 @@ class test_create_request_class(RequestCase):
        assert job._apply_result
        weakref_ref.assert_called_with(self.pool.apply_async())
        assert job._apply_result is weakref_ref()

    def test_execute_using_pool_with_none_timelimit_header(self):
        from celery.app.trace import trace_task_ret as trace
        weakref_ref = Mock(name='weakref.ref')
        job = self.zRequest(id=uuid(),
                            revoked_tasks=set(),
                            ref=weakref_ref,
                            headers={'timelimit': None})
        job.execute_using_pool(self.pool)
        self.pool.apply_async.assert_called_with(
            trace,
            args=(job.type, job.id, job.request_dict, job.body,
                  job.content_type, job.content_encoding),
            accept_callback=job.on_accepted,
            timeout_callback=job.on_timeout,
            callback=job.on_success,
            error_callback=job.on_failure,
            soft_timeout=self.task.soft_time_limit,
            timeout=self.task.time_limit,
            correlation_id=job.id,
        )
        assert job._apply_result
        weakref_ref.assert_called_with(self.pool.apply_async())
        assert job._apply_result is weakref_ref()

    def test_execute_using_pool__defaults_of_hybrid_to_proto2(self):
        weakref_ref = Mock(name='weakref.ref')
        headers = strategy.hybrid_to_proto2('', {'id': uuid(),
                                                 'task': self.mytask.name})[1]
        job = self.zRequest(revoked_tasks=set(), ref=weakref_ref, **headers)
        job.execute_using_pool(self.pool)
        assert job._apply_result
        weakref_ref.assert_called_with(self.pool.apply_async())
        assert job._apply_result is weakref_ref()