Commit 686fde50 authored by Guillaume BINET's avatar Guillaume BINET

First hook on tox backend

It works, still have very rough edges.
parent 5c605ae5
......@@ -427,6 +427,12 @@ class Presence(object):
def __unicode__(self):
return str(self.__str__())
STREAM_WAITING_TO_START = 'pending'
STREAM_TRANSFER_IN_PROGRESS = 'in progress'
STREAM_SUCCESSFULLY_TRANSFERED = 'success'
STREAM_PAUSED = 'paused'
STREAM_ERROR = 'error'
STREAM_REJECTED = 'rejected'
class Stream(io.BufferedReader):
"""
......@@ -442,6 +448,7 @@ class Stream(io.BufferedReader):
self._name = name
self._size = size
self._stream_type = stream_type
self._status = STREAM_WAITING_TO_START
@property
def identifier(self):
......@@ -473,6 +480,46 @@ class Stream(io.BufferedReader):
"""
return self._stream_type
@property
def status(self):
"""
The status for this stream.
"""
return self._status
def accept(self):
"""
Signal that the stream has been accepted.
"""
if self._status != STREAM_WAITING_TO_START:
raise ValueError("Invalid state, the stream is not pending.")
self._status = STREAM_TRANSFER_IN_PROGRESS
def reject(self):
"""
Signal that the stream has been rejected.
"""
if self._status != STREAM_WAITING_TO_START:
raise ValueError("Invalid state, the stream is not pending.")
self._status = STREAM_REJECTED
def error(self, reason="unknown"):
"""
An internal plugin error prevented the transfer.
"""
self._status = STREAM_ERROR
self._reason = reason
def success(self):
"""
The streaming finished normally.
"""
if self._status != STREAM_TRANSFER_IN_PROGRESS:
raise ValueError("Invalid state, the stream is not in progress.")
self._status = STREAM_SUCCESSFULLY_TRANSFERED
def clone(self, new_fsource):
"""
Creates a clone and with an alternative stream
......
import logging
import sys
from time import sleep
from os import pipe, fdopen
from os.path import exists, join
from io import BufferedRWPair
from errbot.errBot import ErrBot
import config
from errbot.backends.base import Message, Connection, Identifier, Presence
from errbot.backends.base import Message, Identifier, Presence, Stream
from errbot.backends.base import ONLINE, OFFLINE, AWAY, DND
from errbot.backends.base import build_message, build_text_html_message_pair
......@@ -55,11 +56,18 @@ TOX_GROUP_TO_ERR_STATUS = {
Tox.CHAT_CHANGE_PEER_NAME: None,
}
class ToxStreamer(BufferedRWPair):
def __init__(self):
r, w = pipe()
self.r, self.w = fdopen(r, 'rb'), fdopen(w, 'wb')
super(ToxStreamer, self).__init__(self.r, self.w)
class ToxConnection(Tox, Connection):
def __init__(self, callback, name):
class ToxConnection(Tox):
def __init__(self, backend, name):
super(ToxConnection, self).__init__()
self.callback = callback
self.backend = backend
self.incoming_streams = {}
if exists(TOX_STATEFILE):
self.load_from_file(TOX_STATEFILE)
self.set_name(name)
......@@ -72,6 +80,9 @@ class ToxConnection(Tox, Connection):
logging.info('TOX: connecting...')
self.bootstrap_from_address(*TOX_BOOTSTRAP_SERVER)
def friend_to_idd(self, friend_number):
return Identifier(node=str(friend_number), resource=self.get_name(friend_number))
def on_friend_request(self, friend_pk, message):
logging.info('TOX: Friend request from %s: %s' % (friend_pk, message))
self.add_friend_norequest(friend_pk)
......@@ -79,20 +90,17 @@ class ToxConnection(Tox, Connection):
def on_group_invite(self, friend_number, group_pk):
logging.info('TOX: Group invite from %s : %s' % (self.get_name(friend_number), group_pk))
if not self.callback.is_admin(friend_number):
if not self.backend.is_admin(friend_number):
super(ToxConnection, self).send_message(friend_number, NOT_ADMIN)
return
self.join_groupchat(friend_number, group_pk)
def on_friend_message(self, friend_number, message):
name = self.get_name(friend_number)
# friendId is just a local ordinal as int
friend = Identifier(node=str(friend_number), resource=name)
logging.debug('TOX: %s: %s' % (name, message))
msg = Message(message)
msg.frm = friend
msg.to = self.callback.jid
self.callback.callback_message(msg)
msg.frm = self.friend_to_idd(friend_number)
logging.debug('TOX: %s: %s' % (msg.frm, message))
msg.to = self.backend.jid
self.backend.callback_message(msg)
def on_group_namelist_change(self, group_number, friend_group_number, change):
logging.debug("TOX: user %s changed state in group %s" % (friend_group_number, group_number))
......@@ -102,24 +110,24 @@ class ToxConnection(Tox, Connection):
pres = Presence(nick=self.group_peername(group_number, friend_group_number),
status=newstatus,
chatroom=chatroom)
self.callback.callback_presence(pres)
self.backend.callback_presence(pres)
def on_user_status(self, friend_number, kind):
logging.debug("TOX: user %s changed state", friend_number)
pres = Presence(identifier=Identifier(node=str(friend_number), resource=self.get_name(friend_number)),
pres = Presence(identifier=self.friend_to_idd(friend_number),
status=TOX_TO_ERR_STATUS[kind])
self.callback.callback_presence(pres)
self.backend.callback_presence(pres)
def on_status_message(self, friend_number, message):
pres = Presence(identifier=Identifier(node=str(friend_number), resource=self.get_name(friend_number)),
pres = Presence(identifier=self.friend_to_idd(friend_number),
message=message)
self.callback.callback_presence(pres)
self.backend.callback_presence(pres)
def on_connection_status(self, friend_number, status):
logging.debug("TOX: user %s changed connection status", friend_number)
pres = Presence(identifier=Identifier(node=str(friend_number), resource=self.get_name(friend_number)),
pres = Presence(identifier=self.friend_to_idd(friend_number),
status=ONLINE if status else OFFLINE)
self.callback.callback_presence(pres)
self.backend.callback_presence(pres)
def on_group_message(self, group_number, friend_group_number, message):
logging.debug('TOX: Group-%i User-%i: %s' % (group_number, friend_group_number, message))
......@@ -127,8 +135,41 @@ class ToxConnection(Tox, Connection):
msg.frm = Identifier(node=str(group_number), resource=str(friend_group_number))
msg.to = self.callback.jid
logging.debug('TOX: callback with type = %s' % msg.type)
self.callback.callback_message(msg)
self.backend.callback_message(msg)
#### File transfers
def on_file_send_request(self, friend_number, file_number, file_size, filename):
logging.debug("TOX: incoming file transfer %s : %s", friend_number, filename)
# make a pipe on which we will be able to write from tox
pipe = ToxStreamer()
# make the original stream with all the info
stream = Stream(self.friend_to_idd(friend_number), pipe, filename, file_size)
# store it for tracking purposes
self.incoming_streams[(friend_number, file_number)] = (pipe, stream)
# callback err so it will duplicate the stream and send it to all the plugins
self.backend.callback_stream(stream)
# always say ok, and kill it later if finally we don't want it
self.file_send_control(friend_number, 1, file_number, Tox.FILECONTROL_ACCEPT)
def on_file_data(self, friend_number, file_number, data):
logging.debug("TOX: file data received : %s, size : %d", friend_number, len(data))
pipe, _ = self.incoming_streams[(friend_number, file_number)]
pipe.write(data)
def on_file_control(self, friend_number, receive_send, file_number, control_type, data):
logging.debug("TOX: file control received : %s, type : %d", friend_number, control_type)
if receive_send == 0:
pipe, stream = self.incoming_streams[(friend_number, file_number)]
if control_type == Tox.FILECONTROL_KILL:
stream.error("Other party killed the transfer")
pipe.w.close()
elif control_type == Tox.FILECONTROL_FINISHED:
logging.debug("Other party signal the end of transfer on %s:%s"%(friend_number,file_number))
pipe.flush()
pipe.w.close()
logging.debug("Receive file control %s", control_type)
else:
logging.warn("Sending file is not supported yet")
class ToxBackend(ErrBot):
......
......@@ -41,7 +41,7 @@ from errbot.storage import StoreMixin
from errbot.utils import PLUGINS_SUBDIR, human_name_for_git_url, tail, format_timedelta, which, get_sender_username
from errbot.repos import KNOWN_PUBLIC_REPOS
from errbot.version import VERSION
from errbot.streaming import Tee
def get_class_that_defined_method(meth):
for cls in inspect.getmro(type(meth.__self__)):
......@@ -168,6 +168,10 @@ class ErrBot(Backend, StoreMixin):
except Exception as _:
logging.exception('Crash in the callback_presence handler.')
def callback_stream(self, stream):
logging.info("Initiated an incoming transfer %s" % stream)
Tee(stream, get_all_active_plugin_objects()).start()
def activate_non_started_plugins(self):
logging.info('Activating all the plugins...')
configs = self[CONFIGS]
......
from os import pipe, fdopen
from threading import Thread
from errbot.backends.base import (STREAM_WAITING_TO_START,
STREAM_TRANSFER_IN_PROGRESS,
STREAM_SUCCESSFULLY_TRANSFERED,
STREAM_ERROR,
STREAM_PAUSED,
STREAM_REJECTED,)
import logging
CHUNK_SIZE = 4096
......@@ -26,7 +33,18 @@ class Tee(object):
streams = [self.incoming_stream.clone(pipe[0]) for pipe in pipes]
def streamer(index):
self.clients[index].callback_stream(streams[index])
try:
self.clients[index].callback_stream(streams[index])
if streams[index].status == STREAM_WAITING_TO_START:
streams[index].reject()
logging.warning("%s did not accept nor reject the incoming file transfer, I reject it as a fallback." % self.clients[index].__class__.__name__)
except Exception as _:
# internal error, mark the error.
streams[index].error()
else:
if streams[index].status == STREAM_TRANSFER_IN_PROGRESS:
# if the plugin didn't do it by itself, mark the transfer as a success.
streams[index].success()
# stop the stream if the callback_stream returns
r, w = pipes[index]
pipes[index] = (None, None) # signal the main thread to stop streaming
......@@ -39,12 +57,16 @@ class Tee(object):
thread.start()
while True:
if self.incoming_stream.closed:
break
chunk = self.incoming_stream.read(CHUNK_SIZE)
logging.debug("dispatch %d bytes", len(chunk))
if not chunk:
break
for (_, w) in pipes:
if w:
w.write(chunk)
logging.debug("EOF detected")
for (r, w) in pipes:
if w:
w.close() # close should flush too
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment