17 #ifndef __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
18 #define __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
20 #include <glog/logging.h>
64 template <
typename Call,
typename Event>
65 class HttpConnectionProcess
89 const std::function<
void(
void)>&
connected,
91 const std::function<
void(
const std::queue<Event>&)>& received)
93 state(State::DISCONNECTED),
94 contentType(_contentType),
96 detector(std::move(_detector)) {}
115 "Cannot process 'SUBSCRIBE' call as the driver is in "
122 "Cannot process '" +
stringify(call.type()) +
"' call "
123 "as the driver is in state " +
stringify(state));
129 VLOG(1) <<
"Sending " << call.type() <<
" call to " << endpoint.
get();
133 request.
url = endpoint.
get();
137 {
"Content-Type",
stringify(contentType)}};
140 if (call.type() == Call::SUBSCRIBE) {
145 response = connections->subscribe.send(request,
true);
152 response = connections->nonSubscribe.send(request);
157 return response.
then(
167 detection = detector->detect(
None())
184 if (future.isFailed()) {
185 LOG(WARNING) <<
"Failed to detect an endpoint: " << future.failure();
210 if (future.isDiscarded()) {
211 LOG(INFO) <<
"Re-detecting endpoint";
214 }
else if (future->isNone()) {
215 LOG(INFO) <<
"Lost endpoint";
219 endpoint = future.
get().get();
221 LOG(INFO) <<
"New endpoint detected at " << endpoint.
get();
228 detection = detector->detect(endpoint)
236 if (connectionId != _connectionId) {
237 VLOG(1) <<
"Ignoring connection attempt from stale connection";
262 if (connectionId != _connectionId) {
263 VLOG(1) <<
"Ignoring connection attempt from stale connection";
269 if (!_connections.isReady()) {
271 _connections.isFailed()
272 ? _connections.failure()
273 :
"Connection future discarded");
277 VLOG(1) <<
"Connected with the remote endpoint at " << endpoint.
get();
281 connections = Connections {
282 std::get<0>(_connections.get()),
283 std::get<1>(_connections.get())};
285 connections->subscribe.disconnected()
290 "Subscribe connection interrupted"));
292 connections->nonSubscribe.disconnected()
297 "Non-subscribe connection interrupted"));
310 if (connections.
isSome()) {
311 connections->subscribe.disconnect();
312 connections->nonSubscribe.disconnect();
315 if (subscribed.
isSome()) {
316 subscribed->reader.close();
321 connections =
None();
324 connectionId =
None();
331 if (connectionId != _connectionId) {
332 VLOG(1) <<
"Ignoring disconnection attempt from stale connection";
349 if (connectionId != _connectionId) {
357 CHECK_EQ(Call::SUBSCRIBE, call.type());
366 lambda::bind(deserialize<Event>, contentType, lambda::_1);
373 subscribed = SubscribedResponse(reader, std::move(decoder));
381 streamId = uuid.
get();
391 CHECK_NE(Call::SUBSCRIBE, call.type());
397 if (call.type() == Call::SUBSCRIBE) {
404 "Received '" + response.
status +
"' (" + response.
body +
")");
408 "Received unexpected '" + response.
status +
409 "' (" + response.
body +
")");
414 subscribed->decoder->read()
425 CHECK(!event.isDiscarded());
428 if (!subscribed.
isSome() || subscribed->reader != reader) {
429 VLOG(1) <<
"Ignoring event from old stale connection";
436 if (event.isFailed()) {
437 LOG(ERROR) <<
"Failed to decode stream of events: "
444 if (event->isNone()) {
445 const std::string
error =
"End-Of-File received";
452 if (event->isError()) {
453 LOG(ERROR) <<
"Failed to de-serialize event: " <<
event->error();
465 LOG(WARNING) <<
"Ignoring " <<
stringify(event.type())
466 <<
" event because we're no longer subscribed";
475 if (events.size() == 1) {
480 events = std::queue<Event>();
493 std::function<void(const std::queue<Event>&)> received;
502 struct SubscribedResponse
507 : reader(std::move(_reader)),
508 decoder(std::move(_decoder)) {}
511 SubscribedResponse(
const SubscribedResponse&) =
delete;
512 SubscribedResponse& operator=(
const SubscribedResponse&) =
delete;
513 SubscribedResponse& operator=(SubscribedResponse&&) =
default;
514 SubscribedResponse(SubscribedResponse&&) =
default;
529 friend std::ostream&
operator<<(std::ostream& stream, State state)
547 const Callbacks callbacks;
550 std::queue<Event> events;
566 #endif // __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
void connected(const id::UUID &_connectionId, const process::Future< std::tuple< process::http::Connection, process::http::Connection >> &_connections)
Definition: http_connection.hpp:255
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
void disconnected(const id::UUID &_connectionId, const std::string &failure)
Definition: http_connection.hpp:328
Definition: nothing.hpp:16
ContentType
Definition: http.hpp:43
Option< Error > validate(const std::string &imageDir)
void unlock()
Definition: mutex.hpp:50
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
ProcessBase(const std::string &id="")
URL url
Definition: http.hpp:529
std::string status
Definition: http.hpp:621
Definition: future.hpp:664
uint16_t code
Definition: http.hpp:658
constexpr const char * prefix
Definition: os.hpp:94
void receive(const Event &event)
Definition: http_connection.hpp:461
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2583
HTTP connection handler.
Definition: resource_provider.hpp:41
bool discard()
Definition: future.hpp:1173
Definition: result.hpp:40
bool isSome() const
Definition: option.hpp:115
void connect(const id::UUID &_connectionId)
Definition: http_connection.hpp:232
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2584
Option< Pipe::Reader > reader
Definition: http.hpp:656
HttpConnectionProcess< Call, Event > Self
Definition: http_connection.hpp:175
Provides RecordIO decoding on top of an http::Pipe::Reader.
Definition: recordio.hpp:62
process::Future< Nothing > send(const Call &call)
Definition: http_connection.hpp:98
#define CHECK_SOME(expression)
Definition: check.hpp:44
static UUID random()
Definition: uuid.hpp:38
std::string body
Definition: http.hpp:654
static const uint16_t SERVICE_UNAVAILABLE
Definition: http.hpp:253
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1458
void detected(const process::Future< Option< process::http::URL >> &future)
Definition: http_connection.hpp:182
static const uint16_t ACCEPTED
Definition: http.hpp:220
HttpConnectionProcess(const std::string &prefix, process::Owned< EndpointDetector > _detector, ContentType _contentType, const std::function< Option< Error >(const Call &)> &validate, const std::function< void(void)> &connected, const std::function< void(void)> &disconnected, const std::function< void(const std::queue< Event > &)> &received)
Construct a HTTP connection process.
Definition: http_connection.hpp:84
const T & get() const &
Definition: option.hpp:118
Future< Connection > connect(const network::Address &address, Scheme scheme)
bool keepAlive
Definition: http.hpp:543
static const uint16_t OK
Definition: http.hpp:218
void start()
Definition: http_connection.hpp:165
void dispatch(const UPID &pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> f, const Option< const std::type_info * > &functionType=None())
Future< typename result_of< F()>::type > async(const F &f, typename std::enable_if<!std::is_void< typename result_of< F()>::type >::value >::type *=nullptr)
Definition: async.hpp:238
#define UNREACHABLE()
Definition: unreachable.hpp:22
void finalize() override
Invoked when a process is terminated.
Definition: http_connection.hpp:177
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1592
Future< Nothing > lock()
Definition: mutex.hpp:33
const std::string message
Definition: errorbase.hpp:45
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
std::string error(const std::string &msg, uint32_t code)
std::string method
Definition: http.hpp:523
std::string body
Definition: http.hpp:563
PID< HttpConnectionProcess< Call, Event > > self() const
Returns the PID of the process.
Definition: process.hpp:502
bool isNone() const
Definition: option.hpp:116
process::Future< Nothing > _send(const id::UUID &_connectionId, const Call &call, const process::http::Response &response)
Definition: http_connection.hpp:342
std::string toString() const
Definition: uuid.hpp:87
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
void read()
Definition: http_connection.hpp:412
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
static const uint16_t NOT_FOUND
Definition: http.hpp:236
std::string stringify(int flags)
Headers headers
Definition: http.hpp:623
void disconnect()
Definition: http_connection.hpp:308
Definition: process.hpp:493
enum process::http::Response::@4 type
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
bool contains(const Key &key) const
Definition: hashmap.hpp:86
static Try< UUID > fromString(const std::string &s)
Definition: uuid.hpp:67
Represents a connection to an HTTP server.
Definition: http.hpp:945
Given a decoding function for individual records, this provides decoding from "Record-IO" data into t...
Definition: recordio.hpp:82
const T & get() const
Definition: try.hpp:73
void _read(const process::http::Pipe::Reader &reader, const process::Future< Result< Event >> &event)
Definition: http_connection.hpp:421
Headers headers
Definition: http.hpp:531
friend std::ostream & operator<<(std::ostream &stream, State state)
Definition: http_connection.hpp:529
Future< std::list< T > > collect(const std::list< Future< T >> &futures)
Definition: collect.hpp:270