RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/typed_message_channel.h
00001 /* typed_message_channel.h                                         -*- C++ -*-
00002    Jeremy Barnes, 31 May 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    An internal message channel that keeps a ring of typed objects that
00006    are being fed between threads.
00007 */
00008 
00009 #pragma once
00010 
00011 #include "jml/utils/ring_buffer.h"
00012 #include "jml/arch/wakeup_fd.h"
00013 #include "soa/service/message_loop.h"
00014 
00015 namespace Datacratic {
00016 
00017 
00018 template<typename Message>
00019 struct TypedMessageChannel {
00020     ML::RingBufferSRMW<Message> buf;
00021 };
00022 
00023 template<typename Message>
00024 struct TypedMessageSink: public AsyncEventSource {
00025 
00026     TypedMessageSink(size_t bufferSize)
00027         : wakeup(EFD_NONBLOCK), buf(bufferSize)
00028     {
00029     }
00030 
00031     std::function<void (Message && message)> onEvent;
00032 
00033     void push(const Message & message)
00034     {
00035         if (buf.tryPush(message))
00036             wakeup.signal();
00037         else
00038             throw ML::Exception("the message queue is full");
00039     }
00040 
00041     void push(Message && message)
00042     {
00043         buf.push(message);
00044         wakeup.signal();
00045     }
00046 
00047     bool tryPush(Message && message)
00048     {
00049         bool pushed = buf.tryPush(message);
00050         if (pushed)
00051             wakeup.signal();
00052 
00053         return pushed;
00054     }
00055 
00056     //protected:
00057     virtual int selectFd() const
00058     {
00059         return wakeup.fd();
00060     }
00061 
00062     virtual bool poll() const
00063     {
00064         return buf.couldPop();
00065     }
00066 
00067     virtual bool processOne()
00068     {
00069         // Try to do one
00070         Message msg;
00071         if (!buf.tryPop(msg))
00072             return false;
00073         onEvent(std::move(msg));
00074 
00075         // Are there more waiting for us?
00076         if (buf.couldPop())
00077             return true;
00078         
00079         // Warning: race condition... that's why we need the couldPop from
00080         // the next instruction to be accurate
00081         wakeup.tryRead();
00082 
00083         return buf.couldPop();
00084     }
00085 
00086 private:
00087     ML::Wakeup_Fd wakeup;
00088     ML::RingBufferSRMW<Message> buf;
00089 };
00090 
00091 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator