13 #ifndef __PROCESS_GRPC_HPP__
14 #define __PROCESS_GRPC_HPP__
20 #include <type_traits>
22 #include <google/protobuf/message.h>
24 #include <grpc++/grpc++.h>
// Expands to a pointer to the member function `service::Stub::Async<rpc>`;
// the `rpc` argument is token-pasted onto the `Async` prefix.
// NOTE(review): presumably intended to be passed as the `rpc` argument of
// `call()` -- confirm against the callers.
43 #define GRPC_RPC(service, rpc) \
44 (&service::Stub::Async##rpc)
63 const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
64 ::grpc::InsecureChannelCredentials())
65 : channel(::grpc::CreateChannel(uri, credentials)) {}
68 std::shared_ptr<::grpc::Channel> channel;
82 RpcResult(const ::grpc::Status& _status,
const T& _response)
118 template <
typename Stub,
typename Request,
typename Response>
121 std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
122 ::grpc::ClientContext*,
124 ::grpc::CompletionQueue*),
128 std::is_convertible<Request*, google::protobuf::Message*>::value,
129 "Request must be a protobuf message");
131 synchronized (data->lock) {
132 if (data->terminating) {
133 return Failure(
"Runtime has been terminated.");
136 std::shared_ptr<::grpc::ClientContext>
context(
137 new ::grpc::ClientContext());
140 context->set_deadline(
141 std::chrono::system_clock::now() + std::chrono::seconds(5));
147 std::shared_ptr<Promise<RpcResult<Response>>>
promise(
150 promise->future().onDiscard([=] { context->TryCancel(); });
152 std::shared_ptr<Response> response(
new Response());
153 std::shared_ptr<::grpc::Status>
status(new ::grpc::Status());
155 std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
156 (Stub(channel.channel).*rpc)(context.get(),
request, &data->queue));
161 new lambda::function<void()>(
167 CHECK(promise->future().isPending());
168 if (promise->future().hasDiscard()) {
176 return promise->future();
204 std::unique_ptr<std::thread> looper;
205 ::grpc::CompletionQueue queue;
207 std::atomic_flag lock = ATOMIC_FLAG_INIT;
208 bool terminating =
false;
212 std::shared_ptr<Data> data;
220 #endif // __PROCESS_GRPC_HPP__
T response
Definition: grpc.hpp:86
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
Definition: future.hpp:664
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
Definition: process.hpp:72
The response of an RPC call.
Definition: grpc.hpp:80
RpcResult(const ::grpc::Status &_status, const T &_response)
Definition: grpc.hpp:82
A copyable interface to manage an internal gRPC runtime instance for asynchronous gRPC calls...
Definition: grpc.hpp:104
Definition: future.hpp:73
struct ev_loop * loop
Definition: loop.hpp:456
Channel(const std::string &uri, const std::shared_ptr<::grpc::ChannelCredentials > &credentials=::grpc::InsecureChannelCredentials())
Definition: grpc.hpp:62
Protocol< PromiseRequest, PromiseResponse > promise
Runtime()
Definition: grpc.hpp:107
Future< RpcResult< Response > > call(const Channel &channel, std::unique_ptr<::grpc::ClientAsyncResponseReader< Response >>(Stub::*rpc)(::grpc::ClientContext *, const Request &,::grpc::CompletionQueue *), const Request &request)
Sends an asynchronous gRPC call.
Definition: grpc.hpp:119
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
A copyable interface to manage a connection to a gRPC server.
Definition: grpc.hpp:59
void terminate()
Asks the internal gRPC runtime instance to shut down the CompletionQueue, which would stop its looper...
::grpc::Status status
Definition: grpc.hpp:85
Definition: future.hpp:57