17 #ifndef __NETWORK_HPP__
18 #define __NETWORK_HPP__
64 explicit Network(
const std::set<process::UPID>&
pids);
74 void set(
const std::set<process::UPID>&
pids);
86 template <
typename Req,
typename Res>
90 const std::set<process::UPID>&
filter = std::set<process::UPID>())
const;
97 const std::set<process::UPID>&
filter = std::set<process::UPID>())
const;
112 const std::string& servers,
114 const std::string& znode,
116 const std::set<process::UPID>& base = std::set<process::UPID>());
126 void watch(
const std::set<zookeeper::Group::Membership>& expected);
129 void watched(
const process::Future<std::set<zookeeper::Group::Membership>>&);
139 std::set<process::UPID> base;
194 void set(
const std::set<process::UPID>& _pids)
207 if (satisfied(size, mode)) {
211 Watch*
watch =
new Watch(size, mode);
212 watches.push_back(watch);
216 return watch->promise.future();
221 template <
typename Req,
typename Res>
225 const std::set<process::UPID>&
filter)
227 std::set<process::Future<Res>> futures;
228 typename std::set<process::UPID>::const_iterator iterator;
229 for (iterator = pids.begin(); iterator != pids.end(); ++iterator) {
231 if (filter.count(pid) == 0) {
232 futures.insert(protocol(pid, req));
239 template <
typename M>
242 const std::set<process::UPID>&
filter)
244 std::set<process::UPID>::const_iterator iterator;
245 for (iterator = pids.begin(); iterator != pids.end(); ++iterator) {
247 if (filter.count(pid) == 0) {
260 foreach (Watch*
watch, watches) {
261 watch->promise.fail(
"Network is being terminated");
285 const size_t size = watches.size();
286 for (
size_t i = 0; i <
size; i++) {
287 Watch*
watch = watches.front();
290 if (satisfied(watch->size, watch->mode)) {
291 watch->promise.set(pids.size());
294 watches.push_back(watch);
305 return pids.size() ==
size;
307 return pids.size() !=
size;
309 return pids.size() <
size;
311 return pids.size() <=
size;
313 return pids.size() >
size;
315 return pids.size() >=
size;
317 LOG(FATAL) <<
"Invalid watch mode";
322 std::set<process::UPID> pids;
323 std::list<Watch*> watches;
374 template <
typename Req,
typename Res>
378 const std::set<process::UPID>&
filter)
const
381 protocol, req, filter);
385 template <
typename M>
388 const std::set<process::UPID>&
filter)
const
392 = &NetworkProcess::broadcast<M>;
399 const std::string& servers,
401 const std::string& znode,
403 const std::set<process::UPID>& _base)
404 :
group(servers, timeout, znode, auth),
410 watch(std::set<zookeeper::Group::Membership>());
414 inline void ZooKeeperNetwork::watch(
415 const std::set<zookeeper::Group::Membership>& expected)
417 memberships =
group.watch(expected);
423 inline void ZooKeeperNetwork::watched(
431 LOG(FATAL) <<
"Failed to watch ZooKeeper group: " << memberships.
failure();
436 LOG(INFO) <<
"ZooKeeper group memberships changed";
439 std::list<process::Future<Option<std::string>>> futures;
442 futures.push_back(
group.data(membership));
457 inline void ZooKeeperNetwork::collected(
460 if (datas.isFailed()) {
461 LOG(WARNING) <<
"Failed to get data for ZooKeeper group members: "
466 watch(std::set<zookeeper::Group::Membership>());
472 std::set<process::UPID>
pids;
479 CHECK(pid) <<
"Failed to parse '" << data.
get() <<
"'";
484 LOG(INFO) <<
"ZooKeeper group PIDs: " <<
stringify(pids);
490 watch(memberships.
get());
493 #endif // __NETWORK_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
NetworkProcess()
Definition: network.hpp:151
void remove(const process::UPID &pid)
Definition: network.hpp:185
Definition: nothing.hpp:16
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:100
ProcessBase(const std::string &id="")
const T & get() const
Definition: future.hpp:1310
ZooKeeperNetwork(const std::string &servers, const Duration &timeout, const std::string &znode, const Option< zookeeper::Authentication > &auth, const std::set< process::UPID > &base=std::set< process::UPID >())
Definition: network.hpp:398
void add(const process::UPID &pid)
Definition: network.hpp:349
Definition: future.hpp:664
NetworkProcess(const std::set< process::UPID > &pids)
Definition: network.hpp:153
Definition: network.hpp:58
void set(const std::set< process::UPID > &pids)
Definition: network.hpp:361
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Definition: network.hpp:55
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
bool isSome() const
Definition: option.hpp:115
Definition: network.hpp:50
std::set< process::Future< Res > > broadcast(const Protocol< Req, Res > &protocol, const Req &req, const std::set< process::UPID > &filter)
Definition: network.hpp:222
process::Future< size_t > watch(size_t size, WatchMode mode=NOT_EQUAL_TO) const
Definition: network.hpp:367
_Deferred< F > defer(F &&f)
Definition: executor.hpp:54
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Network()
Definition: network.hpp:327
Definition: network.hpp:148
Definition: network.hpp:57
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1458
Definition: duration.hpp:259
virtual ~Network()
Definition: network.hpp:341
Definition: network.hpp:59
const T & get() const &
Definition: option.hpp:118
Protocol< PromiseRequest, PromiseResponse > promise
process::Future< std::set< process::Future< Res > > > broadcast(const Protocol< Req, Res > &protocol, const Req &req, const std::set< process::UPID > &filter=std::set< process::UPID >()) const
Definition: network.hpp:375
Definition: network.hpp:56
UPID link(const UPID &pid, const RemoteConnection remote=RemoteConnection::REUSE)
Links with the specified UPID.
Definition: protobuf.hpp:100
If a persistent socket to the target pid does not exist, a new link is created.
Nothing broadcast(const M &m, const std::set< process::UPID > &filter)
Definition: network.hpp:240
void remove(const process::UPID &pid)
Definition: network.hpp:355
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: protobuf.hpp:453
#define UNREACHABLE()
Definition: unreachable.hpp:22
Try< std::vector< Entry > > list(const std::string &hierarchy, const std::string &cgroup)
#define CHECK_READY(expression)
Definition: check.hpp:29
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
void add(const process::UPID &pid)
Definition: network.hpp:159
Definition: network.hpp:60
virtual void finalize()
Invoked when a process is terminated.
Definition: network.hpp:258
WatchMode
Definition: network.hpp:53
Future< T > after(const Duration &duration, lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1708
Try< mode_t > mode(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:126
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: network.hpp:108
const std::string & failure() const
Definition: future.hpp:1336
std::string stringify(int flags)
Definition: executor.hpp:29
void set(const std::set< process::UPID > &_pids)
Definition: network.hpp:194
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
process::Future< size_t > watch(size_t size, Network::WatchMode mode)
Definition: network.hpp:205
void filter(Filter *filter)
bool isFailed() const
Definition: future.hpp:1245
Future< std::list< T > > collect(const std::list< Future< T >> &futures)
Definition: collect.hpp:270