Commit c9e2e999 authored by Olivier Sallou

New upstream version 0.15.0

parent 21b0d92e
0.15.0 (2018-10-22)
-------------------
- Added support for V2 JSON encoding.
- Fixed TransportHandler bug that also affected V1 JSON.
0.14.1 (2018-10-09)
-------------------
- Fixed memory leak introduced in 0.13.0.
0.14.0 (2018-10-01)
-------------------
- Support JSON encoding for V1 spans.
- Allow overriding the span_name after creation.
0.13.0 (2018-06-25)
-------------------
- Removed deprecated `zipkin_logger.debug()` interface.
- `py_zipkin.stack` was renamed as `py_zipkin.storage`. If you were
importing this module, you'll need to update your code.
0.12.0 (2018-05-29)
-------------------
- Support max payload size for transport handlers.
- Transport handlers should now be implemented as classes
extending py_zipkin.transport.BaseTransportHandler.
0.11.2 (2018-05-23)
-------------------
- Don't overwrite passed in annotations
0.11.1 (2018-05-23)
-------------------
- Add binary annotations to the span even if the request is not being
sampled. This fixes binary annotations for firehose spans.
0.11.0 (2018-02-08)
-------------------
- Add support for "firehose mode", which logs 100% of the spans
regardless of sample rate.
0.10.1 (2018-02-05)
-------------------
- context_stack will now default to `ThreadLocalStack()` if passed as
`None`
0.10.0 (2018-02-05)
-------------------
- Add support for using explicit in-process context storage instead of
using thread_local. This allows you to use py_zipkin in cooperative
multitasking environments e.g. asyncio
- `py_zipkin.thread_local` is now deprecated. Instead use
`py_zipkin.stack.ThreadLocalStack()`
- TraceId and SpanId generation performance improvements.
- 128-bit TraceIds now start with an epoch timestamp to support easy
interop with AWS X-Ray
0.9.0 (2017-07-31)
------------------
- Add batch span sending. Note that spans are now sent in lists.
0.8.3 (2017-07-10)
------------------
- Be defensive about having logging handlers configured to avoid throwing
NullHandler attribute errors
0.8.2 (2017-06-30)
------------------
- Don't log ss and sr annotations when in a client span context
- Add error binary annotation if an exception occurs
0.8.1 (2017-06-16)
------------------
- Fixed server send timing to more accurately reflect when server send
actually occurs.
- Replaced logging_start annotation with logging_end
0.8.0 (2017-06-01)
------------------
- Added 128-bit trace id support
- Added ability to explicitly specify host for a span
- Added exception handling if host can't be determined automatically
- SERVER_ADDR ('sa') binary annotations can be added to spans
- py36 support
0.7.1 (2017-05-01)
------------------
- Fixed a bug where `update_binary_annotations` would fail for a child
span in a trace that is not being sampled
0.7.0 (2017-03-06)
------------------
- Simplify `update_binary_annotations` for both root and non-root spans
0.6.0 (2017-02-03)
------------------
- Added support for forcing `zipkin_span` to report timestamp/duration.
Changes API of `zipkin_span`, but defaults back to existing behavior.
0.5.0 (2017-02-01)
------------------
- Properly set timestamp/duration on server and local spans
- Updated thrift spec to include these new fields
- The `zipkin_span` entrypoint should be backwards compatible
0.4.4 (2016-11-29)
------------------
- Add optional annotation for when Zipkin logging starts
0.4.3 (2016-11-04)
------------------
- Fix bug in zipkin_span decorator
0.4.2 (2016-11-01)
------------------
- Be defensive about transport_handler when logging spans.
0.4.1 (2016-10-24)
------------------
- Add ability to override span_id when creating new ZipkinAttrs.
0.4.0 (2016-10-20)
------------------
- Added `start` and `stop` functions as friendlier versions of the
__enter__ and __exit__ functions.
0.3.1 (2016-09-30)
------------------
- Adds new param to thrift.create_endpoint allowing creation of
thrift Endpoint objects on a proxy machine representing another
host.
0.2.1 (2016-09-30)
------------------
- Officially "release" v0.2.0. Accidentally pushed a v0.2.0 without
the proper version bump, so v0.2.1 is the new real version. Please
use this instead of v0.2.0.
0.2.0 (2016-09-30)
------------------
- Fix problem where if zipkin_attrs and sample_rate were passed, but
zipkin_attrs.is_sampled=True, new zipkin_attrs were being generated.
0.1.2 (2016-09-29)
------------------
- Fix sampling algorithm that always sampled for rates > 50%
0.1.1 (2016-07-05)
------------------
- First py_zipkin version with context manager/decorator functionality.
include README.md
include CHANGELOG.rst
[![Build Status](https://travis-ci.org/Yelp/py_zipkin.svg?branch=master)](https://travis-ci.org/Yelp/py_zipkin)
[![Coverage Status](https://img.shields.io/coveralls/Yelp/py_zipkin.svg)](https://coveralls.io/r/Yelp/py_zipkin)
[![PyPi version](https://img.shields.io/pypi/v/py_zipkin.svg)](https://pypi.python.org/pypi/py_zipkin/)
[![Supported Python versions](https://img.shields.io/pypi/pyversions/py_zipkin.svg)](https://pypi.python.org/pypi/py_zipkin/)
py_zipkin
---------
py_zipkin provides a context manager/decorator along with some utilities to
facilitate the usage of Zipkin in Python applications.
Install
-------
```
pip install py_zipkin
```
Usage
-----
py_zipkin requires a `transport_handler` object that handles logging Zipkin
messages to a central logging service such as Kafka or Scribe.
`py_zipkin.zipkin.zipkin_span` is the main tool for starting zipkin traces or
logging spans inside an ongoing trace. zipkin_span can be used as a context
manager or a decorator.
#### Usage #1: Start a trace with a given sampling rate
```python
from py_zipkin.zipkin import zipkin_span

def some_function(a, b):
    with zipkin_span(
        service_name='my_service',
        span_name='my_span_name',
        transport_handler=some_handler,
        port=42,
        sample_rate=0.05,  # Value between 0.0 and 100.0
    ):
        do_stuff(a, b)
```
#### Usage #2: Trace a service call
The difference between this and Usage #1 is that the zipkin_attrs are calculated
separately and passed in, removing the need for the sample_rate param.
```python
# Define a pyramid tween
def tween(request):
    zipkin_attrs = some_zipkin_attr_creator(request)
    with zipkin_span(
        service_name='my_service',
        span_name='my_span_name',
        zipkin_attrs=zipkin_attrs,
        transport_handler=some_handler,
        port=22,
    ) as zipkin_context:
        response = handler(request)
        zipkin_context.update_binary_annotations(some_binary_annotations)
        return response
```
#### Usage #3: Log a span inside an ongoing trace
This can also be used inside itself to produce continuously nested spans.
```python
@zipkin_span(service_name='my_service', span_name='some_function')
def some_function(a, b):
    return do_stuff(a, b)
```
#### Other utilities
`zipkin_span.update_binary_annotations()` can be used inside a zipkin trace
to add to the existing set of binary annotations.
```python
def some_function(a, b):
    with zipkin_span(
        service_name='my_service',
        span_name='some_function',
        transport_handler=some_handler,
        port=42,
        sample_rate=0.05,
    ) as zipkin_context:
        result = do_stuff(a, b)
        zipkin_context.update_binary_annotations({'result': result})
```
`zipkin_span.add_sa_binary_annotation()` can be used to add a binary annotation
to the current span with the key 'sa'. This function allows the user to specify the
destination address of the service being called (useful if the destination doesn't
support zipkin). See http://zipkin.io/pages/data_model.html for more information on the
'sa' binary annotation.
> NOTE: the V2 span format only supports one "sa" endpoint (represented by remoteEndpoint),
> so `add_sa_binary_annotation` now raises `ValueError` if you try to set multiple "sa"
> annotations for the same span.
```python
def some_function():
    with zipkin_span(
        service_name='my_service',
        span_name='some_function',
        transport_handler=some_handler,
        port=42,
        sample_rate=0.05,
    ) as zipkin_context:
        make_call_to_non_instrumented_service()
        zipkin_context.add_sa_binary_annotation(
            port=123,
            service_name='non_instrumented_service',
            host='12.34.56.78',
        )
```
`create_http_headers_for_new_span()` creates a set of HTTP headers that can be forwarded
in a request to another service.
```python
headers = {}
headers.update(create_http_headers_for_new_span())
http_client.get(
    path='some_url',
    headers=headers,
)
```
Transport
---------
py_zipkin (for the moment) thrift-encodes spans. The actual transport layer is
pluggable, though.
The recommended way to implement a new transport handler is to subclass
`py_zipkin.transport.BaseTransportHandler` and implement the `send` and
`get_max_payload_bytes` methods.
`send` receives an already encoded thrift list as an argument.
`get_max_payload_bytes` should return the maximum payload size supported by your
transport, or `None` if you can send arbitrarily big messages.
The simplest way to get spans to the collector is via HTTP POST. Here's an
example of a simple HTTP transport using the `requests` library. This assumes
your Zipkin collector is running at localhost:9411.
> NOTE: older versions of py_zipkin suggested implementing the transport handler
> as a function with a single argument. That's still supported and should work
> with the current py_zipkin version, but it's deprecated.
```python
import requests

from py_zipkin.transport import BaseTransportHandler


class HttpTransport(BaseTransportHandler):

    def get_max_payload_bytes(self):
        return None

    def send(self, encoded_span):
        # The collector expects a thrift-encoded list of spans.
        requests.post(
            'http://localhost:9411/api/v1/spans',
            data=encoded_span,
            headers={'Content-Type': 'application/x-thrift'},
        )
```
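For comparison, the deprecated function-style handler mentioned in the note above is just a callable taking the encoded payload as its single argument. A sketch of the same HTTP transport in that style:

```python
import requests

def http_transport(encoded_span):
    # Deprecated style: a plain function instead of a
    # BaseTransportHandler subclass. It receives the
    # already-encoded list of spans as its only argument.
    requests.post(
        'http://localhost:9411/api/v1/spans',
        data=encoded_span,
        headers={'Content-Type': 'application/x-thrift'},
    )
```

Note that with a plain function there is no `get_max_payload_bytes`, so py_zipkin can't enforce a maximum payload size; that's one reason the class-based interface is preferred.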
If you have the ability to send spans over Kafka (more like what you might do
in production), you'd do something like the following, using the
[kafka-python](https://pypi.python.org/pypi/kafka-python) package:
```python
from kafka import SimpleProducer, KafkaClient

from py_zipkin.transport import BaseTransportHandler


class KafkaTransport(BaseTransportHandler):

    def get_max_payload_bytes(self):
        # By default Kafka rejects messages bigger than 1000012 bytes.
        return 1000012

    def send(self, message):
        kafka_client = KafkaClient('{}:{}'.format('localhost', 9092))
        producer = SimpleProducer(kafka_client)
        producer.send_messages('kafka_topic_name', message)
```
Using in multitasking environments
----------------------------------
If you want to use py_zipkin in a cooperative multitasking environment,
e.g. asyncio, you need to explicitly pass an instance of `py_zipkin.storage.Stack`
as the `context_stack` parameter for `zipkin_span` and `create_http_headers_for_new_span`.
By default, py_zipkin uses thread-local storage for the attributes, which is
defined in `py_zipkin.storage.ThreadLocalStack`.
Additionally, you'll also need to explicitly pass an instance of
`py_zipkin.storage.SpanStorage` as the `span_storage` parameter to `zipkin_span`.
```python
import asyncio

from py_zipkin.zipkin import zipkin_span
from py_zipkin.storage import Stack
from py_zipkin.storage import SpanStorage


async def my_function(context_stack, span_storage):
    with zipkin_span(
        service_name='my_service',
        span_name='some_function',
        transport_handler=some_handler,
        port=42,
        sample_rate=0.05,
        context_stack=context_stack,
        span_storage=span_storage,
    ):
        result = do_stuff()


def main():
    context_stack = Stack()
    span_storage = SpanStorage()
    asyncio.get_event_loop().run_until_complete(
        my_function(context_stack, span_storage),
    )
```
Firehose mode [EXPERIMENTAL]
----------------------------
"Firehose mode" records 100% of the spans, regardless of
sampling rate. This is useful if you want to treat these spans
differently, e.g. send them to a different backend that has limited
retention. It works in tandem with normal operation; however, there may
be additional overhead. To use it, add a
`firehose_handler` just like you add a `transport_handler`.
This feature should be considered experimental and may be removed at
any time without warning. If you do use this, be sure to send
asynchronously to avoid excess overhead for every request.
License
-------
Copyright (c) 2018, Yelp, Inc. All rights reserved. Apache v2
CHANGELOG.rst
MANIFEST.in
README.md
setup.cfg
setup.py
py_zipkin/__init__.py
py_zipkin/exception.py
py_zipkin/logging_helper.py
py_zipkin/storage.py
py_zipkin/thread_local.py
py_zipkin/transport.py
py_zipkin/util.py
py_zipkin/zipkin.py
py_zipkin.egg-info/PKG-INFO
py_zipkin.egg-info/SOURCES.txt
py_zipkin.egg-info/dependency_links.txt
py_zipkin.egg-info/requires.txt
py_zipkin.egg-info/top_level.txt
py_zipkin/encoding/__init__.py
py_zipkin/encoding/_encoders.py
py_zipkin/encoding/_helpers.py
py_zipkin/encoding/_types.py
py_zipkin/thrift/__init__.py
py_zipkin/thrift/zipkinCore.thrift
six
thriftpy
[:python_version=="2.7"]
enum34
# DeprecationWarnings are silent since Python 2.7.
# The `default` filter only prints the first occurrence of matching warnings for
# each location where the warning is issued, so that we don't spam our users logs.
import warnings
warnings.simplefilter('default', DeprecationWarning)
# Export useful functions and types from private modules.
from py_zipkin.encoding._types import Encoding # noqa
from py_zipkin.encoding._types import Kind # noqa
# -*- coding: utf-8 -*-
import json
from py_zipkin import thrift
from py_zipkin.encoding._types import Encoding
from py_zipkin.exception import ZipkinError
def get_encoder(encoding):
    """Creates encoder object for the given encoding.

    :param encoding: desired output encoding protocol.
    :type encoding: Encoding
    :return: corresponding IEncoder object
    :rtype: IEncoder
    """
    if encoding == Encoding.V1_THRIFT:
        return _V1ThriftEncoder()
    if encoding == Encoding.V1_JSON:
        return _V1JSONEncoder()
    if encoding == Encoding.V2_JSON:
        return _V2JSONEncoder()
    raise ZipkinError('Unknown encoding: {}'.format(encoding))


class IEncoder(object):
    """Encoder interface."""

    def fits(self, current_count, current_size, max_size, new_span):
        """Returns whether the new span will fit in the list.

        :param current_count: number of spans already in the list.
        :type current_count: int
        :param current_size: sum of the sizes of all the spans already in the list.
        :type current_size: int
        :param max_size: max supported transport payload size.
        :type max_size: int
        :param new_span: encoded span object that we want to add to the list.
        :type new_span: str or bytes
        :return: True if the new span can be added to the list, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()

    def encode_span(self, span_builder):
        """Encodes a single span.

        :param span_builder: span_builder object representing the span.
        :type span_builder: SpanBuilder
        :return: encoded span.
        :rtype: str or bytes
        """
        raise NotImplementedError()

    def encode_queue(self, queue):
        """Encodes a list of pre-encoded spans.

        :param queue: list of encoded spans.
        :type queue: list
        :return: encoded list, type depends on the encoding.
        :rtype: str or bytes
        """
        raise NotImplementedError()
class _V1ThriftEncoder(IEncoder):
    """Thrift encoder for V1 spans."""

    def fits(self, current_count, current_size, max_size, new_span):
        """Checks if the new span fits in the max payload size.

        Thrift lists have a fixed-size header and no delimiters between elements
        so it's easy to compute the list size.
        """
        return thrift.LIST_HEADER_SIZE + current_size + len(new_span) <= max_size

    def encode_span(self, span_builder):
        """Encodes the current span to thrift."""
        span = span_builder.build_v1_span()

        thrift_endpoint = thrift.create_endpoint(
            span.endpoint.port,
            span.endpoint.service_name,
            span.endpoint.ipv4,
            span.endpoint.ipv6,
        )

        thrift_annotations = thrift.annotation_list_builder(
            span.annotations,
            thrift_endpoint,
        )

        thrift_binary_annotations = thrift.binary_annotation_list_builder(
            span.binary_annotations,
            thrift_endpoint,
        )

        # Add sa binary annotation
        if span.sa_endpoint is not None:
            thrift_sa_endpoint = thrift.create_endpoint(
                span.sa_endpoint.port,
                span.sa_endpoint.service_name,
                span.sa_endpoint.ipv4,
                span.sa_endpoint.ipv6,
            )
            thrift_binary_annotations.append(thrift.create_binary_annotation(
                key=thrift.zipkin_core.SERVER_ADDR,
                value=thrift.SERVER_ADDR_VAL,
                annotation_type=thrift.zipkin_core.AnnotationType.BOOL,
                host=thrift_sa_endpoint,
            ))

        thrift_span = thrift.create_span(
            span.id,
            span.parent_id,
            span.trace_id,
            span.name,
            thrift_annotations,
            thrift_binary_annotations,
            span.timestamp,
            span.duration,
        )

        encoded_span = thrift.span_to_bytes(thrift_span)
        return encoded_span

    def encode_queue(self, queue):
        """Converts the queue to a thrift list"""
        return thrift.encode_bytes_list(queue)
class _BaseJSONEncoder(IEncoder):
    """V1 and V2 JSON encoders need many common helper functions."""

    def fits(self, current_count, current_size, max_size, new_span):
        """Checks if the new span fits in the max payload size.

        JSON lists have a 2 byte overhead from '[]' plus 1 byte for each
        ',' between elements.
        """
        return 2 + current_count + current_size + len(new_span) <= max_size

    def _create_json_endpoint(self, endpoint, is_v1):
        """Converts an Endpoint to a JSON endpoint dict.

        :param endpoint: endpoint object to convert.
        :type endpoint: Endpoint
        :param is_v1: whether we're serializing a v1 span. This is needed since
            in v1 some fields default to an empty string rather than being
            dropped if they're not set.
        :type is_v1: bool
        :return: dict representing a JSON endpoint.
        :rtype: dict
        """
        json_endpoint = {}

        if endpoint.service_name:
            json_endpoint['serviceName'] = endpoint.service_name
        elif is_v1:
            # serviceName is mandatory in v1
            json_endpoint['serviceName'] = ""
        if endpoint.port and endpoint.port != 0:
            json_endpoint['port'] = endpoint.port
        if endpoint.ipv4 is not None:
            json_endpoint['ipv4'] = endpoint.ipv4
        if endpoint.ipv6 is not None:
            json_endpoint['ipv6'] = endpoint.ipv6

        return json_endpoint

    def encode_queue(self, queue):
        """Concatenates the list to a JSON list"""
        return '[' + ','.join(queue) + ']'
class _V1JSONEncoder(_BaseJSONEncoder):
    """JSON encoder for V1 spans."""

    def encode_span(self, span_builder):
        """Encodes a single span to JSON."""
        span = span_builder.build_v1_span()

        json_span = {
            'traceId': span.trace_id,
            'name': span.name,
            'id': span.id,
            'annotations': [],
            'binaryAnnotations': [],
        }

        if span.parent_id:
            json_span['parentId'] = span.parent_id
        if span.timestamp:
            json_span['timestamp'] = int(span.timestamp * 1000000)
        if span.duration:
            json_span['duration'] = int(span.duration * 1000000)

        v1_endpoint = self._create_json_endpoint(span.endpoint, True)

        for key, timestamp in span.annotations.items():
            json_span['annotations'].append({
                'endpoint': v1_endpoint,
                'timestamp': int(timestamp * 1000000),
                'value': key,
            })

        for key, value in span.binary_annotations.items():
            json_span['binaryAnnotations'].append({
                'key': key,
                'value': value,
                'endpoint': v1_endpoint,
            })

        # Add sa binary annotations
        if span.sa_endpoint is not None:
            json_sa_endpoint = self._create_json_endpoint(span.sa_endpoint, True)
            json_span['binaryAnnotations'].append({
                'key': 'sa',
                'value': '1',
                'endpoint': json_sa_endpoint,
            })

        encoded_span = json.dumps(json_span)
        return encoded_span
class _V2JSONEncoder(_BaseJSONEncoder):
    """JSON encoder for V2 spans."""

    def encode_span(self, span_builder):
        """Encodes a single span to JSON."""
        span = span_builder.build_v2_span()

        json_span = {
            'traceId': span.trace_id,
            'id': span.id,
        }

        if span.name:
            json_span['name'] = span.name
        if span.parent_id:
            json_span['parentId'] = span.parent_id
        if span.timestamp:
            json_span['timestamp'] = int(span.timestamp * 1000000)
        if span.duration:
            json_span['duration'] = int(span.duration * 1000000)
        if span.shared is True:
            json_span['shared'] = True
        if span.kind and span.kind.value is not None:
            json_span['kind'] = span.kind.value
        if span.local_endpoint:
            json_span['localEndpoint'] = self._create_json_endpoint(
                span.local_endpoint,
                False,
            )
        if span.remote_endpoint:
            json_span['remoteEndpoint'] = self._create_json_endpoint(
                span.remote_endpoint,
                False,
            )
        if span.tags and len(span.tags) > 0:
            json_span['tags'] = span.tags

        if span.annotations:
            json_span['annotations'] = [
                {
                    'timestamp': int(timestamp * 1000000),
                    'value': key,
                }
                for key, timestamp in span.annotations.items()
            ]

        encoded_span = json.dumps(json_span)
        return encoded_span
# -*- coding: utf-8 -*-
import socket
from collections import namedtuple
from collections import OrderedDict
from py_zipkin.encoding._types import Kind
from py_zipkin.exception import ZipkinError
Endpoint = namedtuple(
    'Endpoint',
    ['service_name', 'ipv4', 'ipv6', 'port'],
)

_V1Span = namedtuple(
    'V1Span',
    ['trace_id', 'name', 'parent_id', 'id', 'timestamp', 'duration', 'endpoint',
     'annotations', 'binary_annotations', 'sa_endpoint'],
)

_V2Span = namedtuple(
    'V2Span',
    ['trace_id', 'name', 'parent_id', 'id', 'kind', 'timestamp',