Commit b44df451 authored by Michael Fladischer's avatar Michael Fladischer

New upstream version 2.2.3

parent 8641eb14
......@@ -3,25 +3,44 @@ sudo: false
language: python
python:
- '3.5'
- '3.6'
- '3.5'
env:
- TWISTED="twisted==17.5.0"
- TWISTED="twisted"
- TWISTED="twisted==18.7.0"
install:
- pip install $TWISTED isort unify flake8 -e .[tests]
- pip install $TWISTED -e .[tests]
- pip freeze
script:
- pytest
- flake8
- isort --check-only --diff --recursive daphne tests
- unify --check-only --recursive --quote \" daphne tests
stages:
- lint
- test
- name: release
if: tag IS present
jobs:
include:
- python: '3.7'
env: TWISTED="twisted==18.7.0"
dist: xenial
sudo: required
- python: '3.7'
env: TWISTED="twisted"
dist: xenial
sudo: required
- stage: lint
install: pip install -U -e .[tests] black pyflakes isort
script:
- pyflakes daphne tests
- black --check daphne tests
- isort --check-only --diff --recursive daphne tests
- stage: release
script: skip
deploy:
......
2.2.3 (2018-11-06)
------------------
* Enforce that response headers are only bytestrings, rather than allowing
unicode strings and coercing them into bytes.
* New command-line options to set proxy header names: --proxy-headers-host and
--proxy-headers-port.
2.2.2 (2018-08-16)
------------------
......
__version__ = "2.2.2"
__version__ = "2.2.3"
......@@ -49,13 +49,16 @@ class AccessLogGenerator(object):
request="WSDISCONNECT %(path)s" % details,
)
def write_entry(self, host, date, request, status=None, length=None, ident=None, user=None):
def write_entry(
self, host, date, request, status=None, length=None, ident=None, user=None
):
"""
Writes an NCSA-style entry to the log file (some liberty is taken with
what the entries are for non-HTTP)
"""
self.stream.write(
"%s %s %s [%s] \"%s\" %s %s\n" % (
'%s %s %s [%s] "%s" %s %s\n'
% (
host,
ident or "-",
user or "-",
......
import argparse
import logging
import sys
from argparse import ArgumentError, Namespace
from .access import AccessLogGenerator
from .endpoints import build_endpoint_description_strings
......@@ -23,15 +24,9 @@ class CommandLineInterface(object):
server_class = Server
def __init__(self):
self.parser = argparse.ArgumentParser(
description=self.description,
)
self.parser = argparse.ArgumentParser(description=self.description)
self.parser.add_argument(
"-p",
"--port",
type=int,
help="Port number to listen on",
default=None,
"-p", "--port", type=int, help="Port number to listen on", default=None
)
self.parser.add_argument(
"-b",
......@@ -128,10 +123,29 @@ class CommandLineInterface(object):
"--proxy-headers",
dest="proxy_headers",
help="Enable parsing and using of X-Forwarded-For and X-Forwarded-Port headers and using that as the "
"client address",
"client address",
default=False,
action="store_true",
)
self.arg_proxy_host = self.parser.add_argument(
"--proxy-headers-host",
dest="proxy_headers_host",
help="Specify which header will be used for getting the host "
"part. Can be omitted, requires --proxy-headers to be specified "
'when passed. "X-Real-IP" (when passed by your webserver) is a '
"good candidate for this.",
default=False,
action="store",
)
self.arg_proxy_port = self.parser.add_argument(
"--proxy-headers-port",
dest="proxy_headers_port",
help="Specify which header will be used for getting the port "
"part. Can be omitted, requires --proxy-headers to be specified "
"when passed.",
default=False,
action="store",
)
self.parser.add_argument(
"application",
help="The application to dispatch to as path.to.module:instance.path",
......@@ -146,6 +160,37 @@ class CommandLineInterface(object):
"""
cls().run(sys.argv[1:])
def _check_proxy_headers_passed(self, argument: str, args: Namespace):
"""Raise if the `--proxy-headers` weren't specified."""
if args.proxy_headers:
return
raise ArgumentError(
argument=argument,
message="--proxy-headers has to be passed for this parameter.",
)
def _get_forwarded_host(self, args: Namespace):
"""
Return the default host header from which the remote hostname/ip
will be extracted.
"""
if args.proxy_headers_host:
self._check_proxy_headers_passed(argument=self.arg_proxy_host, args=args)
return args.proxy_headers_host
if args.proxy_headers:
return "X-Forwarded-For"
def _get_forwarded_port(self, args: Namespace):
"""
Return the default host header from which the remote hostname/ip
will be extracted.
"""
if args.proxy_headers_port:
self._check_proxy_headers_passed(argument=self.arg_proxy_port, args=args)
return args.proxy_headers_port
if args.proxy_headers:
return "X-Forwarded-Port"
def run(self, args):
"""
Pass in raw argument list and it will decode them
......@@ -176,7 +221,15 @@ class CommandLineInterface(object):
sys.path.insert(0, ".")
application = import_by_path(args.application)
# Set up port/host bindings
if not any([args.host, args.port is not None, args.unix_socket, args.file_descriptor, args.socket_strings]):
if not any(
[
args.host,
args.port is not None,
args.unix_socket,
args.file_descriptor,
args.socket_strings,
]
):
# no advanced binding options passed, patch in defaults
args.host = DEFAULT_HOST
args.port = DEFAULT_PORT
......@@ -189,16 +242,11 @@ class CommandLineInterface(object):
host=args.host,
port=args.port,
unix_socket=args.unix_socket,
file_descriptor=args.file_descriptor
)
endpoints = sorted(
args.socket_strings + endpoints
file_descriptor=args.file_descriptor,
)
endpoints = sorted(args.socket_strings + endpoints)
# Start the server
logger.info(
"Starting server at %s" %
(", ".join(endpoints), )
)
logger.info("Starting server at %s" % (", ".join(endpoints),))
self.server = self.server_class(
application=application,
endpoints=endpoints,
......@@ -208,12 +256,16 @@ class CommandLineInterface(object):
websocket_timeout=args.websocket_timeout,
websocket_connect_timeout=args.websocket_connect_timeout,
application_close_timeout=args.application_close_timeout,
action_logger=AccessLogGenerator(access_log_stream) if access_log_stream else None,
action_logger=AccessLogGenerator(access_log_stream)
if access_log_stream
else None,
ws_protocols=args.ws_protocols,
root_path=args.root_path,
verbosity=args.verbosity,
proxy_forwarded_address_header="X-Forwarded-For" if args.proxy_headers else None,
proxy_forwarded_port_header="X-Forwarded-Port" if args.proxy_headers else None,
proxy_forwarded_proto_header="X-Forwarded-Proto" if args.proxy_headers else None,
proxy_forwarded_address_header=self._get_forwarded_host(args=args),
proxy_forwarded_port_header=self._get_forwarded_port(args=args),
proxy_forwarded_proto_header="X-Forwarded-Proto"
if args.proxy_headers
else None,
)
self.server.run()
def build_endpoint_description_strings(
host=None,
port=None,
unix_socket=None,
file_descriptor=None
host=None, port=None, unix_socket=None, file_descriptor=None
):
"""
Build a list of twisted endpoint description strings that the server will listen on.
......
......@@ -23,7 +23,8 @@ class WebRequest(http.Request):
GET and POST out.
"""
error_template = """
error_template = (
"""
<html>
<head>
<title>%(title)s</title>
......@@ -40,7 +41,13 @@ class WebRequest(http.Request):
<footer>Daphne</footer>
</body>
</html>
""".replace("\n", "").replace(" ", " ").replace(" ", " ").replace(" ", " ") # Shorten it a bit, bytes wise
""".replace(
"\n", ""
)
.replace(" ", " ")
.replace(" ", " ")
.replace(" ", " ")
) # Shorten it a bit, bytes wise
def __init__(self, *args, **kwargs):
try:
......@@ -84,7 +91,7 @@ class WebRequest(http.Request):
self.server.proxy_forwarded_port_header,
self.server.proxy_forwarded_proto_header,
self.client_addr,
self.client_scheme
self.client_scheme,
)
# Check for unicodeish path (or it'll crash when trying to parse)
try:
......@@ -105,7 +112,9 @@ class WebRequest(http.Request):
# Is it WebSocket? IS IT?!
if upgrade_header and upgrade_header.lower() == b"websocket":
# Make WebSocket protocol to hand off to
protocol = self.server.ws_factory.buildProtocol(self.transport.getPeer())
protocol = self.server.ws_factory.buildProtocol(
self.transport.getPeer()
)
if not protocol:
# If protocol creation fails, we signal "internal server error"
self.setResponseCode(500)
......@@ -151,33 +160,38 @@ class WebRequest(http.Request):
logger.debug("HTTP %s request for %s", self.method, self.client_addr)
self.content.seek(0, 0)
# Work out the application scope and create application
self.application_queue = yield maybeDeferred(self.server.create_application, self, {
"type": "http",
# TODO: Correctly say if it's 1.1 or 1.0
"http_version": self.clientproto.split(b"/")[-1].decode("ascii"),
"method": self.method.decode("ascii"),
"path": unquote(self.path.decode("ascii")),
"root_path": self.root_path,
"scheme": self.client_scheme,
"query_string": self.query_string,
"headers": self.clean_headers,
"client": self.client_addr,
"server": self.server_addr,
})
self.application_queue = yield maybeDeferred(
self.server.create_application,
self,
{
"type": "http",
# TODO: Correctly say if it's 1.1 or 1.0
"http_version": self.clientproto.split(b"/")[-1].decode(
"ascii"
),
"method": self.method.decode("ascii"),
"path": unquote(self.path.decode("ascii")),
"root_path": self.root_path,
"scheme": self.client_scheme,
"query_string": self.query_string,
"headers": self.clean_headers,
"client": self.client_addr,
"server": self.server_addr,
},
)
# Check they didn't close an unfinished request
if self.application_queue is None or self.content.closed:
# Not much we can do, the request is prematurely abandoned.
return
# Run application against request
self.application_queue.put_nowait(
{
"type": "http.request",
"body": self.content.read(),
},
{"type": "http.request", "body": self.content.read()}
)
except Exception:
logger.error(traceback.format_exc())
self.basic_error(500, b"Internal Server Error", "Daphne HTTP processing error")
self.basic_error(
500, b"Internal Server Error", "Daphne HTTP processing error"
)
def connectionLost(self, reason):
"""
......@@ -217,16 +231,23 @@ class WebRequest(http.Request):
raise ValueError("HTTP response has already been started")
self._response_started = True
if "status" not in message:
raise ValueError("Specifying a status code is required for a Response message.")
raise ValueError(
"Specifying a status code is required for a Response message."
)
# Set HTTP status code
self.setResponseCode(message["status"])
# Write headers
for header, value in message.get("headers", {}):
self.responseHeaders.addRawHeader(header, value)
logger.debug("HTTP %s response started for %s", message["status"], self.client_addr)
logger.debug(
"HTTP %s response started for %s", message["status"], self.client_addr
)
elif message["type"] == "http.response.body":
if not self._response_started:
raise ValueError("HTTP response has not yet been started but got %s" % message["type"])
raise ValueError(
"HTTP response has not yet been started but got %s"
% message["type"]
)
# Write out body
http.Request.write(self, message.get("body", b""))
# End if there's no more content
......@@ -239,15 +260,21 @@ class WebRequest(http.Request):
# The path is malformed somehow - do our best to log something
uri = repr(self.uri)
try:
self.server.log_action("http", "complete", {
"path": uri,
"status": self.code,
"method": self.method.decode("ascii", "replace"),
"client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None,
"time_taken": self.duration(),
"size": self.sentLength,
})
except Exception as e:
self.server.log_action(
"http",
"complete",
{
"path": uri,
"status": self.code,
"method": self.method.decode("ascii", "replace"),
"client": "%s:%s" % tuple(self.client_addr)
if self.client_addr
else None,
"time_taken": self.duration(),
"size": self.sentLength,
},
)
except Exception:
logger.error(traceback.format_exc())
else:
logger.debug("HTTP response chunk for %s", self.client_addr)
......@@ -270,7 +297,11 @@ class WebRequest(http.Request):
logger.warning("Application timed out while sending response")
self.finish()
else:
self.basic_error(503, b"Service Unavailable", "Application failed to respond within time limit.")
self.basic_error(
503,
b"Service Unavailable",
"Application failed to respond within time limit.",
)
### Utility functions
......@@ -281,11 +312,7 @@ class WebRequest(http.Request):
"""
# If we don't yet have a path, then don't send as we never opened.
if self.path:
self.application_queue.put_nowait(
{
"type": "http.disconnect",
},
)
self.application_queue.put_nowait({"type": "http.disconnect"})
def duration(self):
"""
......@@ -299,20 +326,25 @@ class WebRequest(http.Request):
"""
Responds with a server-level error page (very basic)
"""
self.handle_reply({
"type": "http.response.start",
"status": status,
"headers": [
(b"Content-Type", b"text/html; charset=utf-8"),
],
})
self.handle_reply({
"type": "http.response.body",
"body": (self.error_template % {
"title": str(status) + " " + status_text.decode("ascii"),
"body": body,
}).encode("utf8"),
})
self.handle_reply(
{
"type": "http.response.start",
"status": status,
"headers": [(b"Content-Type", b"text/html; charset=utf-8")],
}
)
self.handle_reply(
{
"type": "http.response.body",
"body": (
self.error_template
% {
"title": str(status) + " " + status_text.decode("ascii"),
"body": body,
}
).encode("utf8"),
}
)
def __hash__(self):
return hash(id(self))
......@@ -343,7 +375,7 @@ class HTTPFactory(http.HTTPFactory):
protocol = http.HTTPFactory.buildProtocol(self, addr)
protocol.requestFactory = WebRequest
return protocol
except Exception as e:
except Exception:
logger.error("Cannot build protocol: %s" % traceback.format_exc())
raise
......
......@@ -2,13 +2,14 @@
import sys # isort:skip
import warnings # isort:skip
from twisted.internet import asyncioreactor # isort:skip
current_reactor = sys.modules.get("twisted.internet.reactor", None)
if current_reactor is not None:
if not isinstance(current_reactor, asyncioreactor.AsyncioSelectorReactor):
warnings.warn(
"Something has already installed a non-asyncio Twisted reactor. Attempting to uninstall it; " +
"you can fix this warning by importing daphne.server early in your codebase or " +
"finding the package that imports Twisted and importing it later on.",
"Something has already installed a non-asyncio Twisted reactor. Attempting to uninstall it; "
+ "you can fix this warning by importing daphne.server early in your codebase or "
+ "finding the package that imports Twisted and importing it later on.",
UserWarning,
)
del sys.modules["twisted.internet.reactor"]
......@@ -34,7 +35,6 @@ logger = logging.getLogger(__name__)
class Server(object):
def __init__(
self,
application,
......@@ -91,11 +91,13 @@ class Server(object):
self.ws_factory.setProtocolOptions(
autoPingTimeout=self.ping_timeout,
allowNullOrigin=True,
openHandshakeTimeout=self.websocket_handshake_timeout
openHandshakeTimeout=self.websocket_handshake_timeout,
)
if self.verbosity <= 1:
# Redirect the Twisted log to nowhere
globalLogBeginner.beginLoggingTo([lambda _: None], redirectStandardIO=False, discardBuffer=True)
globalLogBeginner.beginLoggingTo(
[lambda _: None], redirectStandardIO=False, discardBuffer=True
)
else:
globalLogBeginner.beginLoggingTo([STDLibLogObserver(__name__)])
......@@ -103,7 +105,9 @@ class Server(object):
if http.H2_ENABLED:
logger.info("HTTP/2 support enabled")
else:
logger.info("HTTP/2 support not enabled (install the http2 and tls Twisted extras)")
logger.info(
"HTTP/2 support not enabled (install the http2 and tls Twisted extras)"
)
# Kick off the timeout loop
reactor.callLater(1, self.application_checker)
......@@ -141,7 +145,11 @@ class Server(object):
host = port.getHost()
if hasattr(host, "host") and hasattr(host, "port"):
self.listening_addresses.append((host.host, host.port))
logger.info("Listening on TCP address %s:%s", port.getHost().host, port.getHost().port)
logger.info(
"Listening on TCP address %s:%s",
port.getHost().host,
port.getHost().port,
)
def listen_error(self, failure):
logger.critical("Listen failure: %s", failure.getErrorMessage())
......@@ -187,10 +195,13 @@ class Server(object):
# Run it, and stash the future for later checking
if protocol not in self.connections:
return None
self.connections[protocol]["application_instance"] = asyncio.ensure_future(application_instance(
receive=input_queue.get,
send=lambda message: self.handle_reply(protocol, message),
), loop=asyncio.get_event_loop())
self.connections[protocol]["application_instance"] = asyncio.ensure_future(
application_instance(
receive=input_queue.get,
send=lambda message: self.handle_reply(protocol, message),
),
loop=asyncio.get_event_loop(),
)
return input_queue
async def handle_reply(self, protocol, message):
......@@ -200,9 +211,28 @@ class Server(object):
# Don't do anything if the connection is closed
if self.connections[protocol].get("disconnected", None):
return
self.check_headers_type(message)
# Let the protocol handle it
protocol.handle_reply(message)
@staticmethod
def check_headers_type(message):
if not message["type"] == "http.response.start":
return
for k, v in message.get("headers", []):
if not isinstance(k, bytes):
raise ValueError(
"Header name '{}' expected to be `bytes`, but got `{}`".format(
k, type(k)
)
)
if not isinstance(v, bytes):
raise ValueError(
"Header value '{}' expected to be `bytes`, but got `{}`".format(
v, type(v)
)
)
### Utility
def application_checker(self):
......@@ -215,7 +245,10 @@ class Server(object):
application_instance = details.get("application_instance", None)
# First, see if the protocol disconnected and the app has taken
# too long to close up
if disconnected and time.time() - disconnected > self.application_close_timeout:
if (
disconnected
and time.time() - disconnected > self.application_close_timeout
):
if application_instance and not application_instance.done():
logger.warning(
"Application instance %r for connection %s took too long to shut down and was killed.",
......@@ -238,14 +271,11 @@ class Server(object):
else:
exception_output = "{}\n{}{}".format(
exception,
"".join(traceback.format_tb(
exception.__traceback__,
)),
"".join(traceback.format_tb(exception.__traceback__)),
" {}".format(exception),
)
logger.error(
"Exception inside application: %s",
exception_output,
"Exception inside application: %s", exception_output
)
if not disconnected:
protocol.handle_exception(exception)
......
......@@ -100,9 +100,7 @@ class DaphneTestingInstance:
Adds messages for the application to send back.
The next time it receives an incoming message, it will reply with these.
"""
TestApplication.save_setup(
response_messages=messages,
)
TestApplication.save_setup(response_messages=messages)
class DaphneProcess(multiprocessing.Process):
......@@ -193,12 +191,7 @@ class TestApplication:
Stores setup information.
"""
with open(cls.setup_storage, "wb") as fh:
pickle.dump(
{
"response_messages": response_messages,
},
fh,
)
pickle.dump({"response_messages": response_messages}, fh)
@classmethod
def load_setup(cls):
......@@ -218,13 +211,7 @@ class TestApplication:
We could use pickle here, but that seems wrong, still, somehow.
"""
with open(cls.result_storage, "wb") as fh:
pickle.dump(
{
"scope": scope,
"messages": messages,
},
fh,
)
pickle.dump({"scope": scope, "messages": messages}, fh)
@classmethod
def save_exception(cls, exception):
......@@ -233,12 +220,7 @@ class TestApplication:
We could use pickle here, but that seems wrong, still, somehow.
"""
with open(cls.result_storage, "wb") as fh:
pickle.dump(
{
"exception": exception,
},
fh,
)
pickle.dump({"exception": exception}, fh)
@classmethod
def load_result(cls):
......
......@@ -22,12 +22,14 @@ def header_value(headers, header_name):
return value.decode("utf-8")
def parse_x_forwarded_for(headers,
address_header_name="X-Forwarded-For",
port_header_name="X-Forwarded-Port",
proto_header_name="X-Forwarded-Proto",
original_addr=None,
original_scheme=None):
def parse_x_forwarded_for(
headers,
address_header_name="X-Forwarded-For",
port_header_name="X-Forwarded-Port",
proto_header_name="X-Forwarded-Proto",
original_addr=None,
original_scheme=None,
):
"""
Parses an X-Forwarded-For header and returns a host/port pair as a list.
......
......@@ -3,7 +3,11 @@ import time
import traceback
from urllib.parse import unquote
from autobahn.twisted.websocket import ConnectionDeny, WebSocketServerFactory, WebSocketServerProtocol
from autobahn.twisted.websocket import (
ConnectionDeny,
WebSocketServerFactory,
WebSocketServerProtocol,
)
from twisted.internet import defer
from .utils import parse_x_forwarded_for
......@@ -54,32 +58,34 @@ class WebSocketProtocol(WebSocketServerProtocol):
self.server.proxy_forwarded_address_header,
self.server.proxy_forwarded_port_header,
self.server.proxy_forwarded_proto_header,
self.client_addr
self.client_addr,
)
# Decode websocket subprotocol options
subprotocols = []
for header, value in self.clean_headers:
if header == b"sec-websocket-protocol":
subprotocols = [
x.strip()
for x in
unquote(value.decode("ascii")).split(",")
x.strip() for x in unquote(value.decode("ascii")).split(",")
]
# Make new application instance with scope
self.path = request.path.encode("ascii")
self.application_deferred = defer.maybeDeferred(self.server.create_application, self, {
"type": "websocket",
"path": unquote(self.path.decode("ascii")),
"headers": self.clean_headers,
"query_string": self._raw_query_string, # Passed by HTTP protocol
"client": self.client_addr,
"server": self.server_addr,
"subprotocols": subprotocols,
})
self.application_deferred = defer.maybeDeferred(
self.server.create_application,
self,
{