controller.py 2.98 KB
Newer Older
1 2
#-----------------------------------------------------------------------------
#  Copyright (c) 2010 Justin Riley
3
#
4 5 6
#  Distributed under the terms of the New BSD License.  The full license is in
#  the file COPYING.BSD, distributed as part of this software.
#-----------------------------------------------------------------------------
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59

import sys
import zmq
import pymongo
import pymongo.json_util
import json

class MongoZMQ(object):
    """
    ZMQ server that adds/fetches documents (ie dictionaries) to a MongoDB.

    NOTE: mongod must be started before using this class
    """

    def __init__(self, db_name, table_name, bind_addr="tcp://127.0.0.1:5000"):
        """
        bind_addr: address to bind zmq socket on
        db_name: name of database to write to (created if doesnt exist)
        table_name: name of mongodb 'table' in the db to write to (created if doesnt exist)
        """
        self._bind_addr = bind_addr
        self._db_name = db_name
        self._table_name = table_name
        self._conn = pymongo.Connection()
        self._db = self._conn[self._db_name]
        self._table = self._db[self._table_name]

    def _doc_to_json(self, doc):
        return json.dumps(doc,default=pymongo.json_util.default)

    def add_document(self, doc):
        """
        Inserts a document (dictionary) into mongo database table
        """
        print 'adding docment %s' % (doc)
        try:
            self._table.insert(doc)
        except Exception,e:
            return 'Error: %s' % e

    def get_document_by_keys(self, keys):
        """
        Attempts to return a single document from database table that matches
        each key/value in keys dictionary.
        """
        print 'attempting to retrieve document using keys: %s' % keys
        try:
            return self._table.find_one(keys)
        except Exception,e:
            return 'Error: %s' % e

    def start(self):
        context = zmq.Context()
60
        socket = context.socket(zmq.ROUTER)
61 62 63 64 65 66 67 68 69 70 71 72 73
        socket.bind(self._bind_addr)
        while True:
            msg = socket.recv_multipart()
            print "Received msg: ", msg
            if  len(msg) != 3:
                error_msg = 'invalid message received: %s' % msg
                print error_msg
                reply = [msg[0], error_msg]
                socket.send_multipart(reply)
                continue
            id = msg[0]
            operation = msg[1]
            contents = json.loads(msg[2])
74
            # always send back the id with ROUTER
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
            reply = [id]
            if operation == 'add':
                self.add_document(contents)
                reply.append("success")
            elif operation == 'get':
                doc = self.get_document_by_keys(contents)
                json_doc = self._doc_to_json(doc)
                reply.append(json_doc)
            else:
                print 'unknown request'
            socket.send_multipart(reply)

def main():
    MongoZMQ('ipcontroller','jobs').start()

if __name__ == "__main__":
   main()