Source code for metlog.senders.zmq

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012
# the Initial Developer. All Rights Reserved.
# Contributor(s):
#   Rob Miller
# ***** END LICENSE BLOCK *****
from __future__ import absolute_import
    import simplejson as json
except ImportError:
    import json  # NOQA

import threading
import sys
import time

if 'gevent.monkey' in sys.modules:
    from gevent import queue as Queue
    import Queue  # NOQA

    if 'gevent.monkey' in sys.modules:
        from gevent_zeromq import zmq
        import zmq  # NOQA
except ImportError:
    zmq = None  # NOQA

# We need to set the maximum number of outbound messages so that
# applications don't consume infinite memory if outbound messages are
# not processed

class BaseClient(object):
    """
    Common base for the 0mq clients: stores the 0mq context and keeps a
    lock-protected "connected" flag.

    :param context: the 0mq context object; it is stored on the instance
                    as ``context`` and not otherwise used by this class
    """

    def __init__(self, context):
        self.context = context

        # We need to synchronize around the connected flag
        self._connect_lock = threading.RLock()

        # Start out disconnected so that connected() can be called safely
        # before set_connected() has ever run (it used to raise
        # AttributeError in that case).
        self._connected = False

    def set_connected(self, state):
        """
        Record the new connection state.

        :param state: value to store as the connected flag
        :returns: the flag as reported by :meth:`connected` afterwards
        """
        with self._connect_lock:
            self._connected = state
        return self.connected()

    def connected(self):
        """
        Return the current value of the connected flag.
        """
        with self._connect_lock:
            return self._connected

class SimpleClient(BaseClient):
    """
    Client that publishes over a single 0mq PUB socket, with no
    handshake. The socket is created and connected in the initializer.

    :param context: 0mq context used to create the socket
    :param connect_bind: iterable of endpoint strings, one connection
                         is made per entry
    :param hwm: value applied to the socket's ``zmq.HWM`` option
    """

    def __init__(self, context, connect_bind, hwm=200):
        super(SimpleClient, self).__init__(context)

        self.connect_bind = connect_bind
        self.hwm = hwm

        # The pub socket must be created
        # Socket to actually do pub/sub
        self.socket = self.context.socket(zmq.PUB)

        # Set the options before connecting: 0mq applies most socket
        # options only to connections made after the option is set, so
        # setting the high water mark after connect() would not bound
        # the queues of the endpoints connected here.
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.setsockopt(zmq.HWM, self.hwm)

        # NOTE(review): the body of this loop was missing from the
        # extracted source; connecting to each endpoint is reconstructed
        # from the variable names - confirm against the upstream file.
        for bindstr in self.connect_bind:
            self.socket.connect(bindstr)

    def connect(self):
        """
        For the SimpleClient, connect() does nothing as the connect is
        handled in the initializer

        :returns: always ``True``
        """
        return True

    def send(self, msg):
        """
        Send a single message over the PUB socket.

        :param msg: the message to publish
        """
        # NOTE(review): the body of send() was missing from the extracted
        # source; a plain socket send is reconstructed - confirm against
        # the upstream file.
        self.socket.send(msg)

class HandshakingClient(BaseClient):
    """
    Client that publishes over a 0mq PUB socket, but only once a
    handshake over a separate REQ socket has succeeded. Messages that
    cannot be published are written to stderr instead.

    NOTE(review): several lines of this class were missing from the
    extracted source (the end of the ``__init__`` signature, the
    ``try``/``else``/``finally`` keywords and the socket
    connect/send/close calls). They are reconstructed here from the
    surviving indentation, the attributes that are stored, and the way
    ZmqHandshakePubSender constructs this class - confirm against the
    upstream file.

    :param context: 0mq context used to create the sockets
    :param handshake_bind: endpoint used for the handshake REQ socket
    :param connect_bind: endpoint the PUB socket connects to
    :param handshake_timeout: timeout handed to ``Poller.poll()`` while
                              waiting for the handshake reply
    :param hwm: value applied to the PUB socket's ``zmq.HWM`` option
                (the default of 200 is assumed from SimpleClient)
    """

    def __init__(self, context, handshake_bind, connect_bind,
                 handshake_timeout, hwm=200):
        super(HandshakingClient, self).__init__(context)

        self.handshake_bind = handshake_bind
        self.connect_bind = connect_bind

        self.handshake_timeout = handshake_timeout
        self.hwm = hwm

        self.handshake_socket = None

        # Socket to actually do pub/sub
        self.socket = self.context.socket(zmq.PUB)
        # Options go first so that they apply to the connection below
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.setsockopt(zmq.HWM, self.hwm)
        # NOTE(review): reconstructed line - confirm
        self.socket.connect(self.connect_bind)

    def connect(self):
        """
        Connect To the 0mq REPL socket and attempt a handshake to
        ensure we're properly connected.

        :returns: the connected flag after the attempt; ``True`` when a
                  reply became readable within ``handshake_timeout``,
                  ``False`` otherwise
        """
        # Socket to send handshake signals
        self.handshake_socket = None
        try:
            self.handshake_socket = self.context.socket(zmq.REQ)
            self.handshake_socket.setsockopt(zmq.LINGER, 0)
            # NOTE(review): reconstructed line - confirm
            self.handshake_socket.connect(self.handshake_bind)

            poll = zmq.Poller()
            poll.register(self.handshake_socket, zmq.POLLIN)

            # NOTE(review): reconstructed line - a REQ socket has to
            # send a request before a reply can become readable; the
            # empty payload is an assumption, confirm what the server
            # expects.
            self.handshake_socket.send(b"")

            socks = dict(poll.poll(self.handshake_timeout))

            if socks.get(self.handshake_socket) == zmq.POLLIN:
                return self.set_connected(True)
            else:
                return self.set_connected(False)
        finally:
            # Shutdown the handshake
            if self.handshake_socket is not None:
                # NOTE(review): reconstructed line - confirm
                self.handshake_socket.close()

    def send(self, msg):
        """
        Publish ``msg`` if the handshake has succeeded; otherwise, or if
        0mq raises an error, write the message to stderr.

        :param msg: the message to publish
        """
        try:
            if self.connected():
                # NOTE(review): reconstructed line - confirm
                self.socket.send(msg)
            else:
                sys.stderr.write("%s\n" % msg)
        except zmq.ZMQError:
            sys.stderr.write("%s\n" % msg)

class Pool(object):
    """
    This is a threadsafe pool of 0mq clients.

    :param client_factory: a factory function that creates Client
                           instances
    :param size: The number of clients to create in the pool
    :param livecheck: The time in seconds to wait to ping the server
                      from each client
    """

    def __init__(self, client_factory, size=10, livecheck=10):
        self._stop_lock = threading.RLock()
        self._stopped = False

        self._clients = Queue.Queue()
        self._livecheck = livecheck

        # This list is only used to handle reconnects if the
        # connection to the 0mq subscriber dies
        self._all_clients = []

        for _ in range(size):
            client = client_factory()
            self._clients.put(client)
            self._all_clients.append(client)

        # Connect the clients on a background thread so that we can
        # startup quickly
        self._connect_thread_started = False

        def background_thread():
            # NOTE(review): the nesting below is reconstructed from a
            # collapsed source line; the stop check is placed at loop
            # level so that stop() actually ends the thread - confirm.
            while True:
                for client in self._all_clients:
                    client.connect()
                if self.is_stopped():
                    break
                time.sleep(self._livecheck)

        self._connect_thread = threading.Thread(target=background_thread)
        self._connect_thread.daemon = True
        self.start_reconnecting()

    def start_reconnecting(self):
        """
        Start the background thread that handles pings to the server to
        synchronize the initial pub/sub

        Calling this again once the thread has been started is a no-op.
        """
        with self._stop_lock:
            if self._connect_thread_started:
                return
            self._connect_thread_started = True
            self._connect_thread.start()

    def send(self, msg):
        """
        Threadsafely send a single text message over a 0mq socket

        A client is taken from the pool, used for the send and always
        handed back afterwards, even if the send raises.
        """
        sock = None
        try:
            sock = self.socket()
            sock.send(msg)
        except Queue.Empty:
            # Sometimes, we'll get nothing
            # NOTE(review): socket() uses a blocking get(), so this
            # branch does not appear to be reachable as written.
            sys.stderr.write("%s\n" % msg)
        finally:
            # Compare against None rather than relying on truthiness, so
            # a client object that evaluates as false is not dropped
            # from the pool.
            if sock is not None:
                self._clients.put(sock)

    def stop(self):
        """
        Shutdown the background reconnection thread

        This only sets the stop flag; the thread notices it after its
        next round of connect() calls.
        """
        with self._stop_lock:
            self._stopped = True

    def is_stopped(self):
        """
        Return True once stop() has been called.
        """
        with self._stop_lock:
            return self._stopped

    def socket(self):
        """
        Take a client out of the pool, waiting until one is available.
        The caller is responsible for putting it back.
        """
        return self._clients.get()
