Commit 0514c88c authored by Ana Rodríguez López

Import upstream version 0.2

[run]
include =
    celery_batches/*
    t/*
*.pyc
# Build related.
_build
build
dist
celery_batches.egg-info
# Testing related.
.cache
.pytest_cache
.tox
# Coverage related.
.coverage
htmlcov
language: python
sudo: required
dist: trusty
cache: pip
python:
  - '2.7'
  - '3.4'
  - '3.5'
  - '3.6'
  - 'pypy'
  - 'pypy3'
os:
  - linux
env:
  - CELERY_VERSION=40
  - CELERY_VERSION=41
  - CELERY_VERSION=master
before_install:
  - export TOXENV=${TRAVIS_PYTHON_VERSION}-celery${CELERY_VERSION}
  - |
    if [[ "$TOXENV" =~ "pypy" ]]; then
      export PYENV_ROOT="$HOME/.pyenv"
      if [ -f "$PYENV_ROOT/bin/pyenv" ]; then
        cd "$PYENV_ROOT" && git pull
      else
        rm -rf "$PYENV_ROOT" && git clone --depth 1 https://github.com/pyenv/pyenv.git "$PYENV_ROOT"
      fi
      "$PYENV_ROOT/bin/pyenv" install "$PYPY_VERSION"
      virtualenv --python="$PYENV_ROOT/versions/$PYPY_VERSION/bin/python" "$HOME/virtualenvs/$PYPY_VERSION"
      source "$HOME/virtualenvs/$PYPY_VERSION/bin/activate"
      which python
    fi
after_success:
  - |
    if [[ -v MATRIX_TOXENV || "$TOXENV" =~ "pypy" ]]; then
      .tox/$TOXENV/bin/coverage xml
      .tox/$TOXENV/bin/codecov -e TOXENV
    fi
install: pip install -U tox
script: tox -v
.. :changelog:
Changelog
#########
0.2 2018-04-20
==============
* Add support for protocol v2.
* Add tests.
* Fix some documentation issues.
0.1 2018-03-23
==============
* The initial released version; includes the changes needed to make this a
  separate package, etc.
* Batch tasks now call pre- and post-run signals.
celery-final
============
* The final version of ``celery.contrib.batches`` before it was removed in
4b3ab708778a3772d24bb39142b7e9d5b94c488b.
Copyright (c) 2015-2016 Ask Solem & contributors. All rights reserved.
Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved.
Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual contributors. All rights reserved.
Celery is licensed under The BSD License (3 Clause, also known as
the new BSD license). The license is an OSI approved Open Source
license and is GPL-compatible(1).
The license text can also be found here:
http://www.opensource.org/licenses/BSD-3-Clause
License
=======
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Ask Solem, nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS
BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
Documentation License
=====================
The documentation portion of Celery (the rendered contents of the
"docs" directory of a software distribution or checkout) is supplied
under the "Creative Commons Attribution-ShareAlike 4.0
International" (CC BY-SA 4.0) License as described by
https://creativecommons.org/licenses/by-sa/4.0/
Footnotes
=========
(1) A GPL-compatible license makes it possible to
    combine Celery with other software that is released
    under the GPL; it does not mean that we're distributing
    Celery under the GPL license. The BSD license, unlike the GPL,
    lets you distribute a modified version without making your
    changes open source.
Celery Batches
==============
Celery Batches provides a ``Task`` class that allows processing of multiple
Celery task calls together as a list. The buffer of task calls is flushed on a
timer and based on the number of queued tasks.
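
For example, a minimal sketch of a batch task (this assumes a configured
Celery ``app``; the task and its name are illustrative only):

.. code-block:: python

    from celery_batches import Batches

    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def count_messages(requests):
        # ``requests`` is the list of SimpleRequest objects the worker
        # buffered before flushing them to the task as one call.
        print('Handled {0} calls in one batch.'.format(len(requests)))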
History
=======
Celery Batches was part of Celery (as ``celery.contrib.batches``) until Celery
4.0. This repository includes that history. The Batches code has been updated
to maintain compatibility with newer versions of Celery, among other fixes. See
the Changelog for details.
# -*- coding: utf-8 -*-
"""
celery_batches
==============

Experimental task class that buffers messages and processes them as a list.

.. warning::

    For this to work you have to set
    :setting:`worker_prefetch_multiplier` to zero, or some value where
    the final multiplied value is higher than ``flush_every``.

    In the future we hope to add the ability to direct batching tasks
    to a channel with different QoS requirements than the task channel.
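
For example, a minimal sketch of one configuration that satisfies this
(assuming an existing ``app`` instance)::

    app.conf.worker_prefetch_multiplier = 0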
**Simple Example**

A click counter that flushes the buffer every 100 messages, and every
10 seconds. Does not do anything with the data, but can easily be modified
to store it in a database.

.. code-block:: python

    # Flush after 100 messages, or 10 seconds.
    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def count_click(requests):
        from collections import Counter
        count = Counter(request.kwargs['url'] for request in requests)
        for url, clicks in count.items():
            print('>>> Clicks: {0} -> {1}'.format(url, clicks))

Then you can ask for a click to be counted by doing::

    >>> count_click.delay(url='http://example.com')
**Example returning results**

An interface to the Web of Trust API that flushes the buffer every 100
messages, and every 10 seconds.

.. code-block:: python

    import requests

    try:
        from urllib.parse import urlparse  # Python 3.
    except ImportError:
        from urlparse import urlparse  # Python 2.

    from celery_batches import Batches

    wot_api_target = 'https://api.mywot.com/0.4/public_link_json'

    @app.task(base=Batches, flush_every=100, flush_interval=10)
    def wot_api(requests):
        sig = lambda url: url
        responses = wot_api_real(
            (sig(*request.args, **request.kwargs) for request in requests)
        )
        # use mark_as_done to manually return response data
        for response, request in zip(responses, requests):
            app.backend.mark_as_done(request.id, response)

    def wot_api_real(urls):
        domains = [urlparse(url).netloc for url in urls]
        response = requests.get(
            wot_api_target,
            params={'hosts': ('/').join(set(domains)) + '/'}
        )
        return [response.json()[domain] for domain in domains]

Using the API is done as follows::

    >>> wot_api.delay('http://example.com')
.. note::

    If you don't have an ``app`` instance then use the current app proxy
    instead::

        from celery import current_app
        current_app.backend.mark_as_done(request.id, response)

"""
from __future__ import absolute_import, unicode_literals
from itertools import count
from celery import signals, states
from celery.five import Empty, Queue
from celery.task import Task
from celery.utils import noop
from celery.utils.log import get_logger
from celery.worker.request import Request
from celery.worker.strategy import proto1_to_proto2
from kombu.five import buffer_t
from kombu.utils.uuid import uuid
__all__ = ['Batches']
logger = get_logger(__name__)
def consume_queue(queue):
    """Iterator yielding all immediately available items in a
    :class:`Queue.Queue`.

    The iterator stops as soon as the queue raises :exc:`Queue.Empty`.

    *Examples*

        >>> q = Queue()
        >>> for i in range(4):
        ...     q.put(i)
        >>> list(consume_queue(q))
        [0, 1, 2, 3]
        >>> list(consume_queue(q))
        []
    """
    get = queue.get_nowait
    while 1:
        try:
            yield get()
        except Empty:
            break
send_prerun = signals.task_prerun.send
send_postrun = signals.task_postrun.send
SUCCESS = states.SUCCESS
FAILURE = states.FAILURE
def apply_batches_task(task, args, loglevel, logfile):
    # Mimics some of the functionality found in celery.app.trace.trace_task.
    prerun_receivers = signals.task_prerun.receivers
    postrun_receivers = signals.task_postrun.receivers

    # Corresponds to multiple requests, so generate a new UUID.
    task_id = uuid()

    if prerun_receivers:
        send_prerun(sender=task, task_id=task_id, task=task,
                    args=args, kwargs={})

    task.push_request(loglevel=loglevel, logfile=logfile)
    try:
        result = task(*args)
        state = SUCCESS
    except Exception as exc:
        result = None
        state = FAILURE
        logger.error('Error: %r', exc, exc_info=True)
    finally:
        task.pop_request()

    if postrun_receivers:
        send_postrun(sender=task, task_id=task_id, task=task,
                     args=args, kwargs={},
                     retval=result, state=state)

    return result
class SimpleRequest(object):
    """Pickleable request."""

    #: task id
    id = None

    #: task name
    name = None

    #: positional arguments
    args = ()

    #: keyword arguments
    kwargs = {}

    #: message delivery information.
    delivery_info = None

    #: worker node name
    hostname = None

    def __init__(self, id, name, args, kwargs, delivery_info, hostname):
        self.id = id
        self.name = name
        self.args = args
        self.kwargs = kwargs
        self.delivery_info = delivery_info
        self.hostname = hostname

    @classmethod
    def from_request(cls, request):
        # Support both protocol v1 and v2.
        args, kwargs, embed = request._payload
        return cls(request.id, request.name, args,
                   kwargs, request.delivery_info, request.hostname)
class Batches(Task):
    abstract = True

    #: Maximum number of messages in the buffer.
    flush_every = 10

    #: Timeout in seconds before buffer is flushed anyway.
    flush_interval = 30

    def __init__(self):
        self._buffer = Queue()
        self._count = count(1)
        self._tref = None
        self._pool = None

    def run(self, requests):
        raise NotImplementedError('must implement run(requests)')

    def Strategy(self, task, app, consumer):
        self._pool = consumer.pool
        hostname = consumer.hostname
        eventer = consumer.event_dispatcher
        Req = Request
        connection_errors = consumer.connection_errors
        timer = consumer.timer
        put_buffer = self._buffer.put
        flush_buffer = self._do_flush
        body_can_be_buffer = consumer.pool.body_can_be_buffer

        def task_message_handler(message, body, ack, reject, callbacks, **kw):
            if body is None:
                body, headers, decoded, utc = (
                    message.body, message.headers, False, True,
                )
                if not body_can_be_buffer:
                    body = bytes(body) if isinstance(body, buffer_t) else body
            else:
                body, headers, decoded, utc = proto1_to_proto2(message, body)

            request = Req(
                message,
                on_ack=ack, on_reject=reject, app=app, hostname=hostname,
                eventer=eventer, task=task,
                body=body, headers=headers, decoded=decoded, utc=utc,
                connection_errors=connection_errors,
            )
            put_buffer(request)

            if self._tref is None:  # first request starts flush timer.
                self._tref = timer.call_repeatedly(
                    self.flush_interval, flush_buffer,
                )

            if not next(self._count) % self.flush_every:
                flush_buffer()

        return task_message_handler

    def flush(self, requests):
        return self.apply_buffer(requests, ([SimpleRequest.from_request(r)
                                             for r in requests],))

    def _do_flush(self):
        logger.debug('Batches: Wake-up to flush buffer...')
        requests = None
        if self._buffer.qsize():
            requests = list(consume_queue(self._buffer))
            if requests:
                logger.debug('Batches: Buffer complete: %s', len(requests))
                self.flush(requests)
        if not requests:
            logger.debug('Batches: Canceling timer: Nothing in buffer.')
            if self._tref:
                self._tref.cancel()  # cancel timer.
                self._tref = None

    def apply_buffer(self, requests, args=(), kwargs={}):
        acks_late = [], []
        [acks_late[r.task.acks_late].append(r) for r in requests]
        assert requests and (acks_late[True] or acks_late[False])

        def on_accepted(pid, time_accepted):
            [req.acknowledge() for req in acks_late[False]]

        def on_return(result):
            [req.acknowledge() for req in acks_late[True]]

        return self._pool.apply_async(
            apply_batches_task,
            (self, args, 0, None),
            accept_callback=on_accepted,
            callback=acks_late[True] and on_return or noop,
        )
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
SPHINXPROJ = CeleryBatches
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
import sys
sys.path.insert(0, os.path.abspath('..'))
# -- General configuration ------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
    'sphinx.ext.autodoc',
    'sphinx.ext.intersphinx',
    'sphinx_celery.setting_crossref',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix(es) of source filenames.
source_suffix = '.rst'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'Celery Batches'
copyright = u'2017, Percipient Networks'
author = u'Percipient Networks'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = u'0.2'
# The full version, including alpha/beta/rc tags.
release = u'0.2'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# These patterns also affect html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = False
# -- Options for HTML output ----------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#
# html_theme_options = {}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# Custom sidebar templates, must be a dictionary that maps document names
# to template names.
#
# This is required for the alabaster theme
# refs: http://alabaster.readthedocs.io/en/latest/installation.html#sidebars
html_sidebars = {
    '**': [
        'relations.html',  # needs 'show_related': True theme option to display
        'searchbox.html',
    ]
}
# -- Options for HTMLHelp output ------------------------------------------
# Output file base name for HTML help builder.
htmlhelp_basename = 'CeleryBatchesdoc'
# -- Options for LaTeX output ---------------------------------------------
latex_elements = {
    # The paper size ('letterpaper' or 'a4paper').
    #
    # 'papersize': 'letterpaper',

    # The font size ('10pt', '11pt' or '12pt').
    #
    # 'pointsize': '10pt',

    # Additional stuff for the LaTeX preamble.
    #
    # 'preamble': '',

    # Latex figure (float) alignment
    #
    # 'figure_align': 'htbp',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
    (master_doc, 'CeleryBatches.tex', u'Celery Batches Documentation',
     u'Percipient Networks', 'manual'),
]
# -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
    (master_doc, 'celerybatches', u'Celery Batches Documentation',
     [author], 1)
]
# -- Options for Texinfo output -------------------------------------------
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
    (master_doc, 'CeleryBatches', u'Celery Batches Documentation',
     author, 'CeleryBatches',
     'Experimental task class that buffers messages and processes them as a list.',
     'Miscellaneous'),
]
# Check sphinx_celery.conf for other things that might be useful.
intersphinx_mapping = {
    'python': ('http://docs.python.org/dev/', None),
    'kombu': ('http://kombu.readthedocs.io/en/master/', None),
    'celery': ('http://docs.celeryproject.org/en/master', None),
}
.. currentmodule:: celery_batches

.. automodule:: celery_batches

    **API**

    .. autoclass:: Batches
        :members:
        :undoc-members:

    .. autoclass:: SimpleRequest
        :members:
        :undoc-members:
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
set SPHINXPROJ=CeleryBatches
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
:end
popd
git+https://github.com/celery/sphinx_celery.git
Sphinx==1.6.5
pytest>=3.0,<3.3
coverage
[bdist_wheel]
universal = 1
[metadata]
license_file = LICENSE
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import codecs
import setuptools
def long_description():
    try:
        return codecs.open('README.rst', 'r', 'utf-8').read()
    except IOError:
        return 'Long description error: Missing README.rst file'


setuptools.setup(
    name='celery-batches',
    packages=setuptools.find_packages(),
    version='0.2',
    description='Experimental task class that buffers messages and processes them as a list.',
    long_description=long_description(),
    keywords='task job queue distributed messaging actor',
    author='Percipient Networks',
    author_email='support@strongarm.io',
    url='https://github.com/percipient/celery-batches',
    license='BSD',
    platforms=['any'],
    install_requires=['celery>=4.0,<5.0'],
    classifiers=[
        'Development Status :: 3 - Alpha',
        'License :: OSI Approved :: BSD License',
        'Topic :: System :: Distributed Computing',
        'Topic :: Software Development :: Object Brokering',
        'Programming Language :: Python',
        'Programming Language :: Python :: 2',
        'Programming Language :: Python :: 2.7',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.4',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6',
        'Programming Language :: Python :: Implementation :: CPython',
        'Programming Language :: Python :: Implementation :: PyPy',
        'Operating System :: OS Independent',
    ],
)
import pytest


@pytest.fixture(scope='session', params=[1, 2])
def celery_config(request):
    return {
        'broker_url': 'memory://',
        'result_backend': 'cache+memory://',

        # Test both protocol 1 and 2 via the parameterized fixture.
        'task_protocol': request.param,
    }
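
# A minimal sketch of how a test might combine this fixture with the
# ``celery_app`` and ``celery_worker`` fixtures from ``celery.contrib.pytest``
# (a hypothetical example, not part of this file):
#
#     def test_flush_every(celery_app, celery_worker):
#         from celery_batches import Batches
#
#         @celery_app.task(base=Batches, flush_every=2, flush_interval=1)
#         def add(requests):
#             return sum(sum(req.args) for req in requests)
#
#         add.delay(1)
#         add.delay(1)  # Second call reaches flush_every, triggering a flush.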