ZeroMQ + The Active Object Pattern

I almost missed today's blog post. I was in bed when I remembered.

Today we will implement the Active Object pattern to manage a ZeroMQ Pub socket.
This will allow us to easily send on the socket without worrying about manual thread synchronization.
Note: current versions of ZeroMQ require that only the thread that created the socket interact with the socket.
I will not cover message encoding. I recommend Protocol Buffers, BERT, or JSON, depending on the requirements. Avro and Thrift also look interesting, especially Avro (I personally think versioning should be part of the handshake).

First, a bit of background on the mini project this is for.
It's a market data streamer. It talks to an upstream FIX server, subscribes to market data for certain instruments, and encodes and forwards any received market data over ZeroMQ. So basically a FIX to ZeroMQ proxy, but just for market data.
A program does not exist in a vacuum, so as part of this example I have also written an upstream FIX server for “simulating” price ticks.
And I've written a Python zmq subscriber. This just prints the ticks as they come in. Maybe I'll make it draw a pretty graph, but that's not important at the moment.

I'm not doing this for work. I'm between jobs at the moment and enjoying the break.
I'm doing this to become more familiar with FIX, and the quirks of QuickFIX and ZeroMQ.
Plus it's nice to do a small project to keep the mind ticking (heh. ticking. get it?).

Now for some background on ZeroMQ.
ZeroMQ is a lot of things. The usual saying is that it's sockets on steroids.
One of the nice things it does is provide a uniform interface over various transports such as TCP, interprocess communication, and pgm/epgm multicast. It also has some nice, common messaging patterns built in (like most MQs). The interesting pattern here is Pub/Sub.
Unlike most MQs, ZeroMQ is brokerless. This is occasionally a bit of a shift in thinking.
ZeroMQ also boasts extremely low latency, probably due in part to its brokerless nature. Financial guys tend to froth at the mouth about latency.

Zed Shaw’s Mongrel2 uses ZeroMQ. Mongrel2 looks really interesting.
I haven't been looking at it that closely, but I have to admit I got interested when he talked about Tir.
I'm happy using Django (Python) or Webmachine (Erlang) for my web dev, but I know a lot of people who will be interested in Tir.
At my old job we had *quite* a few Lua developers.
It should be interesting to see how people cope with the Lua GC. IMO Tir will suffer from the same kind of issues node.js apparently does. This will affect the types of applications it's usable for.

One of the things about QuickFIX is that it's very much a threaded program. Each session runs in its own thread.
One of the things about ZeroMQ is that (currently) you can only interact with a socket from the thread that created it, regardless of locking.
As I currently only have one session in the proxy, I could have just created the socket in the Application::onCreate( const FIX::SessionID& ) override. However, there is nothing in QuickFIX or my program that stops a user specifying multiple initiator sessions in their config file.
Say they wanted to subscribe to market data from the ASX’s Market Point service as well as data from HKEX, using the one proxy instance.
If I went with the above approach, each session would create its own socket, so it would not be possible to use the same endpoint for more than one FIX session.
So I needed a thread that owned the zmq socket, which each session thread could communicate with somehow.
And having done threading in the past, I want to avoid having manual locks all over the place as much as possible.
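
To make the "only the creating thread may touch it" rule concrete, here is a minimal sketch of a guard that remembers which thread created it and can tell when another thread comes knocking. This is not part of ZeroMQ or the proxy. The names owner_thread_check and other_thread_is_rejected are made up for illustration, and it uses C++11 std::thread rather than boost so it compiles on its own.

```cpp
#include <stdexcept>
#include <thread>

// Hypothetical guard: remembers the creating thread and rejects use from any other.
class owner_thread_check {
public:
    owner_thread_check() : m_owner(std::this_thread::get_id()) {}

    // true only when called on the thread that constructed this object
    bool on_owner_thread() const { return std::this_thread::get_id() == m_owner; }

    // call at the top of any method that touches the thread-bound resource
    void require() const {
        if (!on_owner_thread())
            throw std::logic_error("used from a non-owning thread");
    }

private:
    const std::thread::id m_owner;
};

// Returns true if a second thread is detected as a non-owner
// while the creating thread is still accepted.
bool other_thread_is_rejected() {
    owner_thread_check check;  // created on the calling thread
    bool rejected = false;
    std::thread t([&] { rejected = !check.on_owner_thread(); });
    t.join();
    return rejected && check.on_owner_thread();
}
```

A check like this only detects misuse. It does nothing to let several session threads share one socket, which is the actual problem here.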

This is a perfect case for the Active Object pattern.
A nice side effect from this is that it makes it clearer what parts of the code are responsible for what functionality, makes it more modular, and eases implementation. Message Passing baby. Aww yeah.

While TBB and boost both provide some C++0x compatible threading libraries, neither provides an active object class, and only TBB provides a concurrent queue.
Yeah I know. Imagine boost not having something as useful as that. They have everything else.
I prefer using boost for C++0x style threads, so we will need to implement our own message queue and active object classes.

Firstly, we need a message queue.

// multiple producer, multiple consumer
// based on Anthony Williams's implementation (with added support for bounded size)
// Anthony Williams is the current maintainer of boost::thread
// http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
#include <cstddef>
#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>

template<typename T>
class concurrent_queue {
public:
 
	concurrent_queue():max_elements(0) {}
	explicit concurrent_queue(size_t max):max_elements(max) {}
 
	// pushes an entry onto the queue.
	// if the queue is at maximum, the current thread waits
	// this helps us avoid producers outpacing the consumer(s) and causing OOM
	void push(const T& v) {
		boost::mutex::scoped_lock l(m_mutex);
		while(max_elements!=0 && m_queue.size() >= max_elements )
		{
			m_cond.wait(l);
		}
		m_queue.push(v);
		l.unlock();
		// producers and consumers wait on the same condition variable, so wake everyone:
		// notify_one could wake another producer instead of a waiting consumer
		// (or vice versa) and leave both sides blocked
		m_cond.notify_all();
	}
 
	// pops an element off the queue and returns it
	// if there are no elements in the queue the current thread waits
	void pop(T& v) {
		boost::mutex::scoped_lock l(m_mutex);
		while(m_queue.empty())
		{
			m_cond.wait(l);
		}
		// we can't return by value and maintain strong exception safety because copy ctors can throw
		// if it throws on the return we would have already done the pop. 
		// see http://www.gotw.ca/gotw/008.htm
		v = m_queue.front();
		m_queue.pop();
		l.unlock();
		// notify_all for the same reason as in push()
		m_cond.notify_all();
	}
 
	// no guarantee that this is accurate as soon as its returned.
	// but may be useful for diagnostics
	bool empty() const {
		boost::mutex::scoped_lock l(m_mutex);
		return m_queue.empty();
	}
 
	// no guarantee that this is accurate as soon as its returned.
	// but may be useful for diagnostics
	size_t size() const{
		boost::mutex::scoped_lock l(m_mutex);
		return m_queue.size();
	}
 
	size_t max_size () const {
		boost::mutex::scoped_lock l(m_mutex);
		return max_elements;
	}
 
private:
	mutable boost::mutex m_mutex;
	std::queue<T> m_queue;
	size_t max_elements;
	boost::condition_variable m_cond;
};
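
Here is a minimal sketch of how a bounded queue like this gets used, rewritten against C++11's std primitives so it compiles on its own (the listing above uses boost). The names bounded_queue and sum_through_queue are made up for this example. It uses one condition variable per direction, which is one way of making sure a notify_one never wakes the wrong kind of waiter.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical std-only variant of the bounded queue, for illustration.
template <typename T>
class bounded_queue {
public:
    explicit bounded_queue(std::size_t max) : m_max(max) {}

    // Blocks while the queue is full (a max of 0 means unbounded).
    void push(const T& v) {
        std::unique_lock<std::mutex> l(m_mutex);
        m_not_full.wait(l, [this] { return m_max == 0 || m_queue.size() < m_max; });
        m_queue.push(v);
        l.unlock();
        m_not_empty.notify_one();  // only consumers wait on this one
    }

    // Blocks while the queue is empty.
    void pop(T& v) {
        std::unique_lock<std::mutex> l(m_mutex);
        m_not_empty.wait(l, [this] { return !m_queue.empty(); });
        v = m_queue.front();
        m_queue.pop();
        l.unlock();
        m_not_full.notify_one();  // only producers wait on this one
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_not_empty;
    std::condition_variable m_not_full;
    std::queue<T> m_queue;
    const std::size_t m_max;
};

// One producer pushes 1..n through a queue of capacity 2; the consumer sums them.
long sum_through_queue(int n) {
    assert(n >= 0);
    bounded_queue<int> q(2);
    long total = 0;
    std::thread consumer([&] {
        int v = 0;
        for (int i = 0; i < n; ++i) { q.pop(v); total += v; }
    });
    for (int i = 1; i <= n; ++i) q.push(i);  // blocks whenever 2 items are pending
    consumer.join();
    return total;  // safe to read: the consumer has finished
}
```

With a capacity of 2 the producer spends most of its time blocked in push(), which is exactly the back-pressure the bounded size is there for.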

We can create a helper to ease implementing active objects.

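// helper for the Active Object pattern
// see Sutter's article at http://www.drdobbs.com/high-performance-computing/225700095
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>

class active_object_helper {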
public:
	active_object_helper():m_exit(false) {
		m_thread.reset( new boost::thread( boost::bind(&active_object_helper::run, this) ) );
	}
 
	~active_object_helper(){
		send( boost::bind(&active_object_helper::exit, this) );
		// wait for queue to drain and thread to exit
		m_thread->join();
	}
 
	void send(const boost::function0<void>& f) {m_queue.push(f);}
private:
 
	// gets run on the launched thread
	void run(){
		boost::function0<void> f;
		while (true){
			m_queue.pop(f);
			f();
			f.clear();
			if (m_exit)
				return;
		}
	}
 
	// a message we use to exit the thread
	void exit() { m_exit = true; }
 
	concurrent_queue< boost::function0<void> > m_queue;
	boost::scoped_ptr<boost::thread> m_thread;
	bool m_exit;
};
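
To see the helper's behaviour in isolation, here is a minimal std-only sketch of the same idea (C++11 std::thread and std::function instead of boost, with a simple unbounded queue folded in so it compiles on its own). The names active_object and collect_in_order are made up for this example. It shows the two properties we rely on: messages run in the order they were sent, and the destructor drains the queue before the thread exits.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical std-only active object, mirroring the helper above.
class active_object {
public:
    active_object() : m_exit(false), m_thread([this] { run(); }) {}

    ~active_object() {
        send([this] { m_exit = true; });  // exit message, queued behind pending work
        m_thread.join();
    }

    void send(std::function<void()> f) {
        {
            std::lock_guard<std::mutex> l(m_mutex);
            m_queue.push(std::move(f));
        }
        m_cond.notify_one();
    }

private:
    // runs on the worker thread
    void run() {
        while (!m_exit) {
            std::function<void()> f;
            {
                std::unique_lock<std::mutex> l(m_mutex);
                m_cond.wait(l, [this] { return !m_queue.empty(); });
                f = std::move(m_queue.front());
                m_queue.pop();
            }
            f();  // run the message outside the lock
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::queue<std::function<void()> > m_queue;
    bool m_exit;           // only touched on the worker thread once it is running
    std::thread m_thread;  // declared last so it starts after the other members exist
};

// Sends 0..n-1 from the calling thread; the worker appends them to a vector
// that nothing else touches until the worker has been joined.
std::vector<int> collect_in_order(int n) {
    std::vector<int> seen;
    {
        active_object worker;
        for (int i = 0; i < n; ++i)
            worker.send([&seen, i] { seen.push_back(i); });
    }  // destructor drains the queue and joins
    return seen;
}
```

Note that the vector needs no lock of its own. Every push_back happens on the worker thread, and the caller only reads it after the join.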

Now that we have our utility classes out of the way, on to the publisher implementation.
Our tick_publisher becomes:

class tick_publisher {
public:
   virtual void tick(const MarketData& md) = 0;
   virtual ~tick_publisher() {}
};
 
class zmq_tick_publisher: public tick_publisher {
public:
	zmq_tick_publisher(zmq::context_t& ctx, const std::string& bind_address) : m_socket(0) {
		// the socket itself is created on the active object's thread, see init()
		m_active_object.send( boost::bind(&zmq_tick_publisher::init, this, boost::ref(ctx), bind_address) );
	}
	virtual ~zmq_tick_publisher(){
		m_active_object.send( boost::bind(&zmq_tick_publisher::deinit, this) );
	}
	virtual void tick(const MarketData& md) {
		m_active_object.send( boost::bind(&zmq_tick_publisher::tick_, this, md) );
	}
private:
	void init(zmq::context_t& ctx, const std::string& bind_address) {
		// setup socket
		m_socket = new zmq::socket_t(ctx, ZMQ_PUB);
		m_socket->bind(bind_address.c_str());
	}
	void deinit(){
		// teardown socket
		delete m_socket;
		m_socket = 0;
	}
	void tick_(const MarketData& md){
		// encode and broadcast on socket
		zmq::message_t msg;
		encode(md, msg);
		bool success = m_socket->send(msg);
		assert(success);
	}
	active_object_helper m_active_object;
	zmq::socket_t* m_socket;
};

Using it is as simple as creating it and calling methods on it. Easy.

int main() {
 // ...
 std::string bind_address = settings.get().getString("BindAddress"); // e.g. "tcp://*:5000"
 
 boost::shared_ptr<tick_publisher> tick_pub(new zmq_tick_publisher(zmq_ctx, bind_address) );
 MQFeederApplication app(settings, tick_pub); // a named object, so it outlives this statement
 // ...
}
 
MQFeederApplication::MQFeederApplication(const FIX::SessionSettings& s, boost::shared_ptr<tick_publisher> publisher) 
	:m_settings(s), m_publisher(publisher)
{
}
 
void MQFeederApplication::onMessage( const FIX44::MarketDataSnapshotFullRefresh& m, const FIX::SessionID& sessionID)
{
	FIX::Symbol s = FIELD_GET_REF(m, Symbol);
	MarketData md(s);
	// fill in market data
	// ==snip==
	// publish
	m_publisher->tick(md);
}

Feels a bit like a gen_server, except in this example we only use the equivalent of gen_server:cast. To implement active objects that return results, we can either block the caller thread (gen_server:call style) or return a boost::unique_future. But that's for another day.
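
For the curious, here is a rough sketch of what the future-returning flavour could look like. It is an assumption-laden illustration, not the proxy's code: it uses C++11 std::future and std::packaged_task rather than boost::unique_future, and the names calling_active_object, cast, call and count_ticks are all made up for the example.

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical std-only active object with a call() that returns a future.
class calling_active_object {
public:
    calling_active_object() : m_exit(false), m_thread([this] { run(); }) {}
    ~calling_active_object() {
        cast([this] { m_exit = true; });
        m_thread.join();
    }

    // Fire and forget, like gen_server:cast.
    void cast(std::function<void()> f) {
        {
            std::lock_guard<std::mutex> l(m_mutex);
            m_queue.push(std::move(f));
        }
        m_cond.notify_one();
    }

    // Queue f and hand back a future for its result.
    // Calling get() on the future straight away gives gen_server:call behaviour.
    template <typename F>
    auto call(F f) -> std::future<decltype(f())> {
        typedef decltype(f()) R;
        // shared_ptr because std::function needs a copyable callable
        auto task = std::make_shared<std::packaged_task<R()> >(std::move(f));
        std::future<R> result = task->get_future();
        cast([task] { (*task)(); });
        return result;
    }

private:
    void run() {
        while (!m_exit) {
            std::function<void()> f;
            {
                std::unique_lock<std::mutex> l(m_mutex);
                m_cond.wait(l, [this] { return !m_queue.empty(); });
                f = std::move(m_queue.front());
                m_queue.pop();
            }
            f();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::queue<std::function<void()> > m_queue;
    bool m_exit;
    std::thread m_thread;  // declared last so it starts after the other members exist
};

// Casts n increments, then calls for the total.
// The counter is only ever touched on the worker thread.
int count_ticks(int n) {
    int ticks = 0;
    calling_active_object worker;
    for (int i = 0; i < n; ++i) worker.cast([&ticks] { ++ticks; });
    return worker.call([&ticks] { return ticks; }).get();  // blocks until the worker replies
}
```

Because the call is queued behind the casts, the reply reflects every increment sent before it. An exception thrown inside the called function would surface from get() on the caller's thread.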

Some awesome things about this:

  • No locking in user code. All the locking is in the concurrent_queue, and it doesn't call any unknown code while it holds the lock, so it is hard to accidentally introduce a deadlock. (One caveat: with a bounded queue, a message handler that sends to its own full queue would block forever.)
  • The caller of tick() doesn't even need to know about threading. We can call it from any thread without messing up our resources.
  • The zmq socket is completely managed by the active object's thread. No awkward resource sharing, just very simple message passing.
