13 #ifndef __PROCESS_PROTOBUF_HPP__
14 #define __PROCESS_PROTOBUF_HPP__
16 #include <glog/logging.h>
18 #include <google/protobuf/arena.h>
19 #include <google/protobuf/message.h>
20 #include <google/protobuf/repeated_field.h>
// Fragment of the two-argument `post` helper for protobuf messages:
// serializes `message` into `data` and forwards the bytes to the raw `post`
// overload, using the message's type name as the message name.
// NOTE(review): the result of `SerializeToString` is not checked here.
39 const google::protobuf::Message& message)
42 message.SerializeToString(&data);
43 post(to, message.GetTypeName(), data.data(), data.size());
// Fragment of the `post` helper that also carries an explicit `from`:
// serializes `message` into `data` and forwards `from`, `to`, the message's
// type name and the serialized bytes to the raw `post` overload.
// NOTE(review): the result of `SerializeToString` is not checked here.
49 const google::protobuf::Message& message)
52 message.SerializeToString(&data);
53 post(from, to, message.GetTypeName(), data.data(), data.size());
// Copies every element of a protobuf `RepeatedPtrField<T>` into a newly
// constructed `std::vector<T>` (range constructor over begin()/end()).
81 std::vector<T>
convert(
const google::protobuf::RepeatedPtrField<T>& items)
83 return std::vector<T>(items.begin(), items.end());
// Rvalue overload: builds the `std::vector<T>` through move iterators, so
// each element is moved out of `items` rather than copied.
88 std::vector<T>
convert(google::protobuf::RepeatedPtrField<T>&& items)
90 return std::vector<T>(
91 std::make_move_iterator(items.begin()),
92 std::make_move_iterator(items.end()));
// Fragment of message consumption: taken only when a protobuf handler is
// registered under the incoming message's name.
108 if (protobufHandlers.count(event.message.name) > 0) {
// Record the sender before running the handler.
// NOTE(review): presumably this is the `from` that `reply` CHECKs — confirm.
109 from =
event.message.from;
// Invoke the registered handler with the sender and the raw message body.
110 protobufHandlers[
event.message.name](
111 event.message.from,
event.message.body);
// Fragment of `send` for protobuf messages: serializes `message` into `data`.
// The statement that transmits `data` is not visible in this excerpt.
// NOTE(review): the result of `SerializeToString` is not checked here.
119 const google::protobuf::Message&
message)
122 message.SerializeToString(&data);
// Fragment of `reply`: glog CHECK that fails with the message below when
// `from` evaluates to false, i.e. there is no recorded sender to reply to.
130 CHECK(from) <<
"Attempting to reply without a sender";
// Fragment of an `install` overload for message type `M` (per the listing's
// index: the one taking `void (T::*)(const process::UPID&, const M&)`).
135 template <
typename M>
// An `M` instance is created so its type name can key `protobufHandlers`.
// NOTE(review): the release of `m` is not visible in this excerpt — confirm
// it is freed.
138 google::protobuf::Message* m =
new M();
// `t` is `this` viewed as a `T*`.
139 T* t =
static_cast<T*
>(
this);
// Store the handler under the message type name; `_1` and `_2` are left as
// placeholders for the arguments supplied at dispatch time.
140 protobufHandlers[m->GetTypeName()] =
143 lambda::_1, lambda::_2);
// Fragment of an `install` overload for message type `M` (per the listing's
// index: the one taking `void (T::*)(const process::UPID&, M&&)`).
147 template <
typename M>
// An `M` instance is created so its type name can key `protobufHandlers`.
// NOTE(review): the release of `m` is not visible in this excerpt — confirm
// it is freed.
150 google::protobuf::Message* m =
new M();
// `t` is `this` viewed as a `T*`.
151 T* t =
static_cast<T*
>(
this);
// Store the handler under the message type name; `_1` and `_2` are left as
// placeholders for the arguments supplied at dispatch time.
152 protobufHandlers[m->GetTypeName()] =
155 lambda::_1, lambda::_2);
// Template header over a message type `M` and a property type `P`.
// NOTE(review): the declaration it introduces is not visible here; the
// listing's index places `MessageProperty` (`P(M::*)() const`) on the next
// source line — confirm.
159 template <
typename M,
typename P>
// Fragment of an `install` overload for message type `M` (per the listing's
// index: the one taking `void (T::*)(const process::UPID&)`).
162 template <
typename M>
// An `M` instance is created so its type name can key `protobufHandlers`.
// NOTE(review): the release of `m` is not visible in this excerpt — confirm
// it is freed.
165 google::protobuf::Message* m =
new M();
// `t` is `this` viewed as a `T*`.
166 T* t =
static_cast<T*
>(
this);
// Store the handler under the message type name; `_1` and `_2` are left as
// placeholders for the arguments supplied at dispatch time.
167 protobufHandlers[m->GetTypeName()] =
170 lambda::_1, lambda::_2);
// Fragment of the variadic `install` overload for message type `M` (per the
// listing's index: `void (T::*)(const process::UPID&, PC...)` plus one
// `MessageProperty<M, P>` per extracted field).
174 template <
typename M,
175 typename ...P,
typename ...PC>
// An `M` instance is created so its type name can key `protobufHandlers`.
// NOTE(review): the release of `m` is not visible in this excerpt — confirm
// it is freed.
180 google::protobuf::Message* m =
new M();
// `t` is `this` viewed as a `T*`.
181 T* t =
static_cast<T*
>(
this);
// Store the handler under the message type name; `_1` and `_2` stay as
// placeholders while the `param...` pack is bound up front.
182 protobufHandlers[m->GetTypeName()] =
190 lambda::_1, lambda::_2, param...);
// Fragment of an `install` overload for message type `M` (per the listing's
// index: the one taking `void (T::*)(const M&)`).
195 template <
typename M>
// An `M` instance is created so its type name can key `protobufHandlers`.
// NOTE(review): the release of `m` is not visible in this excerpt — confirm
// it is freed.
198 google::protobuf::Message* m =
new M();
// `t` is `this` viewed as a `T*`.
199 T* t =
static_cast<T*
>(
this);
// Store the handler under the message type name; `_1` and `_2` are left as
// placeholders for the arguments supplied at dispatch time.
200 protobufHandlers[m->GetTypeName()] =
203 lambda::_1, lambda::_2);
// Fragment of an `install` overload for message type `M` (per the listing's
// index: the one taking `void (T::*)(M&&)`).
207 template <
typename M>
// An `M` instance is created so its type name can key `protobufHandlers`.
// NOTE(review): the release of `m` is not visible in this excerpt — confirm
// it is freed.
210 google::protobuf::Message* m =
new M();
// `t` is `this` viewed as a `T*`.
211 T* t =
static_cast<T*
>(
this);
// Store the handler under the message type name; `_1` and `_2` are left as
// placeholders for the arguments supplied at dispatch time.
212 protobufHandlers[m->GetTypeName()] =
215 lambda::_1, lambda::_2);
// Fragment of an `install` overload for message type `M` (per the listing's
// index: the one taking a no-argument `void (T::*)()`).
219 template <
typename M>
// An `M` instance is created so its type name can key `protobufHandlers`.
// NOTE(review): the release of `m` is not visible in this excerpt — confirm
// it is freed.
222 google::protobuf::Message* m =
new M();
// `t` is `this` viewed as a `T*`.
223 T* t =
static_cast<T*
>(
this);
// Store the handler under the message type name; `_1` and `_2` are left as
// placeholders for the arguments supplied at dispatch time.
224 protobufHandlers[m->GetTypeName()] =
227 lambda::_1, lambda::_2);
// Fragment of the variadic `install` overload for message type `M` whose
// method takes only the extracted fields (`PC...`), not the sender.
231 template <
typename M,
232 typename ...P,
typename ...PC>
234 void (T::*method)(PC...),
// An `M` instance is created so its type name can key `protobufHandlers`.
// NOTE(review): the release of `m` is not visible in this excerpt — confirm
// it is freed.
237 google::protobuf::Message* m =
new M();
// `t` is `this` viewed as a `T*`.
238 T* t =
static_cast<T*
>(
this);
// Store the handler under the message type name; `_1` and `_2` stay as
// placeholders while the `param...` pack is bound up front.
239 protobufHandlers[m->GetTypeName()] =
247 lambda::_1, lambda::_2, param...);
// Static trampoline: parses `data` into an `M` and, when the message is
// fully initialized, calls `method` on `t` with the sender and the message.
255 template <
typename M>
256 static void handlerM(
260 const std::string& data)
// The message is created on a function-local arena; CHECK_NOTNULL fails
// hard if the arena returns a null pointer.
262 google::protobuf::Arena arena;
263 M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
// NOTE(review): the return value of `ParseFromString` is ignored; the
// `IsInitialized` check below is the only validation visible here.
264 m->ParseFromString(data);
266 if (m->IsInitialized()) {
267 (t->*method)(sender, *m);
// Warning carrying the message's initialization errors.
// NOTE(review): the branch enclosing this statement is not visible in the
// excerpt; presumably the `else` of the check above — confirm.
269 LOG(WARNING) <<
"Initialization errors: "
270 << m->InitializationErrorString();
// Static trampoline for handlers that take the message by rvalue: parses
// `data` into `m` and, when fully initialized, moves `m` into `method`.
274 template <
typename M>
275 static void handlerMutM(
279 const std::string& data)
// NOTE(review): the declaration of `m` is not visible in this excerpt, and
// the return value of `ParseFromString` is ignored.
282 m.ParseFromString(data);
284 if (m.IsInitialized()) {
285 (t->*method)(sender, std::move(m));
// Warning carrying the message's initialization errors.
// NOTE(review): the branch enclosing this statement is not visible;
// presumably the `else` of the check above, so `m` is not moved-from here —
// confirm.
287 LOG(WARNING) <<
"Initialization errors: "
288 << m.InitializationErrorString();
// Static trampoline for handlers that only want the sender: calls `method`
// on `t` with `sender`; `data` is not used by the visible statement.
292 static void handler0(
296 const std::string& data)
298 (t->*method)(sender);
// Static trampoline for the variadic form: parses `data` into an `M`; the
// `p...` pack holds one `MessageProperty<M, P>` member pointer per field.
// NOTE(review): the call that passes the extracted fields to `method` is
// not visible in this excerpt.
301 template <
typename M,
302 typename ...P,
typename ...PC>
303 static void handlerN(
307 const std::string& data,
308 MessageProperty<M, P>... p)
// The message is created on a function-local arena; CHECK_NOTNULL fails
// hard if the arena returns a null pointer.
310 google::protobuf::Arena arena;
311 M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
// NOTE(review): the return value of `ParseFromString` is ignored.
312 m->ParseFromString(data);
314 if (m->IsInitialized()) {
// Warning carrying the message's initialization errors; the enclosing branch
// is not visible here (presumably the `else` of the check above — confirm).
317 LOG(WARNING) <<
"Initialization errors: "
318 << m->InitializationErrorString();
// Static trampoline for handlers declared as `void (T::*)(const M&)`:
// parses `data` into an `M` and checks that it is fully initialized.
// NOTE(review): the call to `method` is not visible in this excerpt.
323 template <
typename M>
324 static void _handlerM(
326 void (T::*method)(
const M&),
328 const std::string& data)
// The message is created on a function-local arena; CHECK_NOTNULL fails
// hard if the arena returns a null pointer.
330 google::protobuf::Arena arena;
331 M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
// NOTE(review): the return value of `ParseFromString` is ignored.
332 m->ParseFromString(data);
334 if (m->IsInitialized()) {
// Warning carrying the message's initialization errors; the enclosing branch
// is not visible here (presumably the `else` of the check above — confirm).
337 LOG(WARNING) <<
"Initialization errors: "
338 << m->InitializationErrorString();
// Static trampoline for handlers declared as `void (T::*)(M&&)`: parses
// `data` into `m` and, when fully initialized, moves `m` into `method`.
342 template <
typename M>
343 static void _handlerMutM(
345 void (T::*method)(M&&),
347 const std::string& data)
// NOTE(review): the declaration of `m` is not visible in this excerpt, and
// the return value of `ParseFromString` is ignored.
350 m.ParseFromString(data);
352 if (m.IsInitialized()) {
353 (t->*method)(std::move(m));
// Warning carrying the message's initialization errors; the enclosing branch
// is not visible here (presumably the `else` of the check above — confirm).
355 LOG(WARNING) <<
"Initialization errors: "
356 << m.InitializationErrorString();
// Static trampoline `_handler0`; only part of its parameter list is visible
// in this excerpt (it receives the raw `data` string), the body is not.
360 static void _handler0(
364 const std::string& data)
// Static trampoline for the variadic form without a sender: `method` takes
// `PC...`, and the `p...` pack holds one `MessageProperty<M, P>` per field.
// NOTE(review): the call that passes the extracted fields to `method` is
// not visible in this excerpt.
369 template <
typename M,
370 typename ...P,
typename ...PC>
371 static void _handlerN(
373 void (T::*method)(PC...),
375 const std::string& data,
376 MessageProperty<M, P>... p)
// The message is created on a function-local arena; CHECK_NOTNULL fails
// hard if the arena returns a null pointer.
378 google::protobuf::Arena arena;
379 M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
// NOTE(review): the return value of `ParseFromString` is ignored.
380 m->ParseFromString(data);
382 if (m->IsInitialized()) {
// Warning carrying the message's initialization errors; the enclosing branch
// is not visible here (presumably the `else` of the check above — confirm).
385 LOG(WARNING) <<
"Initialization errors: "
386 << m->InitializationErrorString();
// Start of a typedef for a `lambda::function` type; its signature and the
// alias name are not visible in this excerpt.
390 typedef lambda::function<
// Template header over a request type `Req` and a response type `Res`; the
// declaration it introduces is not visible in this excerpt.
403 template <
typename Req,
typename Res>
// Member function receiving a `Res` by const reference; its body is not
// visible in this excerpt.
438 void response(
const Res& res)
// Template header over `Req`/`Res`, followed by the tail of a const member
// function taking the request (per the listing's index:
// `operator()(const process::UPID& pid, const Req& req) const`).
452 template <
typename Req,
typename Res>
457 const Req& req)
const
// Compile-time checks only: each scoped block compiles only if `Req*` /
// `Res*` converts implicitly to `google::protobuf::Message*`. The
// `(void)m` silences the unused-variable warning.
460 { Req* req =
nullptr; google::protobuf::Message* m = req; (void)m; }
461 { Res* res =
nullptr; google::protobuf::Message* m = res; (void)m; }
469 #endif // __PROCESS_PROTOBUF_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
process::Future< Res > operator()(const process::UPID &pid, const Req &req) const
Definition: protobuf.hpp:455
ProcessBase(const std::string &id="")
bool set(const T &_t)
Definition: future.hpp:826
ReqResProcess(const process::UPID &_pid, const Req &_req)
Definition: protobuf.hpp:407
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1370
void install(void(T::*method)(const process::UPID &, const M &))
Definition: protobuf.hpp:136
virtual ~ReqResProcess()
Definition: protobuf.hpp:416
void install(void(T::*method)(const M &))
Definition: protobuf.hpp:196
void install(void(T::*method)())
Definition: protobuf.hpp:220
P(M::*)() const MessageProperty
Definition: protobuf.hpp:160
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
void reply(const google::protobuf::Message &message)
Definition: protobuf.hpp:128
Definition: protobuf.hpp:404
void send(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends the message to the specified UPID.
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
virtual ~ProtobufProcess()
Definition: protobuf.hpp:103
void post(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends a message with data without a return address.
hashmap< std::string, MessageHandler > message
Definition: process.hpp:443
void install(void(T::*method)(const process::UPID &, PC...), MessageProperty< M, P >...param)
Definition: protobuf.hpp:176
void install(void(T::*method)(M &&))
Definition: protobuf.hpp:208
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (e.g., ...
Definition: pid.hpp:39
void consume(process::MessageEvent &&event) override
Definition: protobuf.hpp:106
void consume(MessageEvent &&event) override
Definition: protobuf.hpp:100
Definition: protobuf.hpp:453
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
const T & convert(const T &t)
Definition: protobuf.hpp:74
process::Future< Res > run()
Definition: protobuf.hpp:422
Object protobuf(const google::protobuf::Message &message)
Definition: protobuf.hpp:836
Future< T > future() const
Definition: future.hpp:912
void install(void(T::*method)(const process::UPID &, M &&))
Definition: protobuf.hpp:148
bool discard()
Definition: future.hpp:809
Definition: event.hpp:103
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: process.hpp:493
void install(void(T::*method)(const process::UPID &))
Definition: protobuf.hpp:163
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
void install(void(T::*method)(PC...), MessageProperty< M, P >...param)
Definition: protobuf.hpp:233