Commit 7df8bf3f authored by Tianon Gravi's avatar Tianon Gravi

Imported Upstream version 0.0~git20140312.0.f693c7e

parent e8a98c33
Copyright 2013 Alan Shreve
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
# muxado - Stream multiplexing for Go
## What is stream multiplexing?
Imagine you have a single stream (a bi-directional stream of bytes) like a TCP connection. Stream multiplexing
is a method for enabling the transmission of multiple simultaneous streams over the one underlying transport stream.
## What is muxado?
muxado is an implementation of a stream multiplexing library in Go that can be layered on top of a net.Conn to multiplex that stream.
muxado's protocol is not currently documented explicitly, but it is very nearly an implementation of the HTTP2
framing layer with all of the HTTP-specific bits removed. It is heavily inspired by HTTP2, SPDY, and WebMUX.
## How does it work?
Simplifying, muxado chunks data sent over each multiplexed stream and transmits each piece
as a "frame" over the transport stream. It then sends these frames,
often interleaving data for multiple streams, to the remote side.
The remote endpoint then reassembles the frames into distinct streams
of data which are presented to the application layer.
## What good is it anyways?
A stream multiplexing library is a powerful tool for an application developer's toolbox which solves a number of problems:
- It allows developers to implement asynchronous/pipelined protocols with ease. Instead of matching requests with responses in your protocols, just open a new stream for each request and communicate over that.
- muxado can do application-level keep-alives and dead-session detection so that you don't have to write heartbeat code ever again.
- You never need to build connection pools for services running your protocol. You can open as many independent, concurrent streams as you need without incurring any round-trip latency costs.
- muxado allows the server to initiate new streams to clients which is normally very difficult without NAT-busting trickery.
## Show me the code!
As much as possible, the muxado library strives to look and feel just like the standard library's net package. Here's how you initiate a new client session:
sess, err := muxado.DialTLS("tcp", "example.com:1234", tlsConfig)
And a server:
l, err := muxado.ListenTLS("tcp", ":1234", tlsConfig))
for {
sess, err := l.Accept()
go handleSession(sess)
}
Once you have a session, you can open new streams on it:
stream, err := sess.Open()
And accept streams opened by the remote side:
stream, err := sess.Accept()
Streams satisfy the net.Conn interface, so they're very familiar to work with:
n, err := stream.Write(buf)
n, err = stream.Read(buf)
muxado sessions and streams implement the net.Listener and net.Conn interfaces (with a small shim), so you can use them with existing golang libraries!
sess, err := muxado.DialTLS("tcp", "example.com:1234", tlsConfig)
http.Serve(sess.NetListener(), handler)
## A more extensive muxado client
// open a new session to a remote endpoint
sess, err := muxado.Dial("tcp", "example.com:1234")
if err != nil {
panic(err)
}
// handle streams initiated by the server
go func() {
for {
stream, err := sess.Accept()
if err != nil {
panic(err)
}
go handleStream(stream)
}
}()
// open new streams for application requests
for req := range requests {
str, err := sess.Open()
if err != nil {
panic(err)
}
go func(stream muxado.Stream) {
defer stream.Close()
// send request
if _, err = stream.Write(req.serialize()); err != nil {
panic(err)
}
// read response
if buf, err := ioutil.ReadAll(stream); err != nil {
panic(err)
}
handleResponse(buf)
}(str)
}
## How did you build it?
muxado is a modified implementation of the HTTP2 framing protocol with all of the HTTP-specific bits removed. It aims
for simplicity in the protocol by removing everything that is not core to multiplexing streams. The muxado code
is also built with the intention that its performance should be moderately good within the bounds of working in Go. As a result,
muxado does contain some unidiomatic code.
## API documentation
API documentation is available on godoc.org:
[muxado API documentation](https://godoc.org/github.com/inconshreveable/muxado)
## What are its biggest drawbacks?
Any stream-multiplexing library over TCP will suffer from head-of-line blocking if the next packet to service gets dropped.
muxado is also a poor choice when sending large payloads and speed is a priority.
It shines best when the application workload needs to quickly open a large number of small-payload streams.
## Status
Most of muxado's features are implemented (and tested!), but there are many that are still rough or could be improved. See the TODO file for suggestions on what needs to improve.
## License
Apache
improve the formatting of the docs to look nice for godoc
use a better example in the docs first before showing the clever integration with the net.Listener/net.Conn APIs
Make all errors support Temporary() API so applications can better decide what to do
Handle case of running out of stream ids + test
writeFrame errors should kill the session, but only if it's not a timeout + test
Short read should cause an error + test
Decrement() in outBuffer needs to have deadline support
Extensions:
Heartbeat extension needs tests
Make extensions a public API instead of a private API
Document how extensions work
Don't include any extensions by default
heartbeat test
Finish writing buffer tests
Write stress test
Write multi-frame write test
More session tests
More stream tests
Write frame/transport tests - verify read correct type, verify unknown type causes error, verify ioerror is propogated
Write frame/syn tests
Write frame/goaway tests
### Low priority:
- Add the ability to differentiate stream errors which allow you to safely retry
- Decide what to do if the application isn't handling its accepted streams fast enough. Refuse stream? Wait and block reading more frames?
- Figure out whether to die with/without lock - in GoAway/OpenStream
- Add priority APIs to stream
- Add priority extension
- Add Reset() stream API
- Eliminate unlikely race on s.remoteDebug between handleFrame() and die()
- Should writeFrame calls for rst/wndinc set the write deadline?
- don't send reset if the stream is fully closed
- include muxado pun somewhere in the docs
package muxado
import (
"github.com/inconshreveable/muxado/proto"
"github.com/inconshreveable/muxado/proto/frame"
)
// streamAdaptor recasts the types of some function calls by the proto/Stream implementation
// so that it satisfies the public interface
type streamAdaptor struct {
proto.IStream
}
func (a *streamAdaptor) Id() StreamId {
return StreamId(a.IStream.Id())
}
func (a *streamAdaptor) StreamType() StreamType {
return StreamType(a.IStream.StreamType())
}
func (a *streamAdaptor) Session() Session {
return &sessionAdaptor{a.IStream.Session()}
}
// sessionAdaptor recasts the types of some function calls by the proto/Session implementation
// so that it satisfies the public interface
type sessionAdaptor struct {
proto.ISession
}
func (a *sessionAdaptor) Accept() (Stream, error) {
str, err := a.ISession.Accept()
return &streamAdaptor{str}, err
}
func (a *sessionAdaptor) Open() (Stream, error) {
str, err := a.ISession.Open()
return &streamAdaptor{str}, err
}
func (a *sessionAdaptor) OpenStream(priority StreamPriority, streamType StreamType, fin bool) (Stream, error) {
str, err := a.ISession.OpenStream(frame.StreamPriority(priority), frame.StreamType(streamType), fin)
return &streamAdaptor{str}, err
}
func (a *sessionAdaptor) GoAway(code ErrorCode, debug []byte) error {
return a.ISession.GoAway(frame.ErrorCode(code), debug)
}
func (a *sessionAdaptor) Wait() (ErrorCode, error, []byte) {
code, err, debug := a.ISession.Wait()
return ErrorCode(code), err, debug
}
package muxado
import (
"crypto/tls"
"github.com/inconshreveable/muxado/proto"
"github.com/inconshreveable/muxado/proto/ext"
"net"
)
// Client returns a new muxado client-side connection using conn as the transport.
func Client(conn net.Conn) Session {
return &sessionAdaptor{proto.NewSession(conn, proto.NewStream, true, []proto.Extension{ext.NewDefaultHeartbeat()})}
}
// Dial opens a new connection to the given network/address and then beings a muxado client session on it.
func Dial(network, addr string) (sess Session, err error) {
conn, err := net.Dial(network, addr)
if err != nil {
return
}
return Client(conn), nil
}
// DialTLS opens a new TLS encrytped connection with the givent configuration
// to the network/address and then beings a muxado client session on it.
func DialTLS(network, addr string, tlsConfig *tls.Config) (sess Session, err error) {
conn, err := tls.Dial(network, addr, tlsConfig)
if err != nil {
return
}
return Client(conn), nil
}
// muxado is an implementation of a general-purpose stream-multiplexing protocol.
//
// muxado allows clients applications to multiplex a single stream-oriented connection,
// like a TCP connection, and communicate over many streams on top of it. muxado accomplishes
// this by chunking data sent over each stream into frames and then reassembling the
// frames and buffering the data before being passed up to the application
// layer on the other side.
//
// muxado is very nearly an exact implementation of the HTTP2 framing layer while leaving out all
// the HTTP-specific parts. It is heavily inspired by HTTP2/SPDY/WebMUX.
//
// muxado's documentation uses the following terms consistently for easier communication:
// - "a transport" is an underlying stream (typically TCP) over which frames are sent between
// endpoints
// - "a stream" is any of the full-duplex byte-streams multiplexed over the transport
// - "a session" refers to an instance of the muxado protocol running over a transport between
// two endpoints
//
// Perhaps the best part of muxado is the interface exposed to client libraries. Since new
// streams may be initiated by both sides at any time, a muxado.Session implements the net.Listener
// interface (almost! Go unfortunately doesn't support covariant interface satisfaction so there's
// a shim). Each muxado stream implements the net.Conn interface. This allows you to integrate
// muxado into existing code which works with these interfaces (which is most Golang networking code)
// with very little difficulty. Consider the following toy example. Here we'll initiate a new secure
// connection to a server, and then ask it which application it wants via an HTTP request over a muxado stream
// and then serve an entire HTTP application *to the server*.
//
//
// sess, err := muxado.DialTLS("tcp", "example.com:1234", new(tls.Config))
// client := &http.Client{Transport: &http.Transport{Dial: sess.NetDial}}
// resp, err := client.Get("http://example.com/appchoice")
// switch getChoice(resp.Body) {
// case "foo":
// http.Serve(sess.NetListener(), fooHandler)
// case "bar":
// http.Serve(sess.NetListener(), barHandler)
// }
//
//
// In addition to enabling multiple streams over a single connection, muxado enables other
// behaviors which can be useful to the application layer:
// - Both sides of a muxado session may initiate new streams
// - muxado can transparently run application-level heartbeats and timeout dead sessions
// - When connections fail, muxado indicates to the application which streams may be safely retried
// - muxado supports prioritizing streams to maximize useful throughput when bandwidth-constrained
//
// A few examples of what these capabilities might make muxado useful for:
// - eliminating custom async/pipeling code for your protocols
// - eliminating connection pools in your protocols
// - eliminating custom NAT traversal logic for enabling server-initiated streams
//
// muxado has been tuned to be very performant within the limits of what you can expect of pure-Go code.
// Some of muxado's code looks unidiomatic in the quest for better performance. (Locks over channels, never allocating
// from the heap, etc). muxado will typically outperform TCP connections when rapidly initiating many new
// streams with small payloads. When sending a large payload over a single stream, muxado's worst case, it can
// be 2-3x slower and does not parallelize well.
package muxado
package muxado
import (
"github.com/inconshreveable/muxado/proto/frame"
"net"
"time"
)
type StreamId frame.StreamId
type StreamPriority frame.StreamPriority
type StreamType frame.StreamType
type ErrorCode frame.ErrorCode
// Stream is a full duplex stream-oriented connection that is multiplexed over a Session.
// Stream implement the net.Conn inteface.
type Stream interface {
// Write writes the bytes in the given buffer to the stream
Write([]byte) (int, error)
// Read reads the next bytes on the stream into the given buffer
Read([]byte) (int, error)
// Close closes the stream. It attempts to behave as Close does for a TCP conn in that it
// half-closes the stream for sending, and it will send an RST if any more data is received
// from the remote side.
Close() error
// SetDeadline sets a time after which future Read and Write operations will fail.
SetDeadline(time.Time) error
// SetReadDeadline sets a time after which future Read operations will fail.
SetReadDeadline(time.Time) error
// SetWriteDeadline sets a time after which future Write operations will fail.
SetWriteDeadline(time.Time) error
// HalfClose sends a data frame with a fin flag set to half-close the stream from the local side.
HalfClose([]byte) (int, error)
// Id returns the stream's id.
Id() StreamId
// StreamType returns the stream's type
StreamType() StreamType
// Session returns the session object this stream is running on.
Session() Session
// RemoteAddr returns the session transport's remote address.
RemoteAddr() net.Addr
// LocalAddr returns the session transport's local address.
LocalAddr() net.Addr
}
// Session multiplexes many Streams over a single underlying stream transport.
// Both sides of a muxado session can open new Streams. Sessions can also accept
// new streams from the remote side.
//
// A muxado Session implements the net.Listener interface, returning new Streams from the remote side.
type Session interface {
// Open initiates a new stream on the session. It is equivalent to OpenStream(0, 0, false)
Open() (Stream, error)
// OpenStream initiates a new stream on the session. A caller can specify a stream's priority and an opaque stream type.
// Setting fin to true will cause the stream to be half-closed from the local side immediately upon creation.
OpenStream(priority StreamPriority, streamType StreamType, fin bool) (Stream, error)
// Accept returns the next stream initiated by the remote side
Accept() (Stream, error)
// Kill closes the underlying transport stream immediately.
//
// You SHOULD always perfer to call Close() instead so that the connection
// closes cleanly by sending a GoAway frame.
Kill() error
// Close instructs the session to close cleanly, sending a GoAway frame if one hasn't already been sent.
//
// This implementation does not "linger". Pending writes on streams may fail.
//
// You MAY call Close() more than once. Each time after
// the first, Close() will return an error.
Close() error
// GoAway instructs the other side of the connection to stop
// initiating new streams by sending a GoAway frame. Most clients
// will just call Close(), but you may want explicit control of this
// in order to facilitate clean shutdowns.
//
// You MAY call GoAway() more than once. Each time after the first,
// GoAway() will return an error.
GoAway(ErrorCode, []byte) error
// LocalAddr returns the local address of the transport stream over which the session is running.
LocalAddr() net.Addr
// RemoteAddr returns the address of the remote side of the transport stream over which the session is running.
RemoteAddr() net.Addr
// Wait blocks until the session has shutdown and returns the error code for session termination. It also
// returns the error that caused the session to terminate as well as any debug information sent in the GoAway
// frame by the remote side.
Wait() (code ErrorCode, err error, debug []byte)
// NetListener returns an adaptor object which allows this Session to be used as a net.Listener. The returned
// net.Listener returns new streams initiated by the remote side as net.Conn's when calling Accept().
NetListener() net.Listener
// NetDial is a function that implements the same API as net.Dial and can be used in place of it. Users should keep
// in mind that it is the same as a call to Open(). It ignores both arguments passed to it, always initiate a new stream
// to the remote side.
NetDial(_, _ string) (net.Conn, error)
}
package buffer
import (
"errors"
"io"
)
var (
FullError = errors.New("Buffer is full")
)
// Reads as much data
func readInto(rd io.Reader, p []byte) (n int, err error) {
var nr int
for n < len(p) {
nr, err = rd.Read(p[n:])
n += nr
if err != nil {
return
}
}
return
}
// A circular buffer on top of a byte-array
// NOTE: It does not implement the Write() method, it implements ReadFrom()
// to avoid copies
type Circular struct {
buf []byte // the bytes
size int // == len(buf)
head int // index of the next byte to read
tail int // index of the last byte available to read
}
// Returns a new circular buffer of the given size
func NewCircular(size int) *Circular {
return &Circular{
buf: make([]byte, size+1),
size: size + 1,
}
}
// Copy data from the given reader into the buffer
// Any errors encountered while reading are returned EXCEPT io.EOF.
// If the reader fills the buffer, it returns buffer.FullError
func (c *Circular) ReadFrom(rd io.Reader) (n int, err error) {
// IF:
// [---H+++T--]
if c.tail >= c.head {
n, err = readInto(rd, c.buf[c.tail:])
c.tail = (c.tail + n) % c.size
if err == io.EOF {
return n, nil
} else if err != nil {
return
}
}
// NOW:
// [T---H++++] or [++T--H+++]
n2, err := readInto(rd, c.buf[c.tail:c.head])
n += n2
c.tail += n2
if err == nil {
err = FullError
} else if err == io.EOF {
err = nil
}
return
}
// Read data out of the buffer. This never fails but may
// return n==0 if there is no data to be read
func (c *Circular) Read(p []byte) (n int, err error) {
if c.head > c.tail {
n = copy(p, c.buf[c.head:])
c.head = (c.head + n) % c.size
if c.head != 0 {
return
}
}
n2 := copy(p[n:], c.buf[c.head:c.tail])
n += n2
c.head += n2
return
}
package buffer
import (
"bytes"
"reflect"
"testing"
)
func incBuf(start, size int) []byte {
b := make([]byte, size)
for i := 0; i < size; i++ {
b[i] = byte((start + i) % 16)
}
return b
}
func testBuffer() *Circular {
c := NewCircular(15)
c.buf = incBuf(0, 16)
return c
}
func TestEmptyRead(t *testing.T) {
t.Parallel()
var p [1]byte
c := NewCircular(16)
n, err := c.Read(p[:])
if err != nil {
t.Fatalf("Error on read operation: %v")
}
if n != 0 {
t.Errorf("Read %d bytes, expected 0", n)
}
}
// Test Read: [H+++T---]
func TestStartRead(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 8
p := make([]byte, readSize+1)
c.tail = readSize
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Read expected %d bytes, got %d", readSize, n)
}
expected := incBuf(0, 8)
got := p[:readSize]
if !reflect.DeepEqual(expected, got) {
t.Errorf("Wrong buffer values read. Expected %v, got %v", expected, got)
}
}
func TestMiddleRead(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 8
p := make([]byte, readSize+1)
c.head = 4
c.tail = 12
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Read expected %d bytes, got %d", readSize)
}
expected := incBuf(4, 8)
got := p[:readSize]
if !reflect.DeepEqual(expected, got) {
t.Errorf("Wrong buffer values read. Expected %v, got %v", expected, got)
}
}
func TestTwoReads(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 4
p := make([]byte, readSize)
c.head = 4
c.tail = 12
for i := 0; i < 2; i++ {
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Wrong read size. Expected %d, got %d", readSize, n)
}
expected := incBuf(4+(4*i), 4)
if !reflect.DeepEqual(p, expected) {
t.Fatalf("Wrong buffer values for read #%d. Expected %v, got %v", i+1, expected, p)
}
}
}
func TestReadTailZero(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 4
p := make([]byte, readSize*2)
c.head = 12
c.tail = 0
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Wrong read size. Expected %d, got %d", readSize, n)
}
expected := incBuf(12, 4)
got := p[:readSize]
if !reflect.DeepEqual(got, expected) {
t.Fatalf("Wrong buffer values for read. Expected %v, got %v", expected, got)
}
}
func TestReadWrap(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 14
p := make([]byte, readSize*2)
c.head = 12
c.tail = 10
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Wrong read size. Expected %d, got %d", readSize, n)
}
expected := incBuf(12, readSize)
got := p[:readSize]
if !reflect.DeepEqual(got, expected) {
t.Fatalf("Wrong buffer values for read. Expected %v, got %v", expected, got)
}
}
func TestEmptyReadAfterExhaustion(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 14
p := make([]byte, readSize*2)
c.head = 12
c.tail = 10
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Wrong read size. Expected %d, got %d", readSize, n)
}
expected := incBuf(12, readSize)
got := p[:readSize]
if !reflect.DeepEqual(got, expected) {
t.Fatalf("Wrong buffer values for read. Expected %v, got %v", expected, p)
}
n, err = c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != 0 {
t.Fatalf("Wrong read size. Expected 0, got %d", n)
}
}
func TestWriteTooBig(t *testing.T) {
t.Parallel()
size := 16
p := bytes.NewBuffer(make([]byte, size+1))
c := NewCircular(size)