RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* typed_message_channel.h -*- C++ -*- 00002 Jeremy Barnes, 31 May 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 An internal message channel that keeps a ring of typed objects that 00006 are being fed between threads. 00007 */ 00008 00009 #pragma once 00010 00011 #include "jml/utils/ring_buffer.h" 00012 #include "jml/arch/wakeup_fd.h" 00013 #include "soa/service/message_loop.h" 00014 00015 namespace Datacratic { 00016 00017 00018 template<typename Message> 00019 struct TypedMessageChannel { 00020 ML::RingBufferSRMW<Message> buf; 00021 }; 00022 00023 template<typename Message> 00024 struct TypedMessageSink: public AsyncEventSource { 00025 00026 TypedMessageSink(size_t bufferSize) 00027 : wakeup(EFD_NONBLOCK), buf(bufferSize) 00028 { 00029 } 00030 00031 std::function<void (Message && message)> onEvent; 00032 00033 void push(const Message & message) 00034 { 00035 if (buf.tryPush(message)) 00036 wakeup.signal(); 00037 else 00038 throw ML::Exception("the message queue is full"); 00039 } 00040 00041 void push(Message && message) 00042 { 00043 buf.push(message); 00044 wakeup.signal(); 00045 } 00046 00047 bool tryPush(Message && message) 00048 { 00049 bool pushed = buf.tryPush(message); 00050 if (pushed) 00051 wakeup.signal(); 00052 00053 return pushed; 00054 } 00055 00056 //protected: 00057 virtual int selectFd() const 00058 { 00059 return wakeup.fd(); 00060 } 00061 00062 virtual bool poll() const 00063 { 00064 return buf.couldPop(); 00065 } 00066 00067 virtual bool processOne() 00068 { 00069 // Try to do one 00070 Message msg; 00071 if (!buf.tryPop(msg)) 00072 return false; 00073 onEvent(std::move(msg)); 00074 00075 // Are there more waiting for us? 00076 if (buf.couldPop()) 00077 return true; 00078 00079 // Warning: race condition... that's why we need the couldPop from 00080 // the next instruction to be accurate 00081 wakeup.tryRead(); 00082 00083 return buf.couldPop(); 00084 } 00085 00086 private: 00087 ML::Wakeup_Fd wakeup; 00088 ML::RingBufferSRMW<Message> buf; 00089 }; 00090 00091 } // namespace Datacratic