streaming.py 3.14 KB
Newer Older
Guillaume BINET's avatar
Guillaume BINET committed
1

2
import os
3
import io
4
from itertools import starmap, repeat
Guillaume BINET's avatar
Guillaume BINET committed
5
from threading import Thread
Guillaume Binet's avatar
Guillaume Binet committed
6
from .backends.base import STREAM_WAITING_TO_START, STREAM_TRANSFER_IN_PROGRESS
7
import logging
Guillaume BINET's avatar
Guillaume BINET committed
8 9 10

CHUNK_SIZE = 4096

11 12
log = logging.getLogger(__name__)

Guillaume BINET's avatar
Guillaume BINET committed
13

14 15 16 17 18 19 20 21 22 23 24 25 26 27
def repeatfunc(func, times=None, *args):  # from the itertools receipes
    """Repeat calls to func with specified arguments.

    Example:  repeatfunc(random.random)

    :param args: params to the function to call.
    :param times: number of times to repeat.
    :param func:  the function to repeatedly call.
    """
    if times is None:
        return starmap(func, repeat(args))
    return starmap(func, repeat(args, times))


Guillaume BINET's avatar
Guillaume BINET committed
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
class Tee(object):
    """ Tee implements a multi reader / single writer """
    def __init__(self, incoming_stream, clients):
        """ clients is a list of objects implementing callback_stream """
        self.incoming_stream = incoming_stream
        self.clients = clients

    def start(self):
        """ starts the transfer asynchronously """
        t = Thread(target=self.run)
        t.start()
        return t

    def run(self):
        """ streams to all the clients synchronously """
        nb_clients = len(self.clients)
44
        pipes = [(io.open(r, 'rb'), io.open(w, 'wb')) for r, w in repeatfunc(os.pipe, nb_clients)]
Guillaume BINET's avatar
Guillaume BINET committed
45 46 47
        streams = [self.incoming_stream.clone(pipe[0]) for pipe in pipes]

        def streamer(index):
48 49 50 51
            try:
                self.clients[index].callback_stream(streams[index])
                if streams[index].status == STREAM_WAITING_TO_START:
                    streams[index].reject()
52
                    plugin = self.clients[index].name
53 54
                    logging.warning('%s did not accept nor reject the incoming file transfer', plugin)
                    logging.warning('I reject it as a fallback.')
55 56 57 58 59 60 61
            except Exception as _:
                # internal error, mark the error.
                streams[index].error()
            else:
                if streams[index].status == STREAM_TRANSFER_IN_PROGRESS:
                    # if the plugin didn't do it by itself, mark the transfer as a success.
                    streams[index].success()
Guillaume BINET's avatar
Guillaume BINET committed
62
            # stop the stream if the callback_stream returns
63
            read, write = pipes[index]
Guillaume BINET's avatar
Guillaume BINET committed
64
            pipes[index] = (None, None)  # signal the main thread to stop streaming
65 66
            read.close()
            write.close()
Guillaume BINET's avatar
Guillaume BINET committed
67 68 69 70 71 72 73

        threads = [Thread(target=streamer, args=(i,)) for i in range(nb_clients)]

        for thread in threads:
            thread.start()

        while True:
74 75
            if self.incoming_stream.closed:
                break
Guillaume BINET's avatar
Guillaume BINET committed
76
            chunk = self.incoming_stream.read(CHUNK_SIZE)
77
            log.debug("dispatch %d bytes", len(chunk))
Guillaume BINET's avatar
Guillaume BINET committed
78 79 80 81 82
            if not chunk:
                break
            for (_, w) in pipes:
                if w:
                    w.write(chunk)
83
        log.debug("EOF detected")
Guillaume BINET's avatar
Guillaume BINET committed
84 85 86 87 88 89 90
        for (r, w) in pipes:
            if w:
                w.close()  # close should flush too
        # we want to be sure that if we join on the main thread,
        # everything is either fully transfered or errored
        for thread in threads:
            thread.join()