17 #ifndef __COMMON_RECORDIO_HPP__
18 #define __COMMON_RECORDIO_HPP__
70 : process(process::
spawn(
71 new internal::ReaderProcess<T>(std::move(decoder), reader),
111 template <
typename T>
114 const std::function<std::string(
const T&)>& func,
120 return reader->read();
125 if (record.isNone()) {
130 if (record.isError()) {
136 if (!writer.
write(func(record.get()))) {
147 template <
typename T>
163 if (!records.empty()) {
164 Result<T> record = std::move(records.front());
179 waiters.push(std::move(waiter));
180 return waiters.back()->future();
192 fail(
"Reader is terminating");
196 void fail(
const std::string&
message)
198 error =
Error(message);
200 while (!waiters.empty()) {
201 waiters.front()->fail(message);
210 while (!waiters.empty()) {
221 .onAny(
process::defer(
this, &ReaderProcess::_consume, lambda::_1));
227 fail(
"Pipe::Reader failure: " +
233 if (read.
get().empty()) {
241 fail(
"Decoder failure: " + decode.
error());
245 foreach (
const Try<T>& record, decode.
get()) {
246 if (!waiters.empty()) {
247 waiters.front()->set(
Result<T>(std::move(record)));
250 records.push(std::move(record));
260 std::queue<process::Owned<process::Promise<Result<T>>>> waiters;
261 std::queue<Result<T>> records;
272 #endif // __COMMON_RECORDIO_HPP__
bool isReady() const
Definition: future.hpp:1231
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Definition: errorbase.hpp:35
ProcessBase(const std::string &id="")
const T & get() const
Definition: future.hpp:1310
ControlFlow< typename std::decay< T >::type >::Break Break(T &&t)
Definition: loop.hpp:237
Result< Classifier > decode(const Netlink< struct rtnl_cls > &cls)
Definition: future.hpp:664
virtual void finalize() override
Invoked when a process is terminated.
Definition: recordio.hpp:189
Definition: recordio.hpp:45
virtual ~ReaderProcess()
Definition: recordio.hpp:159
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: result.hpp:40
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
virtual void initialize() override
Invoked when a process gets spawned.
Definition: recordio.hpp:184
bool isSome() const
Definition: option.hpp:115
process::Future< Result< T > > read()
Definition: recordio.hpp:161
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Provides RecordIO decoding on top of an http::Pipe::Reader.
Definition: recordio.hpp:62
hashmap< std::string, MessageHandler > message
Definition: process.hpp:443
process::Future< Result< T > > read()
Returns the next piece of decoded data from the pipe.
Definition: recordio.hpp:90
bool write(std::string s)
Definition: future.hpp:73
struct ev_loop * loop
Definition: loop.hpp:456
const T & get() const &
Definition: option.hpp:118
virtual ~Reader()
Definition: recordio.hpp:74
static Try error(const E &e)
Definition: try.hpp:42
process::Future< Nothing > transform(process::Owned< Reader< T >> &&reader, const std::function< std::string(const T &)> &func, process::http::Pipe::Writer writer)
This is a helper function that reads records from a Reader, applies a transformation to the records and writes the results to the pipe.
Definition: recordio.hpp:112
const std::string message
Definition: errorbase.hpp:45
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Future< std::string > read()
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:279
bool isError() const
Definition: try.hpp:71
Reader(::recordio::Decoder< T > &&decoder, process::http::Pipe::Reader reader)
Definition: recordio.hpp:68
const std::string & failure() const
Definition: future.hpp:1336
Definition: process.hpp:493
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
ReaderProcess(::recordio::Decoder< T > &&_decoder, process::http::Pipe::Reader _reader)
Definition: recordio.hpp:151
Given a decoding function for individual records, this provides decoding from "Record-IO" data into typed records.
Definition: recordio.hpp:82
const T & get() const
Definition: try.hpp:73
bool isFailed() const
Definition: future.hpp:1245
Definition: future.hpp:57