13 #ifndef __PROCESS_EVENT_QUEUE_HPP__
14 #define __PROCESS_EVENT_QUEUE_HPP__
20 #ifdef LOCK_FREE_EVENT_QUEUE
21 #include <concurrentqueue.h>
22 #endif // LOCK_FREE_EVENT_QUEUE
90 bool empty() {
return queue->empty(); }
93 size_t count() {
return queue->count<T>(); }
108 #ifndef LOCK_FREE_EVENT_QUEUE
109 void enqueue(
Event* event)
111 bool enqueued =
false;
112 synchronized (mutex) {
114 events.push_back(event);
126 Event*
event =
nullptr;
128 synchronized (mutex) {
129 if (events.size() > 0) {
130 Event*
event = events.front();
138 return CHECK_NOTNULL(event);
143 synchronized (mutex) {
144 return events.size() == 0;
150 synchronized (mutex) {
152 while (!events.empty()) {
153 Event*
event = events.front();
160 template <
typename T>
163 synchronized (mutex) {
164 return std::count_if(
167 [](
const Event* event) {
168 return event->is<T>();
176 synchronized (mutex) {
177 foreach (
Event* event, events) {
185 std::deque<Event*> events;
186 bool comissioned =
true;
187 #else // LOCK_FREE_EVENT_QUEUE
188 void enqueue(
Event* event)
190 Item item = {sequence.fetch_add(1),
event};
191 if (comissioned.load()) {
192 queue.enqueue(std::move(item));
194 sequence.fetch_sub(1);
204 Event*
event =
nullptr;
211 event = try_dequeue();
212 }
while (event ==
nullptr);
221 return (sequence.load() - next) == 0;
226 comissioned.store(
true);
235 Event*
event = try_dequeue();
236 if (event !=
nullptr) {
242 template <
typename T>
246 queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
248 return std::count_if(
251 [](
const Item& item) {
252 if (item.event !=
nullptr) {
253 return item.event->is<T>();
262 queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
265 foreach (
const Item& item, items) {
266 if (item.event !=
nullptr) {
295 while (!items.empty() && next > items.front().sequence) {
302 if (!items.empty() && items.front().sequence == next) {
303 Event*
event = items.front().event;
316 for (; index < items.size(); index++) {
317 if (items[index].sequence == next) {
319 items[
index].event =
nullptr;
338 }
while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
344 moodycamel::ConcurrentQueue<Item> queue;
350 std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
368 std::deque<Item> items;
373 std::atomic<bool> comissioned = ATOMIC_VAR_INIT(
true);
374 #endif // LOCK_FREE_EVENT_QUEUE
379 #endif // __PROCESS_EVENT_QUEUE_HPP__
class process::EventQueue::Producer producer
size_t count()
Definition: event_queue.hpp:93
void enqueue(Event *event)
Definition: event_queue.hpp:76
Definition: event_queue.hpp:68
class process::EventQueue::Consumer consumer
bool empty()
Definition: event_queue.hpp:90
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2584
Result< int > index(const std::string &link)
Definition: event_queue.hpp:73
EventQueue()
Definition: event_queue.hpp:71
std::vector< Value > values
Definition: json.hpp:199
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
void decomission()
Definition: event_queue.hpp:91
Definition: event_queue.hpp:86
Event * dequeue()
Definition: event_queue.hpp:89