class ZmqSender(object):
    """
    Base class for ZmqPubSender and ZmqHandshakePubSender

    Subclasses are expected to set ``self.pool`` and
    ``self.debug_stderr``, which :meth:`send_message` reads.
    """

    # One context shared by every sender; None when pyzmq is unavailable
    _zmq_context = None if zmq is None else zmq.Context()

    def __new__(cls, *args, **kwargs):
        """
        Just check that we have pyzmq installed

        :raises ValueError: if the ``zmq`` module could not be imported
        """
        if zmq is not None:
            return super(ZmqSender, cls).__new__(cls)
        # no zmq -> no ZmqPubSender
        raise ValueError('Must have `pyzmq` installed to use ZmqPubSender')

    def send_message(self, msg):
        """
        Serialize and send a message off to the metlog listener.

        :param msg: Dictionary representing the message.
        """
        payload = json.dumps(msg)
        if self.debug_stderr:
            # Echo the serialized message locally as well
            sys.stderr.write(payload + '\n')
            sys.stderr.flush()
        self.pool.send(payload)
class ZmqPubSender(ZmqSender):
    """
    Sends metlog messages out via a ZeroMQ publisher socket.
    """

    def __init__(self, bindstrs, pool_size=10, queue_length=MAX_MESSAGES,
                 livecheck=10, debug_stderr=False):
        """
        :param bindstrs: One or more URL strings which 0mq recognizes as
                         an endpoint URL. Either a string or a list of
                         strings is accepted.
        :param pool_size: The number of connections we maintain to the
                          0mq backend
        :param queue_length: Passed to each SimpleClient as its ``hwm``
                             argument
        :param livecheck: Polling interval in seconds between
                          client.connect() calls
        :param debug_stderr: Boolean flag to send messages to stderr in
                             addition to the actual 0mq socket
        """
        # Normalize a lone endpoint string into a one-element list
        endpoints = [bindstrs] if isinstance(bindstrs, basestring) \
            else bindstrs

        def make_client():
            return SimpleClient(self._zmq_context, endpoints, queue_length)

        self.pool = Pool(client_factory=make_client,
                         size=pool_size,
                         livecheck=livecheck)
        self.debug_stderr = debug_stderr
class ZmqHandshakePubSender(ZmqSender):
    """
    Sends metlog messages out via a ZeroMQ publisher socket.

    Redirect all dropped messages to stderr
    """

    def __init__(self, handshake_bind, connect_bind, handshake_timeout,
                 pool_size=10, hwm=200, livecheck=10, debug_stderr=False):
        """
        :param handshake_bind: A single 0mq recognized endpoint URL.
                               This should point to the endpoint for
                               handshaking of connections
        :param connect_bind: A single 0mq recognized endpoint URL.
                             This should point to the endpoint for
                             sending actual Metlog messages.
        :param handshake_timeout: Timeout in ms to wait for responses
                                  from the 0mq server on handshake
        :param pool_size: The number of connections we maintain to the
                          0mq backend
        :param hwm: High water mark. Set the maximum number of messages
                    to queue before dropping messages in case of a slow
                    reading 0mq server.
        :param livecheck: Polling interval in seconds between
                          client.connect() calls
        :param debug_stderr: Boolean flag to send messages to stderr in
                             addition to the actual 0mq socket
        """
        def make_client():
            new_client = HandshakingClient(self._zmq_context,
                                           handshake_bind,
                                           connect_bind,
                                           handshake_timeout,
                                           hwm)
            # Try to get all clients to connect right away
            new_client.connect()
            return new_client

        self.pool = Pool(client_factory=make_client,
                         size=pool_size,
                         livecheck=livecheck)
        self.debug_stderr = debug_stderr
Read the Docs v: latest
On Read the Docs
Project Home

Free document hosting provided by Read the Docs.