Apache Mesos
master.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MASTER_HPP__
18 #define __MASTER_HPP__
19 
20 #include <stdint.h>
21 
22 #include <list>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <vector>
27 
28 #include <boost/circular_buffer.hpp>
29 
30 #include <mesos/mesos.hpp>
31 #include <mesos/resources.hpp>
32 #include <mesos/type_utils.hpp>
33 
35 
39 #include <mesos/master/master.hpp>
40 
42 
43 #include <mesos/quota/quota.hpp>
44 
46 
47 #include <process/limiter.hpp>
48 #include <process/http.hpp>
49 #include <process/owned.hpp>
50 #include <process/process.hpp>
51 #include <process/protobuf.hpp>
52 #include <process/timer.hpp>
53 
55 
56 #include <stout/boundedhashmap.hpp>
57 #include <stout/cache.hpp>
58 #include <stout/foreach.hpp>
59 #include <stout/hashmap.hpp>
60 #include <stout/hashset.hpp>
61 #include <stout/linkedhashmap.hpp>
62 #include <stout/multihashmap.hpp>
63 #include <stout/nothing.hpp>
64 #include <stout/option.hpp>
65 #include <stout/recordio.hpp>
66 #include <stout/try.hpp>
67 #include <stout/uuid.hpp>
68 
69 #include "common/http.hpp"
72 
73 #include "files/files.hpp"
74 
75 #include "internal/devolve.hpp"
76 #include "internal/evolve.hpp"
77 
78 #include "master/constants.hpp"
79 #include "master/flags.hpp"
80 #include "master/machine.hpp"
81 #include "master/metrics.hpp"
82 #include "master/validation.hpp"
83 
84 #include "messages/messages.hpp"
85 
86 namespace process {
87 class RateLimiter; // Forward declaration.
88 }
89 
90 namespace mesos {
91 
92 // Forward declarations.
93 class Authorizer;
94 class ObjectApprover;
95 
96 namespace internal {
97 
98 // Forward declarations.
99 namespace registry {
100 class Slaves;
101 }
102 
103 class Registry;
104 class WhitelistWatcher;
105 
106 namespace master {
107 
108 class Master;
109 class Registrar;
110 class SlaveObserver;
111 
112 struct BoundedRateLimiter;
113 struct Framework;
114 struct Role;
115 
116 
// State the master keeps for a single agent ("slave"). The members visible
// below cover its identity (`id`, `info`, `machineId`), its `version`, its
// connection state (`connected`, `active`) and its `observer`; the methods
// add, look up and remove the tasks, executors, operations, offers and
// inverse offers tracked for the agent.
//
// NOTE(review): this listing appears to have lost lines. Several member
// comments below are not followed by any declaration, the embedded listing
// numbers skip values (e.g. 178 -> 180, 191 -> 193), and `operator<<` for
// this struct reads `slave.pid`, which is not declared in what is shown
// here. Confirm against the original header before relying on this text.
117 struct Slave
118 {
119 Slave(Master* const _master,
120  SlaveInfo _info,
121  const process::UPID& _pid,
122  const MachineID& _machineId,
123  const std::string& _version,
124  std::vector<SlaveInfo::Capability> _capabilites,
125  const process::Time& _registeredTime,
126  std::vector<Resource> _checkpointedResources,
127  const Option<id::UUID>& resourceVersion,
128  std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
129  std::vector<Task> tasks = std::vector<Task>());
130 
131  ~Slave();
132 
// Task bookkeeping.
// NOTE(review): how getTask() reports an unknown task (presumably a null
// pointer) is decided in the implementation, which is not shown - confirm.
133  Task* getTask(
134  const FrameworkID& frameworkId,
135  const TaskID& taskId) const;
136 
137  void addTask(Task* task);
138 
139  // Update slave to recover the resources that were previously
140  // being used by `task`.
141  //
142  // TODO(bmahler): This is a hack for performance. We need to
143  // maintain resource counters because computing task resources
144  // functionally for all tasks is expensive, for now.
145  void recoverResources(Task* task);
146 
147  void removeTask(Task* task);
148 
// Operation bookkeeping; the declarations parallel the task methods above
// (add / recoverResources / remove / get).
149  void addOperation(Operation* operation);
150 
151  void recoverResources(Operation* operation);
152 
153  void removeOperation(Operation* operation);
154 
155  Operation* getOperation(const id::UUID& uuid) const;
156 
// Offer and inverse-offer bookkeeping.
157  void addOffer(Offer* offer);
158 
159  void removeOffer(Offer* offer);
160 
161  void addInverseOffer(InverseOffer* inverseOffer);
162 
163  void removeInverseOffer(InverseOffer* inverseOffer);
164 
// Executor bookkeeping, keyed by framework ID and executor ID.
165  bool hasExecutor(
166  const FrameworkID& frameworkId,
167  const ExecutorID& executorId) const;
168 
169  void addExecutor(
170  const FrameworkID& frameworkId,
171  const ExecutorInfo& executorInfo);
172 
173  void removeExecutor(
174  const FrameworkID& frameworkId,
175  const ExecutorID& executorId);
176 
// Applies the given resource conversions to this agent.
// NOTE(review): which resource fields are affected is decided in the
// implementation, which is not shown - confirm.
177  void apply(const std::vector<ResourceConversion>& conversions);
178 
// NOTE(review): the line that names this declaration (listing line 179) is
// missing; only its parameter list survives.
180  const SlaveInfo& info,
181  const std::string& _version,
182  const std::vector<SlaveInfo::Capability>& _capabilites,
183  const Resources& _checkpointedResources,
184  const Option<id::UUID>& resourceVersion);
185 
186  Master* const master;
187  const SlaveID id;
188  SlaveInfo info;
189 
190  const MachineID machineId;
191 
193 
194  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
195  std::string version;
196 
197  // Agent capabilities.
199 
202 
203  // Slave becomes disconnected when the socket closes.
204  bool connected;
205 
206  // Slave becomes deactivated when it gets disconnected. In the
207  // future this might also happen via HTTP endpoint.
208  // No offers will be made for a deactivated slave.
209  bool active;
210 
211  // Timer for marking slaves unreachable that become disconnected and
212  // don't re-register. This timeout is larger than the slave
213  // observer's timeout, so typically the slave observer will be the
214  // one to mark such slaves unreachable; this timer is a backup for
215  // when a slave responds to pings but does not re-register (e.g.,
216  // because agent recovery has hung).
218 
219  // Executors running on this slave.
220  //
221  // TODO(bmahler): Make this private to enforce that `addExecutor()`
222  // and `removeExecutor()` are used, and provide a const view into
223  // the executors.
225 
226  // Tasks that have not yet been launched because they are currently
227  // being authorized. This is similar to Framework's pendingTasks but we
228  // track pendingTasks per agent separately to determine if any offer
229  // operation for this agent would change resources requested by these tasks.
231 
232  // Tasks present on this slave.
233  //
234  // TODO(bmahler): Make this private to enforce that `addTask()` and
235  // `removeTask()` are used, and provide a const view into the tasks.
236  //
237  // TODO(bmahler): The task pointer ownership complexity arises from the fact
238  // that we own the pointer here, but it's shared with the Framework struct.
239  // We should find a way to eliminate this.
241 
242  // Tasks that frameworks have asked to be killed.
243  // This is used for reconciliation when the slave re-registers.
245 
246  // Pending operations or terminal operations that have
247  // unacknowledged status updates on this agent.
249 
250  // Active offers on this slave.
252 
253  // Active inverse offers on this slave.
255 
256  // Resources for active task / executors / operations.
257  // Note that we maintain multiple copies of each shared resource in
258  // `usedResources` as they are used by multiple tasks.
260 
262 
263  // Resources that should be checkpointed by the slave (e.g.,
264  // persistent volumes, dynamic reservations, etc). These are either
265  // in use by a task/executor, or are available for use and will be
266  // re-offered to the framework.
267  // TODO(jieyu): `checkpointedResources` is only for agent default
268  // resources. Resources from resource providers are not included in
269  // this field. Consider removing this field.
271 
272  // The current total resources of the slave. Note that this is
273  // different from 'info.resources()' because this also considers
274  // operations (e.g., CREATE, RESERVE) that have been applied and
275  // includes revocable resources and resources from resource
276  // providers as well.
278 
279  SlaveObserver* observer;
280 
283 
// The copy constructor and copy assignment operator are declared private so
// that a Slave cannot be copied from outside the struct.
284 private:
285  Slave(const Slave&); // No copying.
286  Slave& operator=(const Slave&); // No assigning.
287 };
288 
289 
// Streams an agent in the form "<id> at <pid> (<hostname>)" and returns the
// stream so that further insertions can be chained.
290 inline std::ostream& operator<<(std::ostream& stream, const Slave& slave)
291 {
292  stream << slave.id << " at " << slave.pid << " (" << slave.info.hostname();
293  return stream << ")";
294 }
295 
296 
297 // Represents the streaming HTTP connection to a framework or a client
298 // subscribed to the '/api/vX' endpoint.
//
// NOTE(review): this listing has lost lines. The embedded listing numbers
// skip 299 (the line naming the type), 301 (the start of the constructor),
// 313 (the start of the `encoder` declaration), 324 (a method signature) and
// 329-331 (presumably the `writer`, `contentType` and `streamId` members that
// the constructor initializes). Confirm against the original header.
300 {
// Stores the writer, the content type and the stream ID for this connection.
302  ContentType _contentType,
303  id::UUID _streamId)
304  : writer(_writer),
305  contentType(_contentType),
306  streamId(_streamId) {}
307 
308  // We need to evolve the internal old style message/unversioned event into a
309  // versioned event e.g., `v1::scheduler::Event` or `v1::master::Event`.
//
// The evolved message is passed through `encoder` and written to `writer`;
// the return value is whatever `writer.write()` returns.
310  template <typename Message, typename Event = v1::scheduler::Event>
311  bool send(const Message& message)
312  {
// NOTE(review): the start of the `encoder` declaration is missing; the
// surviving arguments bind `serialize` with this connection's `contentType`.
314  serialize, contentType, lambda::_1));
315 
316  return writer.write(encoder.encode(evolve(message)));
317  }
318 
// Closes the writer and returns the result of `writer.close()`.
319  bool close()
320  {
321  return writer.close();
322  }
323 
// NOTE(review): the signature line is missing. The body forwards
// `writer.readerClosed()`, and Heartbeater calls `http.closed().isPending()`,
// so this is presumably `closed()` returning a future - confirm.
325  {
326  return writer.readerClosed();
327  }
328 
332 };
333 
334 
335 // This process periodically sends heartbeats to a given HTTP connection.
336 // The `Message` template parameter is the type of the heartbeat event passed
337 // into the heartbeater during construction, while the `Event` template
338 // parameter is the versioned event type which is sent to the client.
339 // The optional delay parameter is used to specify the delay period before it
340 // sends the first heartbeat.
341 template <typename Message, typename Event>
342 class Heartbeater : public process::Process<Heartbeater<Message, Event>>
343 {
344 public:
// `_logMessage` is the text appended to the "Sending heartbeat to " log line.
// The heartbeat message and the HTTP connection are copied into the process.
// The process ID is generated from the "heartbeater" prefix.
345  Heartbeater(const std::string& _logMessage,
346  const Message& _heartbeatMessage,
347  const HttpConnection& _http,
348  const Duration& _interval,
349  const Option<Duration>& _delay = None())
350  : process::ProcessBase(process::ID::generate("heartbeater")),
351  logMessage(_logMessage),
352  heartbeatMessage(_heartbeatMessage),
353  http(_http),
354  interval(_interval),
355  delay(_delay) {}
356 
357 protected:
// Starts the heartbeat cycle. With no delay configured the first heartbeat
// is sent immediately.
358  virtual void initialize() override
359  {
360  if (delay.isSome()) {
// NOTE(review): listing lines 361 and 364 are missing. The surviving
// arguments (`delay.get()`, `this`) suggest the first heartbeat is scheduled
// to run after the configured delay - confirm against the original header.
362  delay.get(),
363  this,
365  } else {
366  heartbeat();
367  }
368  }
369 
370 private:
// Sends one heartbeat if the connection is still open, then schedules the
// next call after `interval`. The rescheduling sits outside the `if`, so it
// happens whether or not a heartbeat was sent.
371  void heartbeat()
372  {
373  // Only send a heartbeat if the connection is not closed.
374  if (http.closed().isPending()) {
375  VLOG(2) << "Sending heartbeat to " << logMessage;
376 
// The stored message is copied for each send; the bool returned by `send()`
// is not checked.
377  Message message(heartbeatMessage);
378  http.send<Message, Event>(message);
379  }
380 
381  process::delay(interval, this, &Heartbeater<Message, Event>::heartbeat);
382  }
383 
384  const std::string logMessage;
385  const Message heartbeatMessage;
386  HttpConnection http;
387  const Duration interval;
388  const Option<Duration> delay;
389 };
390 
391 
392 class Master : public ProtobufProcess<Master>
393 {
394 public:
396  Registrar* registrar,
397  Files* files,
400  const Option<Authorizer*>& authorizer,
401  const Option<std::shared_ptr<process::RateLimiter>>&
402  slaveRemovalLimiter,
403  const Flags& flags = Flags());
404 
405  virtual ~Master();
406 
407  // Message handlers.
408  void submitScheduler(
409  const std::string& name);
410 
411  void registerFramework(
412  const process::UPID& from,
413  const FrameworkInfo& frameworkInfo);
414 
415  void reregisterFramework(
416  const process::UPID& from,
417  const FrameworkInfo& frameworkInfo,
418  bool failover);
419 
420  void unregisterFramework(
421  const process::UPID& from,
422  const FrameworkID& frameworkId);
423 
424  void deactivateFramework(
425  const process::UPID& from,
426  const FrameworkID& frameworkId);
427 
428  // TODO(vinod): Remove this once the old driver is removed.
429  void resourceRequest(
430  const process::UPID& from,
431  const FrameworkID& frameworkId,
432  const std::vector<Request>& requests);
433 
434  void launchTasks(
435  const process::UPID& from,
436  const FrameworkID& frameworkId,
437  const std::vector<TaskInfo>& tasks,
438  const Filters& filters,
439  const std::vector<OfferID>& offerIds);
440 
441  void reviveOffers(
442  const process::UPID& from,
443  const FrameworkID& frameworkId,
444  const std::vector<std::string>& role);
445 
446  void killTask(
447  const process::UPID& from,
448  const FrameworkID& frameworkId,
449  const TaskID& taskId);
450 
452  const process::UPID& from,
453  const SlaveID& slaveId,
454  const FrameworkID& frameworkId,
455  const TaskID& taskId,
456  const std::string& uuid);
457 
458  void schedulerMessage(
459  const process::UPID& from,
460  const SlaveID& slaveId,
461  const FrameworkID& frameworkId,
462  const ExecutorID& executorId,
463  const std::string& data);
464 
465  void executorMessage(
466  const process::UPID& from,
467  const SlaveID& slaveId,
468  const FrameworkID& frameworkId,
469  const ExecutorID& executorId,
470  const std::string& data);
471 
472  void registerSlave(
473  const process::UPID& from,
474  RegisterSlaveMessage&& registerSlaveMessage);
475 
476  void reregisterSlave(
477  const process::UPID& from,
478  ReregisterSlaveMessage&& incomingMessage);
479 
480  void unregisterSlave(
481  const process::UPID& from,
482  const SlaveID& slaveId);
483 
484  void statusUpdate(
485  StatusUpdate update,
486  const process::UPID& pid);
487 
488  void reconcileTasks(
489  const process::UPID& from,
490  const FrameworkID& frameworkId,
491  const std::vector<TaskStatus>& statuses);
492 
494  const UpdateOperationStatusMessage& update);
495 
496  void exitedExecutor(
497  const process::UPID& from,
498  const SlaveID& slaveId,
499  const FrameworkID& frameworkId,
500  const ExecutorID& executorId,
501  int32_t status);
502 
503  void updateSlave(UpdateSlaveMessage&& message);
504 
506  const MachineID& machineId,
508 
509  // Marks the agent unreachable and returns whether the agent was
510  // marked unreachable. Returns false if the agent is already
511  // in a transitioning state or has transitioned into another
512  // state (this includes already being marked unreachable).
513  // The `duringMasterFailover` parameter specifies whether this
514  // agent is transitioning from a recovered state (true) or a
515  // registered state (false).
516  //
517  // Discarding is currently not supported.
518  //
519  // Will not return a failure (this will crash the master
520  // internally in the case of a registry failure).
522  const SlaveInfo& slave,
523  bool duringMasterFailover,
524  const std::string& message);
525 
526  void markGone(Slave* slave, const TimeInfo& goneTime);
527 
528  void authenticate(
529  const process::UPID& from,
530  const process::UPID& pid);
531 
532  // TODO(bmahler): It would be preferred to use a unique libprocess
533  // Process identifier (PID is not sufficient) for identifying the
534  // framework instance, rather than relying on re-registration time.
536  const FrameworkID& frameworkId,
537  const process::Time& reregisteredTime);
538 
539  void offer(
540  const FrameworkID& frameworkId,
541  const hashmap<std::string, hashmap<SlaveID, Resources>>& resources);
542 
543  void inverseOffer(
544  const FrameworkID& frameworkId,
545  const hashmap<SlaveID, UnavailableResources>& resources);
546 
547  // Invoked when there is a newly elected leading master.
548  // Made public for testing purposes.
549  void detected(const process::Future<Option<MasterInfo>>& _leader);
550 
551  // Invoked when the contender has lost the candidacy.
552  // Made public for testing purposes.
554 
555  // Continuation of recover().
556  // Made public for testing purposes.
557  process::Future<Nothing> _recover(const Registry& registry);
558 
// Returns a copy of this master's `info_`.
559  MasterInfo info() const
560  {
561  return MasterInfo(info_);
562  }
563 
564 protected:
565  void initialize() override;
566  void finalize() override;
567 
568  void consume(process::MessageEvent&& event) override;
569  void consume(process::ExitedEvent&& event) override;
570 
571  void exited(const process::UPID& pid) override;
572  void exited(const FrameworkID& frameworkId, const HttpConnection& http);
573  void _exited(Framework* framework);
574 
575  // Invoked upon noticing a subscriber disconnection.
576  void exited(const id::UUID& id);
577 
578  void agentReregisterTimeout(const SlaveID& slaveId);
579  Nothing _agentReregisterTimeout(const SlaveID& slaveId);
580 
581  // Invoked when the message is ready to be executed after
582  // being throttled.
583  // 'principal' being None indicates it is throttled by
584  // 'defaultLimiter'.
585  void throttled(
586  process::MessageEvent&& event,
587  const Option<std::string>& principal);
588 
589  // Continuations of consume().
590  void _consume(process::MessageEvent&& event);
591  void _consume(process::ExitedEvent&& event);
592 
593  // Helper method invoked when the capacity for a framework
594  // principal is exceeded.
595  void exceededCapacity(
596  const process::MessageEvent& event,
597  const Option<std::string>& principal,
598  uint64_t capacity);
599 
600  // Recovers state from the registrar.
602  void recoveredSlavesTimeout(const Registry& registry);
603 
604  void _registerSlave(
605  const process::UPID& pid,
606  RegisterSlaveMessage&& registerSlaveMessage,
608  const process::Future<bool>& authorized);
609 
610  void __registerSlave(
611  const process::UPID& pid,
612  RegisterSlaveMessage&& registerSlaveMessage,
613  const process::Future<bool>& admit);
614 
615  void _reregisterSlave(
616  const process::UPID& pid,
617  ReregisterSlaveMessage&& incomingMessage,
619  const process::Future<bool>& authorized);
620 
621  void __reregisterSlave(
622  const process::UPID& pid,
623  ReregisterSlaveMessage&& incomingMessage,
624  const process::Future<bool>& readmit);
625 
626  void ___reregisterSlave(
627  const process::UPID& pid,
628  ReregisterSlaveMessage&& incomingMessage,
629  const process::Future<bool>& updated);
630 
632  Slave* slave,
633  const std::vector<FrameworkInfo>& frameworks);
634 
635  // 'future' is the future returned by the authenticator.
636  void _authenticate(
637  const process::UPID& pid,
638  const process::Future<Option<std::string>>& future);
639 
641 
642  void fileAttached(const process::Future<Nothing>& result,
643  const std::string& path);
644 
645  // Invoked when the contender has entered the contest.
646  void contended(const process::Future<process::Future<Nothing>>& candidacy);
647 
648  // Task reconciliation, split from the message handler
649  // to allow re-use.
650  void _reconcileTasks(
651  Framework* framework,
652  const std::vector<TaskStatus>& statuses);
653 
654  // When a slave that was previously registered with this master
655  // re-registers, we need to reconcile the master's view of the
656  // slave's tasks and executors. This function also sends the
657  // `SlaveReregisteredMessage`.
658  void reconcileKnownSlave(
659  Slave* slave,
660  const std::vector<ExecutorInfo>& executors,
661  const std::vector<Task>& tasks);
662 
663  // Add a framework.
664  void addFramework(
665  Framework* framework,
666  const std::set<std::string>& suppressedRoles);
667 
668  // Recover a framework from its `FrameworkInfo`. This happens after
669  // master failover, when an agent running one of the framework's
670  // tasks re-registers or when the framework itself re-registers,
671  // whichever happens first. The result of this function is a
672  // registered, inactive framework with state `RECOVERED`.
673  void recoverFramework(
674  const FrameworkInfo& info,
675  const std::set<std::string>& suppressedRoles);
676 
677  // Transition a framework from `RECOVERED` to `CONNECTED` state and
678  // activate it. This happens at most once after master failover, the
679  // first time that the framework re-registers with the new master.
680  // Exactly one of `pid` or `http` must be provided.
682  Framework* framework,
683  const FrameworkInfo& frameworkInfo,
684  const Option<process::UPID>& pid,
685  const Option<HttpConnection>& http,
686  const std::set<std::string>& suppressedRoles);
687 
688  // Replace the scheduler for a framework with a new process ID, in
689  // the event of a scheduler failover.
690  void failoverFramework(Framework* framework, const process::UPID& newPid);
691 
692  // Replace the scheduler for a framework with a new HTTP connection,
693  // in the event of a scheduler failover.
694  void failoverFramework(Framework* framework, const HttpConnection& http);
695 
696  void _failoverFramework(Framework* framework);
697 
698  // Kill all of a framework's tasks, delete the framework object, and
699  // reschedule offers that were assigned to this framework.
700  void removeFramework(Framework* framework);
701 
702  // Remove a framework from the slave, i.e., remove its tasks and
703  // executors and recover the resources.
704  void removeFramework(Slave* slave, Framework* framework);
705 
706  void updateFramework(
707  Framework* framework,
708  const FrameworkInfo& frameworkInfo,
709  const std::set<std::string>& suppressedRoles);
710 
711  void disconnect(Framework* framework);
712  void deactivate(Framework* framework, bool rescind);
713 
714  void disconnect(Slave* slave);
715  void deactivate(Slave* slave);
716 
717  // Add a slave.
718  void addSlave(
719  Slave* slave,
720  std::vector<Archive::Framework>&& completedFrameworks);
721 
722  void _markUnreachable(
723  const SlaveInfo& slave,
724  const TimeInfo& unreachableTime,
725  bool duringMasterFailover,
726  const std::string& message,
727  bool registrarResult);
728 
729  void sendSlaveLost(const SlaveInfo& slaveInfo);
730 
731  // Remove the slave from the registrar and from the master's state.
732  //
733  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
734  void removeSlave(
735  Slave* slave,
736  const std::string& message,
738 
739  void _removeSlave(
740  Slave* slave,
741  const process::Future<bool>& registrarResult,
742  const std::string& removalCause,
744 
745  void __removeSlave(
746  Slave* slave,
747  const std::string& message,
748  const Option<TimeInfo>& unreachableTime);
749 
750  // Validates that the framework is authenticated, if required.
752  const FrameworkInfo& frameworkInfo,
753  const process::UPID& from);
754 
755  // Returns whether the framework is authorized.
756  // Returns failure for transient authorization failures.
758  const FrameworkInfo& frameworkInfo);
759 
760  // Returns whether the principal is authorized to (re-)register an agent
761  // and whether the `SlaveInfo` is authorized.
763  const SlaveInfo& slaveInfo,
765 
766  // Returns whether the task is authorized.
767  // Returns failure for transient authorization failures.
769  const TaskInfo& task,
770  Framework* framework);
771 
790  const Offer::Operation::Reserve& reserve,
792 
793  // Authorizes whether the provided `principal` is allowed to reserve
794  // the specified `resources`.
796  const Resources& resources,
798 
817  const Offer::Operation::Unreserve& unreserve,
819 
838  const Offer::Operation::Create& create,
840 
859  const Offer::Operation::Destroy& destroy,
861 
862  // Add the task and its executor (if not already running) to the
863  // framework and slave. Returns the resources consumed as a result,
864  // which includes resources for the task and its executor
865  // (if not already running).
866  Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
867 
868  // Transitions the task, and recovers resources if the task becomes
869  // terminal.
870  void updateTask(Task* task, const StatusUpdate& update);
871 
872  // Removes the task. `unreachable` indicates whether the task is removed due
873  // to being unreachable. Note that we cannot rely on the task state because
874  // it may not reflect unreachability due to being set to TASK_LOST for
875  // backwards compatibility.
876  void removeTask(Task* task, bool unreachable = false);
877 
878  // Remove an executor and recover its resources.
879  void removeExecutor(
880  Slave* slave,
881  const FrameworkID& frameworkId,
882  const ExecutorID& executorId);
883 
884  // Adds the given operation to the framework and the agent.
885  void addOperation(
886  Framework* framework,
887  Slave* slave,
888  Operation* operation);
889 
890  // Transitions the operation, and updates and recovers resources if
891  // the operation becomes terminal. If `convertResources` is `false`
892  // only the consumed resources of terminal operations are recovered,
893  // but no resources are converted.
894  void updateOperation(
895  Operation* operation,
896  const UpdateOperationStatusMessage& update,
897  bool convertResources = true);
898 
899  // Remove the operation.
900  void removeOperation(Operation* operation);
901 
902  // Attempts to update the allocator by applying the given operation.
903  // If successful, updates the slave's resources, sends a
904  // 'CheckpointResourcesMessage' to the slave with the updated
905  // checkpointed resources, and returns a 'Future' with 'Nothing'.
906  // Otherwise, no action is taken and returns a failed 'Future'.
908  Slave* slave,
909  const Offer::Operation& operation);
910 
911  // Forwards the update to the framework.
912  void forward(
913  const StatusUpdate& update,
914  const process::UPID& acknowledgee,
915  Framework* framework);
916 
917  // Remove an offer after the specified timeout.
918  void offerTimeout(const OfferID& offerId);
919 
920  // Remove an offer and optionally rescind the offer as well.
921  void removeOffer(Offer* offer, bool rescind = false);
922 
923  // Remove an inverse offer after the specified timeout.
924  void inverseOfferTimeout(const OfferID& inverseOfferId);
925 
926  // Remove an inverse offer and optionally rescind it as well.
927  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
928 
929  bool isCompletedFramework(const FrameworkID& frameworkId);
930 
931  Framework* getFramework(const FrameworkID& frameworkId) const;
932  Offer* getOffer(const OfferID& offerId) const;
933  InverseOffer* getInverseOffer(const OfferID& inverseOfferId) const;
934 
935  FrameworkID newFrameworkId();
936  OfferID newOfferId();
937  SlaveID newSlaveId();
938 
939 private:
940  // Updates the agent's resources by applying the given operation.
941  // Sends either `ApplyOperationMessage` or
942  // `CheckpointResourcesMessage` (with updated checkpointed
943  // resources) to the agent depending on if the agent has
944  // `RESOURCE_PROVIDER` capability.
945  void _apply(
946  Slave* slave,
947  Framework* framework,
948  const Offer::Operation& operationInfo);
949 
950  void drop(
951  const process::UPID& from,
952  const scheduler::Call& call,
953  const std::string& message);
954 
955  void drop(
956  Framework* framework,
957  const Offer::Operation& operation,
958  const std::string& message);
959 
960  void drop(
961  Framework* framework,
962  const scheduler::Call& call,
963  const std::string& message);
964 
965  void drop(
966  Framework* framework,
967  const scheduler::Call::Suppress& suppress,
968  const std::string& message);
969 
970  void drop(
971  Framework* framework,
972  const scheduler::Call::Revive& revive,
973  const std::string& message);
974 
975  // Call handlers.
976  void receive(
977  const process::UPID& from,
978  const scheduler::Call& call);
979 
980  void subscribe(
981  HttpConnection http,
982  const scheduler::Call::Subscribe& subscribe);
983 
984  void _subscribe(
985  HttpConnection http,
986  const FrameworkInfo& frameworkInfo,
987  bool force,
988  const std::set<std::string>& suppressedRoles,
989  const process::Future<bool>& authorized);
990 
991  void subscribe(
992  const process::UPID& from,
993  const scheduler::Call::Subscribe& subscribe);
994 
995  void _subscribe(
996  const process::UPID& from,
997  const FrameworkInfo& frameworkInfo,
998  bool force,
999  const std::set<std::string>& suppressedRoles,
1000  const process::Future<bool>& authorized);
1001 
1002  // Subscribes a client to the 'api/vX' endpoint.
1003  void subscribe(
1004  const HttpConnection& http,
1006 
1007  void teardown(Framework* framework);
1008 
1009  void accept(
1010  Framework* framework,
1011  scheduler::Call::Accept accept);
1012 
1013  void _accept(
1014  const FrameworkID& frameworkId,
1015  const SlaveID& slaveId,
1016  const Resources& offeredResources,
1017  const scheduler::Call::Accept& accept,
1018  const process::Future<std::list<process::Future<bool>>>& authorizations);
1019 
1020  void acceptInverseOffers(
1021  Framework* framework,
1022  const scheduler::Call::AcceptInverseOffers& accept);
1023 
1024  void decline(
1025  Framework* framework,
1026  const scheduler::Call::Decline& decline);
1027 
1028  void declineInverseOffers(
1029  Framework* framework,
1030  const scheduler::Call::DeclineInverseOffers& decline);
1031 
1032  void revive(
1033  Framework* framework,
1034  const scheduler::Call::Revive& revive);
1035 
1036  void kill(
1037  Framework* framework,
1038  const scheduler::Call::Kill& kill);
1039 
1040  void shutdown(
1041  Framework* framework,
1042  const scheduler::Call::Shutdown& shutdown);
1043 
1044  void acknowledge(
1045  Framework* framework,
1046  const scheduler::Call::Acknowledge& acknowledge);
1047 
1048  void acknowledgeOperationStatus(
1049  Framework* framework,
1050  const scheduler::Call::AcknowledgeOperationStatus& acknowledge);
1051 
1052  void reconcile(
1053  Framework* framework,
1054  const scheduler::Call::Reconcile& reconcile);
1055 
1056  void reconcileOperations(
1057  Framework* framework,
1058  const scheduler::Call::ReconcileOperations& reconcile);
1059 
1060  void message(
1061  Framework* framework,
1062  const scheduler::Call::Message& message);
1063 
1064  void request(
1065  Framework* framework,
1066  const scheduler::Call::Request& request);
1067 
1068  void suppress(
1069  Framework* framework,
1070  const scheduler::Call::Suppress& suppress);
1071 
// True only when a leader is known and that leader compares equal to this
// master's own `info_`; `leader.get()` is evaluated only when a leader is set.
1072  bool elected() const
1073  {
1074  return leader.isSome() ? (leader.get() == info_) : false;
1075  }
1076 
1077  void scheduleRegistryGc();
1078 
1079  void doRegistryGc();
1080 
1081  void _doRegistryGc(
1082  const hashset<SlaveID>& toRemoveUnreachable,
1083  const hashset<SlaveID>& toRemoveGone,
1084  const process::Future<bool>& registrarResult);
1085 
1086  process::Future<bool> authorizeLogAccess(
1088 
1096  bool isWhitelistedRole(const std::string& name) const;
1097 
1105  class QuotaHandler
1106  {
1107  public:
1108  explicit QuotaHandler(Master* _master) : master(_master)
1109  {
1110  CHECK_NOTNULL(master);
1111  }
1112 
1113  // Returns a list of set quotas.
1115  const mesos::master::Call& call,
1117  ContentType contentType) const;
1118 
1120  const process::http::Request& request,
1122  principal) const;
1123 
1125  const mesos::master::Call& call,
1127  principal) const;
1128 
1130  const process::http::Request& request,
1132  principal) const;
1133 
1135  const mesos::master::Call& call,
1137  principal) const;
1138 
1140  const process::http::Request& request,
1142  principal) const;
1143 
1144  private:
1145  // Heuristically tries to determine whether a quota request could
1146  // reasonably be satisfied given the current cluster capacity. The
1147  // goal is to determine whether a user may accidentally request an
1148  // amount of resources that would prevent frameworks without quota
1149  // from getting any offers. A force flag will allow users to bypass
1150  // this check.
1151  //
1152  // The heuristic tests whether the total quota, including the new
1153  // request, does not exceed the sum of non-static cluster resources,
1154  // i.e. the following inequality holds:
1155  // total - statically reserved >= total quota + quota request
1156  //
1157  // Please be advised that:
1158  // * It is up to an allocator how to satisfy quota (for example,
1159  // what resources to account towards quota, as well as which
1160  // resources to consider allocatable for quota).
1161  // * Even if there are enough resources at the moment of this check,
1162  // agents may terminate at any time, rendering the cluster under
1163  // quota.
1164  Option<Error> capacityHeuristic(
1165  const mesos::quota::QuotaInfo& request) const;
1166 
1167  // We always want to rescind offers after the capacity heuristic. The
1168  // reason for this is the race between the allocator and the master:
1169  // it can happen that there are not enough free resources at the
1170  // allocator's disposal when it is notified about the quota request,
1171  // but at this point it's too late to rescind.
1172  //
1173  // While rescinding, we adhere to the following rules:
1174  // * Rescind at least as many resources as there are in the quota request.
1175  // * Rescind all offers from an agent in order to make the potential
1176  // offer bigger, which increases the chances that a quota'ed framework
1177  // will be able to use the offer.
1178  // * Rescind offers from at least `numF` agents to make it possible
1179  // (but not guaranteed, due to fair sharing) that each framework in
1180  // the role for which quota is set gets an offer (`numF` is the
1181  // number of frameworks in the quota'ed role). Though this is not
1182  // strictly necessary, we think this will increase the debuggability
1183  // and will improve user experience.
1184  //
1185  // TODO(alexr): Consider removing this function once offer management
1186  // (including rescinding) is moved to allocator.
1187  void rescindOffers(const mesos::quota::QuotaInfo& request) const;
1188 
1189  process::Future<bool> authorizeGetQuota(
1191  const mesos::quota::QuotaInfo& quotaInfo) const;
1192 
1193  process::Future<bool> authorizeUpdateQuota(
1195  const mesos::quota::QuotaInfo& quotaInfo) const;
1196 
1199  principal) const;
1200 
1202  const mesos::quota::QuotaRequest& quotaRequest,
1204  principal) const;
1205 
1207  const mesos::quota::QuotaInfo& quotaInfo,
1208  bool forced) const;
1209 
1211  const std::string& role,
1213  principal) const;
1214 
1216  const std::string& role) const;
1217 
1218  // To perform actions related to quota management, we require access to the
1219  // master data structures. No synchronization primitives are needed here
1220  // since `QuotaHandler`'s functions are invoked in the Master's actor.
1221  Master* master;
1222  };
1223 
1231  class WeightsHandler
1232  {
1233  public:
1234  explicit WeightsHandler(Master* _master) : master(_master)
1235  {
1236  CHECK_NOTNULL(master);
1237  }
1238 
1242  principal) const;
1243 
1245  const mesos::master::Call& call,
1247  ContentType contentType) const;
1248 
1250  const process::http::Request& request,
1252  principal) const;
1253 
1255  const mesos::master::Call& call,
1257  ContentType contentType) const;
1258 
1259  private:
1260  process::Future<bool> authorizeGetWeight(
1262  const WeightInfo& weight) const;
1263 
1264  process::Future<bool> authorizeUpdateWeights(
1266  const std::vector<std::string>& roles) const;
1267 
1269  const std::vector<WeightInfo>& weightInfos,
1270  const std::list<bool>& roleAuthorizations) const;
1271 
1274  principal) const;
1275 
1278  const google::protobuf::RepeatedPtrField<WeightInfo>& weightInfos)
1279  const;
1280 
1282  const std::vector<WeightInfo>& weightInfos) const;
1283 
1284  // Rescind all outstanding offers if any of the 'weightInfos' roles has
1285  // an active framework.
1286  void rescindOffers(const std::vector<WeightInfo>& weightInfos) const;
1287 
1288  Master* master;
1289  };
1290 
1291  // Inner class used to namespace HTTP route handlers (see
1292  // master/http.cpp for implementations).
1293  class Http
1294  {
1295  public:
1296  explicit Http(Master* _master) : master(_master),
1297  quotaHandler(_master),
1298  weightsHandler(_master) {}
1299 
1300  // /api/v1
1302  const process::http::Request& request,
1304  principal) const;
1305 
1306  // /api/v1/scheduler
1308  const process::http::Request& request,
1310  principal) const;
1311 
1312  // /master/create-volumes
1314  const process::http::Request& request,
1316  principal) const;
1317 
1318  // /master/destroy-volumes
1320  const process::http::Request& request,
1322  principal) const;
1323 
1324  // /master/flags
1326  const process::http::Request& request,
1328  principal) const;
1329 
1330  // /master/frameworks
1332  const process::http::Request& request,
1334  principal) const;
1335 
1336  // /master/health
1338  const process::http::Request& request) const;
1339 
1340  // /master/redirect
1342  const process::http::Request& request) const;
1343 
1344  // /master/reserve
1346  const process::http::Request& request,
1348  principal) const;
1349 
1350  // /master/roles
1352  const process::http::Request& request,
1354  principal) const;
1355 
1356  // /master/teardown
1358  const process::http::Request& request,
1360  principal) const;
1361 
1362  // /master/slaves
1364  const process::http::Request& request,
1366  principal) const;
1367 
1368  // /master/state
1370  const process::http::Request& request,
1372  principal) const;
1373 
1374  // /master/state-summary
1376  const process::http::Request& request,
1378  principal) const;
1379 
1380  // /master/tasks
1382  const process::http::Request& request,
1384  principal) const;
1385 
1386  // /master/maintenance/schedule
1387  process::Future<process::http::Response> maintenanceSchedule(
1388  const process::http::Request& request,
1390  principal) const;
1391 
1392  // /master/maintenance/status
1393  process::Future<process::http::Response> maintenanceStatus(
1394  const process::http::Request& request,
1396  principal) const;
1397 
1398  // /master/machine/down
1400  const process::http::Request& request,
1402  principal) const;
1403 
1404  // /master/machine/up
1406  const process::http::Request& request,
1408  principal) const;
1409 
1410  // /master/unreserve
1412  const process::http::Request& request,
1414  principal) const;
1415 
1416  // /master/quota
1418  const process::http::Request& request,
1420  principal) const;
1421 
1422  // /master/weights
1424  const process::http::Request& request,
1426  principal) const;
1427 
1428  static std::string API_HELP();
1429  static std::string SCHEDULER_HELP();
1430  static std::string FLAGS_HELP();
1431  static std::string FRAMEWORKS_HELP();
1432  static std::string HEALTH_HELP();
1433  static std::string REDIRECT_HELP();
1434  static std::string ROLES_HELP();
1435  static std::string TEARDOWN_HELP();
1436  static std::string SLAVES_HELP();
1437  static std::string STATE_HELP();
1438  static std::string STATESUMMARY_HELP();
1439  static std::string TASKS_HELP();
1440  static std::string MAINTENANCE_SCHEDULE_HELP();
1441  static std::string MAINTENANCE_STATUS_HELP();
1442  static std::string MACHINE_DOWN_HELP();
1443  static std::string MACHINE_UP_HELP();
1444  static std::string CREATE_VOLUMES_HELP();
1445  static std::string DESTROY_VOLUMES_HELP();
1446  static std::string RESERVE_HELP();
1447  static std::string UNRESERVE_HELP();
1448  static std::string QUOTA_HELP();
1449  static std::string WEIGHTS_HELP();
1450 
1451  private:
1452  JSON::Object __flags() const;
1453 
1454  class FlagsError; // Forward declaration.
1455 
1458  principal) const;
1459 
1461  const size_t limit,
1462  const size_t offset,
1463  const std::string& order,
1465  principal) const;
1466 
1468  const FrameworkID& id,
1470  principal) const;
1471 
1473  const FrameworkID& id) const;
1474 
1475  process::Future<process::http::Response> _updateMaintenanceSchedule(
1476  const mesos::maintenance::Schedule& schedule,
1478  principal) const;
1479 
1480  process::Future<process::http::Response> __updateMaintenanceSchedule(
1481  const mesos::maintenance::Schedule& schedule,
1482  const process::Owned<ObjectApprover>& approver) const;
1483 
1484  process::Future<process::http::Response> ___updateMaintenanceSchedule(
1485  const mesos::maintenance::Schedule& schedule,
1486  bool applied) const;
1487 
1488  mesos::maintenance::Schedule _getMaintenanceSchedule(
1489  const process::Owned<ObjectApprover>& approver) const;
1490 
1492  const process::Owned<ObjectApprover>& approver) const;
1493 
1494  process::Future<process::http::Response> _startMaintenance(
1495  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1496  const process::Owned<ObjectApprover>& approver) const;
1497 
1499  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1500  const process::Owned<ObjectApprover>& approver) const;
1501 
1503  const SlaveID& slaveId,
1504  const google::protobuf::RepeatedPtrField<Resource>& resources,
1506  principal) const;
1507 
1509  const SlaveID& slaveId,
1510  const google::protobuf::RepeatedPtrField<Resource>& resources,
1512  principal) const;
1513 
1515  const SlaveID& slaveId,
1516  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1518  principal) const;
1519 
1521  const SlaveID& slaveId,
1522  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1524  principal) const;
1525 
1546  const SlaveID& slaveId,
1547  Resources required,
1548  const Offer::Operation& operation) const;
1549 
1552  principal) const;
1553 
1554  // Master API handlers.
1555 
1557  const mesos::master::Call& call,
1559  ContentType contentType) const;
1560 
1561  mesos::master::Response::GetAgents _getAgents(
1562  const process::Owned<AuthorizationAcceptor>& rolesAcceptor) const;
1563 
1565  const mesos::master::Call& call,
1567  ContentType contentType) const;
1568 
1570  const mesos::master::Call& call,
1572  ContentType contentType) const;
1573 
1575  const mesos::master::Call& call,
1577  ContentType contentType) const;
1578 
1580  const mesos::master::Call& call,
1582  ContentType contentType) const;
1583 
1585  const mesos::master::Call& call,
1587  ContentType contentType) const;
1588 
1590  const mesos::master::Call& call,
1592  ContentType contentType) const;
1593 
1595  const mesos::master::Call& call,
1597  ContentType contentType) const;
1598 
1600  const mesos::master::Call& call,
1602  ContentType contentType) const;
1603 
1605  const mesos::master::Call& call,
1607  ContentType contentType) const;
1608 
1609  process::Future<process::http::Response> updateMaintenanceSchedule(
1610  const mesos::master::Call& call,
1612  ContentType contentType) const;
1613 
1614  process::Future<process::http::Response> getMaintenanceSchedule(
1615  const mesos::master::Call& call,
1617  ContentType contentType) const;
1618 
1619  process::Future<process::http::Response> getMaintenanceStatus(
1620  const mesos::master::Call& call,
1622  ContentType contentType) const;
1623 
1625  const mesos::master::Call& call,
1627  ContentType contentType) const;
1628 
1630  const mesos::master::Call& call,
1632  ContentType contentType) const;
1633 
1635  const mesos::master::Call& call,
1637  ContentType contentType) const;
1638 
1639  mesos::master::Response::GetTasks _getTasks(
1640  const process::Owned<ObjectApprover>& frameworksApprover,
1641  const process::Owned<ObjectApprover>& tasksApprover) const;
1642 
1644  const mesos::master::Call& call,
1646  ContentType contentType) const;
1647 
1649  const mesos::master::Call& call,
1651  ContentType contentType) const;
1652 
1654  const mesos::master::Call& call,
1656  ContentType contentType) const;
1657 
1658  process::Future<process::http::Response> unreserveResources(
1659  const mesos::master::Call& call,
1661  ContentType contentType) const;
1662 
1664  const mesos::master::Call& call,
1666  ContentType contentType) const;
1667 
1668  mesos::master::Response::GetFrameworks _getFrameworks(
1669  const process::Owned<ObjectApprover>& frameworksApprover) const;
1670 
1672  const mesos::master::Call& call,
1674  ContentType contentType) const;
1675 
1676  mesos::master::Response::GetExecutors _getExecutors(
1677  const process::Owned<ObjectApprover>& frameworksApprover,
1678  const process::Owned<ObjectApprover>& executorsApprover) const;
1679 
1681  const mesos::master::Call& call,
1683  ContentType contentType) const;
1684 
1685  mesos::master::Response::GetState _getState(
1686  const process::Owned<ObjectApprover>& frameworksApprover,
1687  const process::Owned<ObjectApprover>& taskApprover,
1688  const process::Owned<ObjectApprover>& executorsApprover,
1689  const process::Owned<AuthorizationAcceptor>& rolesAcceptor) const;
1690 
1692  const mesos::master::Call& call,
1694  ContentType contentType) const;
1695 
1697  const mesos::master::Call& call,
1699  ContentType contentType) const;
1700 
1702  const mesos::master::Call& call,
1704  ContentType contentType) const;
1705 
1707  const mesos::master::Call& call,
1709  ContentType contentType) const;
1710 
1712  const SlaveID& slaveId) const;
1713 
1714  Master* master;
1715 
1716  // NOTE: The quota specific pieces of the Operator API are factored
1717  // out into this separate class.
1718  QuotaHandler quotaHandler;
1719 
1720  // NOTE: The weights specific pieces of the Operator API are factored
1721  // out into this separate class.
1722  WeightsHandler weightsHandler;
1723  };
1724 
1725  Master(const Master&); // No copying.
1726  Master& operator=(const Master&); // No assigning.
1727 
1728  friend struct Framework;
1729  friend struct Metrics;
1730  friend struct Slave;
1731  friend struct SlavesWriter;
1732  friend struct Subscriber;
1733 
1734  // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
1735  // protected, we need to make the following functions friends.
1736  friend Offer* validation::offer::getOffer(
1737  Master* master, const OfferID& offerId);
1738 
1739  friend InverseOffer* validation::offer::getInverseOffer(
1740  Master* master, const OfferID& offerId);
1741 
1743  Master* master, const SlaveID& slaveId);
1744 
1745  const Flags flags;
1746 
1747  Http http;
1748 
1749  Option<MasterInfo> leader; // Current leading master.
1750 
1751  mesos::allocator::Allocator* allocator;
1752  WhitelistWatcher* whitelistWatcher;
1753  Registrar* registrar;
1754  Files* files;
1755 
1758 
1759  const Option<Authorizer*> authorizer;
1760 
1761  MasterInfo info_;
1762 
1763  // Holds some info which affects how a machine behaves, as well as state that
1764  // represent the master's view of this machine. See the `MachineInfo` protobuf
1765  // and `Machine` struct for more information.
1767 
1768  struct Maintenance
1769  {
1770  // Holds the maintenance schedule, as given by the operator.
1771  std::list<mesos::maintenance::Schedule> schedules;
1772  } maintenance;
1773 
1774  // Indicates when recovery is complete. Recovery begins once the
1775  // master is elected as a leader.
1777 
1778  // If this is the leading master, we periodically check whether we
1779  // should GC some information from the registry.
1780  Option<process::Timer> registryGcTimer;
1781 
1782  struct Slaves
1783  {
1784  Slaves() : removed(MAX_REMOVED_SLAVES) {}
1785 
1786  // Imposes a time limit for slaves that we recover from the
1787  // registry to re-register with the master.
1788  Option<process::Timer> recoveredTimer;
1789 
1790  // Slaves that have been recovered from the registrar after master
1791  // failover. Slaves are removed from this collection when they
1792  // either re-register with the master or are marked unreachable
1793  // because they do not re-register before `recoveredTimer` fires.
1794  // We must not answer questions related to these slaves (e.g.,
1795  // during task reconciliation) until we determine their fate
1796  // because they are in this transitioning state.
1797  hashmap<SlaveID, SlaveInfo> recovered;
1798 
1799  // Agents that are in the process of (re-)registering. They are
1800  // maintained here while the (re-)registration is in progress and
1801  // possibly pending in the authorizer or the registrar in order
1802  // to help deduplicate (re-)registration requests.
1803  hashset<process::UPID> registering;
1804  hashset<SlaveID> reregistering;
1805 
1806  // Registered slaves are indexed by SlaveID and UPID. Note that
1807  // iteration is supported but is exposed as iteration over a
1808  // hashmap<SlaveID, Slave*> since it is tedious to convert
1809  // the map's key/value iterator into a value iterator.
1810  //
1811  // TODO(bmahler): Consider pulling in boost's multi_index,
1812  // or creating a simpler indexing abstraction in stout.
1813  struct
1814  {
1815  bool contains(const SlaveID& slaveId) const
1816  {
1817  return ids.contains(slaveId);
1818  }
1819 
1820  bool contains(const process::UPID& pid) const
1821  {
1822  return pids.contains(pid);
1823  }
1824 
1825  Slave* get(const SlaveID& slaveId) const
1826  {
1827  return ids.get(slaveId).getOrElse(nullptr);
1828  }
1829 
1830  Slave* get(const process::UPID& pid) const
1831  {
1832  return pids.get(pid).getOrElse(nullptr);
1833  }
1834 
1835  void put(Slave* slave)
1836  {
1837  CHECK_NOTNULL(slave);
1838  ids[slave->id] = slave;
1839  pids[slave->pid] = slave;
1840  }
1841 
1842  void remove(Slave* slave)
1843  {
1844  CHECK_NOTNULL(slave);
1845  ids.erase(slave->id);
1846  pids.erase(slave->pid);
1847  }
1848 
1849  void clear()
1850  {
1851  ids.clear();
1852  pids.clear();
1853  }
1854 
1855  size_t size() const { return ids.size(); }
1856 
1857  typedef hashmap<SlaveID, Slave*>::iterator iterator;
1858  typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
1859 
1860  iterator begin() { return ids.begin(); }
1861  iterator end() { return ids.end(); }
1862 
1863  const_iterator begin() const { return ids.begin(); }
1864  const_iterator end() const { return ids.end(); }
1865 
1866  private:
1869  } registered;
1870 
1871  // Slaves that are in the process of being removed from the
1872  // registrar.
1873  hashset<SlaveID> removing;
1874 
1875  // Slaves that are in the process of being marked unreachable.
1876  hashset<SlaveID> markingUnreachable;
1877 
1878  // Slaves that are in the process of being marked gone.
1879  hashset<SlaveID> markingGone;
1880 
1881  // This collection includes agents that have gracefully shutdown,
1882  // as well as those that have been marked unreachable or gone. We
1883  // keep a cache here to prevent this from growing in an unbounded
1884  // manner.
1885  //
1886  // TODO(bmahler): Ideally we could use a cache with set semantics.
1887  //
1888  // TODO(neilc): Consider storing all agent IDs that have been
1889  // marked unreachable by this master.
1891 
1892  // Slaves that have been marked unreachable. We recover this from
1893  // the registry, so it includes slaves marked as unreachable by
1894  // other instances of the master. Note that we use a LinkedHashMap
1895  // to ensure the order of elements here matches the order in the
1896  // registry's unreachable list, which matches the order in which
1897  // agents are marked unreachable. This list is garbage collected;
1898  // GC behavior is governed by the `registry_gc_interval`,
1899  // `registry_max_agent_age`, and `registry_max_agent_count` flags.
1901 
1902  // Slaves that have been marked gone. We recover this from the
1903  // registry, so it includes slaves marked as gone by other instances
1904  // of the master. Note that we use a LinkedHashMap to ensure the order
1905  // of elements here matches the order in the registry's gone list, which
1906  // matches the order in which agents are marked gone.
1908 
1909  // This rate limiter is used to limit the removal of slaves failing
1910  // health checks.
1911  // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
1912  // a wrapper around libprocess process which is thread safe.
1914  } slaves;
1915 
1916  struct Frameworks
1917  {
1918  Frameworks(const Flags& masterFlags)
1919  : completed(masterFlags.max_completed_frameworks) {}
1920 
1922 
1924 
1925  // Principals of frameworks keyed by PID.
1926  // NOTE: Multiple PIDs can map to the same principal. The
1927  // principal is None when the framework doesn't specify it.
1928  // The differences between this map and 'authenticated' are:
1929  // 1) This map only includes *registered* frameworks. The mapping
1930  // is added when a framework (re-)registers.
1931  // 2) This map includes unauthenticated frameworks (when Master
1932  // allows them) if they have principals specified in
1933  // FrameworkInfo.
1935 
1936  // BoundedRateLimiters keyed by the framework principal.
1937  // Like Metrics::Frameworks, all frameworks of the same principal
1938  // are throttled together at a common rate limit.
1940 
1941  // The default limiter is for frameworks not specified in
1942  // 'flags.rate_limits'.
1944  } frameworks;
1945 
1946  struct Subscribers
1947  {
1948  Subscribers(Master* _master) : master(_master) {};
1949 
1950  // Represents a client subscribed to the 'api/vX' endpoint.
1951  //
1952  // TODO(anand): Add support for filtering. Some subscribers
1953  // might only be interested in a subset of events.
1954  struct Subscriber
1955  {
1957  const HttpConnection& _http,
1959  : http(_http),
1960  principal(_principal)
1961  {
1962  mesos::master::Event event;
1963  event.set_type(mesos::master::Event::HEARTBEAT);
1964 
1965  heartbeater =
1968  "subscriber " + stringify(http.streamId),
1969  event,
1970  http,
1973 
1974  process::spawn(heartbeater.get());
1975  }
1976 
1977  // Not copyable, not assignable.
1978  Subscriber(const Subscriber&) = delete;
1979  Subscriber& operator=(const Subscriber&) = delete;
1980 
1981  // The `AuthorizationAcceptor` parameters here are not all required for
1982  // every event, but we currently construct and pass them all regardless
1983  // of the event type.
1984  //
1985  // TODO(greggomann): Refactor this function into multiple event-specific
1986  // overloads. See MESOS-8475.
1987  void send(
1989  const process::Owned<AuthorizationAcceptor>& authorizeRole,
1992  const process::Owned<AuthorizationAcceptor>& authorizeExecutor,
1993  const Option<process::Shared<FrameworkInfo>>& frameworkInfo,
1994  const Option<process::Shared<Task>>& task);
1995 
1997  {
1998  // TODO(anand): Refactor `HttpConnection` to be an RAII class instead.
1999  // It is possible that a caller might accidentally invoke `close()`
2000  // after passing ownership to the `Subscriber` object. See MESOS-5843
2001  // for more details.
2002  http.close();
2003 
2004  terminate(heartbeater.get());
2005  wait(heartbeater.get());
2006  }
2007 
2012  };
2013 
2014  // Sends the event to all subscribers connected to the 'api/vX' endpoint.
2015  void send(mesos::master::Event&& event);
2016 
2017  Master* master;
2018 
2019  // Active subscribers to the 'api/vX' endpoint keyed by the stream
2020  // identifier.
2022  };
2023 
2024  Subscribers subscribers;
2025 
2026  hashmap<OfferID, Offer*> offers;
2028 
2029  hashmap<OfferID, InverseOffer*> inverseOffers;
2030  hashmap<OfferID, process::Timer> inverseOfferTimers;
2031 
2032  // We track information about roles that we're aware of in the system.
2033  // Specifically, we keep track of the roles when a framework subscribes to
2034  // the role, and/or when there are resources allocated to the role
2035  // (e.g. some tasks and/or executors are consuming resources under the role).
2037 
2038  // Configured role whitelist if using the (deprecated) "explicit
2039  // roles" feature. If this is `None`, any role is allowed.
2040  Option<hashset<std::string>> roleWhitelist;
2041 
2042  // Configured weight for each role, if any. If a role does not
2043  // appear here, it has the default weight of 1.
2045 
2046  // Configured quota for each role, if any. We store quotas by role
2047  // because we set them at the role level.
2049 
2050  // Authenticator names as supplied via flags.
2051  std::vector<std::string> authenticatorNames;
2052 
2053  Option<Authenticator*> authenticator;
2054 
2055  // Frameworks/slaves that are currently in the process of authentication.
2056  // 'authenticating' future is completed when authenticator
2057  // completes authentication.
2058  // The future is removed from the map when master completes authentication.
2060 
2061  // Principals of authenticated frameworks/slaves keyed by PID.
2063 
2064  int64_t nextFrameworkId; // Used to give each framework a unique ID.
2065  int64_t nextOfferId; // Used to give each slot offer a unique ID.
2066  int64_t nextSlaveId; // Used to give each slave a unique ID.
2067 
2068  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
2069  // thread safe.
2070  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
2071  // copyable metric types only.
2072  std::shared_ptr<Metrics> metrics;
2073 
2074  // Gauge handlers.
2075  double _uptime_secs()
2076  {
2077  return (process::Clock::now() - startTime).secs();
2078  }
2079 
2080  double _elected()
2081  {
2082  return elected() ? 1 : 0;
2083  }
2084 
2085  double _slaves_connected();
2086  double _slaves_disconnected();
2087  double _slaves_active();
2088  double _slaves_inactive();
2089  double _slaves_unreachable();
2090 
2091  double _frameworks_connected();
2092  double _frameworks_disconnected();
2093  double _frameworks_active();
2094  double _frameworks_inactive();
2095 
2096  double _outstanding_offers()
2097  {
2098  return static_cast<double>(offers.size());
2099  }
2100 
2101  double _event_queue_messages()
2102  {
2103  return static_cast<double>(eventCount<process::MessageEvent>());
2104  }
2105 
2106  double _event_queue_dispatches()
2107  {
2108  return static_cast<double>(eventCount<process::DispatchEvent>());
2109  }
2110 
2111  double _event_queue_http_requests()
2112  {
2113  return static_cast<double>(eventCount<process::HttpEvent>());
2114  }
2115 
2116  double _tasks_staging();
2117  double _tasks_starting();
2118  double _tasks_running();
2119  double _tasks_unreachable();
2120  double _tasks_killing();
2121 
2122  double _resources_total(const std::string& name);
2123  double _resources_used(const std::string& name);
2124  double _resources_percent(const std::string& name);
2125 
2126  double _resources_revocable_total(const std::string& name);
2127  double _resources_revocable_used(const std::string& name);
2128  double _resources_revocable_percent(const std::string& name);
2129 
2130  process::Time startTime; // Start time used to calculate uptime.
2131 
2132  Option<process::Time> electedTime; // Time when this master is elected.
2133 
2134  // Validates the framework including authorization.
2135  // Returns None if the framework is valid.
2136  // Returns Error if the framework is invalid.
2137  // Returns Failure if authorization returns 'Failure'.
2139  const FrameworkInfo& frameworkInfo,
2140  const process::UPID& from);
2141 };
2142 
2143 
2144 inline std::ostream& operator<<(
2145  std::ostream& stream,
2146  const Framework& framework);
2147 
2148 
2149 // TODO(bmahler): Keeping the task and executor information in sync
2150 // across the Slave and Framework structs is error prone!
2152 {
2153  enum State
2154  {
2155  // Framework has never connected to this master. This implies the
2156  // master failed over and the framework has not yet re-registered,
2157  // but some framework state has been recovered from re-registering
2158  // agents that are running tasks for the framework.
2160 
2161  // Framework was previously connected to this master. A framework
2162  // becomes disconnected when there is a socket error.
2164 
2165  // The framework is connected but not active.
2167 
2168  // Framework is connected and eligible to receive offers. No
2169  // offers will be made to frameworks that are not active.
2171  };
2172 
2174  const Flags& masterFlags,
2175  const FrameworkInfo& info,
2176  const process::UPID& _pid,
2178  : Framework(master, masterFlags, info, ACTIVE, time)
2179  {
2180  pid = _pid;
2181  }
2182 
2183  Framework(Master* const master,
2184  const Flags& masterFlags,
2185  const FrameworkInfo& info,
2186  const HttpConnection& _http,
2188  : Framework(master, masterFlags, info, ACTIVE, time)
2189  {
2190  http = _http;
2191  }
2192 
2193  Framework(Master* const master,
2194  const Flags& masterFlags,
2195  const FrameworkInfo& info)
2196  : Framework(master, masterFlags, info, RECOVERED, process::Time()) {}
2197 
2199  {
2200  if (http.isSome()) {
2202  }
2203  }
2204 
2205  Task* getTask(const TaskID& taskId)
2206  {
2207  if (tasks.count(taskId) > 0) {
2208  return tasks[taskId];
2209  }
2210 
2211  return nullptr;
2212  }
2213 
2214  void addTask(Task* task)
2215  {
2216  CHECK(!tasks.contains(task->task_id()))
2217  << "Duplicate task " << task->task_id()
2218  << " of framework " << task->framework_id();
2219 
2220  // Verify that Resource.AllocationInfo is set,
2221  // this should be guaranteed by the master.
2222  foreach (const Resource& resource, task->resources()) {
2223  CHECK(resource.has_allocation_info());
2224  }
2225 
2226  tasks[task->task_id()] = task;
2227 
2228  // Unreachable tasks should be added via `addUnreachableTask`.
2229  CHECK(task->state() != TASK_UNREACHABLE)
2230  << "Task '" << task->task_id() << "' of framework " << id()
2231  << " added in TASK_UNREACHABLE state";
2232 
2233  // Since we track terminal but unacknowledged tasks within
2234  // `tasks` rather than `completedTasks`, we need to handle
2235  // them here: don't count them as consuming resources.
2236  //
2237  // TODO(bmahler): Users currently get confused because
2238  // terminal tasks can show up as "active" tasks in the UI and
2239  // endpoints. Ideally, we show the terminal unacknowledged
2240  // tasks as "completed" as well.
2241  if (!protobuf::isTerminalState(task->state())) {
2242  // Note that we explicitly convert from protobuf to `Resources` once
2243  // and then use the result for calculations to avoid performance penalty
2244  // for multiple conversions and validations implied by `+=` with protobuf
2245  // arguments.
2246  // Conversion is safe, as resources have already passed validation.
2247  const Resources resources = task->resources();
2248  totalUsedResources += resources;
2249  usedResources[task->slave_id()] += resources;
2250 
2251  // It's possible that we're not tracking the task's role for
2252  // this framework if the role is absent from the framework's
2253  // set of roles. In this case, we track the role's allocation
2254  // for this framework.
2255  CHECK(!task->resources().empty());
2256  const std::string& role =
2257  task->resources().begin()->allocation_info().role();
2258 
2259  if (!isTrackedUnderRole(role)) {
2260  trackUnderRole(role);
2261  }
2262  }
2263  }
2264 
2265  // Update framework to recover the resources that were previously
2266  // being used by `task`.
2267  //
2268  // TODO(bmahler): This is a hack for performance. We need to
2269  // maintain resource counters because computing task resources
2270  // functionally for all tasks is expensive, for now.
2271  void recoverResources(Task* task)
2272  {
2273  CHECK(tasks.contains(task->task_id()))
2274  << "Unknown task " << task->task_id()
2275  << " of framework " << task->framework_id();
2276 
2277  totalUsedResources -= task->resources();
2278  usedResources[task->slave_id()] -= task->resources();
2279  if (usedResources[task->slave_id()].empty()) {
2280  usedResources.erase(task->slave_id());
2281  }
2282 
2283  // If we are no longer subscribed to the role to which these resources are
2284  // being returned to, and we have no more resources allocated to us for that
2285  // role, stop tracking the framework under the role.
2286  CHECK(!task->resources().empty());
2287  const std::string& role =
2288  task->resources().begin()->allocation_info().role();
2289 
2290  auto allocatedToRole = [&role](const Resource& resource) {
2291  return resource.allocation_info().role() == role;
2292  };
2293 
2294  if (roles.count(role) == 0 &&
2295  totalUsedResources.filter(allocatedToRole).empty()) {
2296  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2297  untrackUnderRole(role);
2298  }
2299  }
2300 
2301  // Sends a message to the connected framework.
2302  template <typename Message>
2303  void send(const Message& message)
2304  {
2305  if (!connected()) {
2306  LOG(WARNING) << "Master attempted to send message to disconnected"
2307  << " framework " << *this;
2308  }
2309 
2310  if (http.isSome()) {
2311  if (!http.get().send(message)) {
2312  LOG(WARNING) << "Unable to send event to framework " << *this << ":"
2313  << " connection closed";
2314  }
2315  } else {
2316  CHECK_SOME(pid);
2317  master->send(pid.get(), message);
2318  }
2319  }
2320 
2321  void addCompletedTask(Task&& task)
2322  {
2323  // TODO(neilc): We currently allow frameworks to reuse the task
2324  // IDs of completed tasks (although this is discouraged). This
2325  // means that there might be multiple completed tasks with the
2326  // same task ID. We should consider rejecting attempts to reuse
2327  // task IDs (MESOS-6779).
2328  completedTasks.push_back(process::Owned<Task>(new Task(std::move(task))));
2329  }
2330 
2331  void addUnreachableTask(const Task& task)
2332  {
2333  // TODO(adam-mesos): Check if unreachable task already exists.
2334  unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task)));
2335  }
2336 
2337  // Removes the task. `unreachable` indicates whether the task is removed due
2338  // to being unreachable. Note that we cannot rely on the task state because
2339  // it may not reflect unreachability due to being set to TASK_LOST for
2340  // backwards compatibility.
2341  void removeTask(Task* task, bool unreachable)
2342  {
2343  CHECK(tasks.contains(task->task_id()))
2344  << "Unknown task " << task->task_id()
2345  << " of framework " << task->framework_id();
2346 
2347  // The invariant here is that the master will have already called
2348  // `recoverResources()` prior to removing terminal or unreachable tasks.
2349  if (!protobuf::isTerminalState(task->state()) &&
2350  task->state() != TASK_UNREACHABLE) {
2351  recoverResources(task);
2352  }
2353 
2354  if (unreachable) {
2355  addUnreachableTask(*task);
2356  } else {
2357  CHECK(task->state() != TASK_UNREACHABLE);
2358 
2359  // TODO(bmahler): This moves a potentially non-terminal task into
2360  // the completed list!
2361  addCompletedTask(Task(*task));
2362  }
2363 
2364  tasks.erase(task->task_id());
2365  }
2366 
2367  void addOffer(Offer* offer)
2368  {
2369  CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
2370  offers.insert(offer);
2371  totalOfferedResources += offer->resources();
2372  offeredResources[offer->slave_id()] += offer->resources();
2373  }
2374 
2375  void removeOffer(Offer* offer)
2376  {
2377  CHECK(offers.find(offer) != offers.end())
2378  << "Unknown offer " << offer->id();
2379 
2380  totalOfferedResources -= offer->resources();
2381  offeredResources[offer->slave_id()] -= offer->resources();
2382  if (offeredResources[offer->slave_id()].empty()) {
2383  offeredResources.erase(offer->slave_id());
2384  }
2385 
2386  offers.erase(offer);
2387  }
2388 
2389  void addInverseOffer(InverseOffer* inverseOffer)
2390  {
2391  CHECK(!inverseOffers.contains(inverseOffer))
2392  << "Duplicate inverse offer " << inverseOffer->id();
2393  inverseOffers.insert(inverseOffer);
2394  }
2395 
2396  void removeInverseOffer(InverseOffer* inverseOffer)
2397  {
2398  CHECK(inverseOffers.contains(inverseOffer))
2399  << "Unknown inverse offer " << inverseOffer->id();
2400 
2401  inverseOffers.erase(inverseOffer);
2402  }
2403 
2404  bool hasExecutor(const SlaveID& slaveId,
2405  const ExecutorID& executorId)
2406  {
2407  return executors.contains(slaveId) &&
2408  executors[slaveId].contains(executorId);
2409  }
2410 
2411  void addExecutor(const SlaveID& slaveId,
2412  const ExecutorInfo& executorInfo)
2413  {
2414  CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
2415  << "Duplicate executor '" << executorInfo.executor_id()
2416  << "' on agent " << slaveId;
2417 
2418  // Verify that Resource.AllocationInfo is set,
2419  // this should be guaranteed by the master.
2420  foreach (const Resource& resource, executorInfo.resources()) {
2421  CHECK(resource.has_allocation_info());
2422  }
2423 
2424  executors[slaveId][executorInfo.executor_id()] = executorInfo;
2425  totalUsedResources += executorInfo.resources();
2426  usedResources[slaveId] += executorInfo.resources();
2427 
2428  // It's possible that we're not tracking the task's role for
2429  // this framework if the role is absent from the framework's
2430  // set of roles. In this case, we track the role's allocation
2431  // for this framework.
2432  if (!executorInfo.resources().empty()) {
2433  const std::string& role =
2434  executorInfo.resources().begin()->allocation_info().role();
2435 
2436  if (!isTrackedUnderRole(role)) {
2437  trackUnderRole(role);
2438  }
2439  }
2440  }
2441 
2442  void removeExecutor(const SlaveID& slaveId,
2443  const ExecutorID& executorId)
2444  {
2445  CHECK(hasExecutor(slaveId, executorId))
2446  << "Unknown executor '" << executorId
2447  << "' of framework " << id()
2448  << " of agent " << slaveId;
2449 
2450  const ExecutorInfo& executorInfo = executors[slaveId][executorId];
2451 
2452  totalUsedResources -= executorInfo.resources();
2453  usedResources[slaveId] -= executorInfo.resources();
2454  if (usedResources[slaveId].empty()) {
2455  usedResources.erase(slaveId);
2456  }
2457 
2458  // If we are no longer subscribed to the role to which these resources are
2459  // being returned to, and we have no more resources allocated to us for that
2460  // role, stop tracking the framework under the role.
2461  if (!executorInfo.resources().empty()) {
2462  const std::string& role =
2463  executorInfo.resources().begin()->allocation_info().role();
2464 
2465  auto allocatedToRole = [&role](const Resource& resource) {
2466  return resource.allocation_info().role() == role;
2467  };
2468 
2469  if (roles.count(role) == 0 &&
2470  totalUsedResources.filter(allocatedToRole).empty()) {
2471  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2472  untrackUnderRole(role);
2473  }
2474  }
2475 
2476  executors[slaveId].erase(executorId);
2477  if (executors[slaveId].empty()) {
2478  executors.erase(slaveId);
2479  }
2480  }
2481 
2482  void addOperation(Operation* operation)
2483  {
2484  CHECK(operation->has_framework_id());
2485 
2486  const FrameworkID& frameworkId = operation->framework_id();
2487 
2488  Try<id::UUID> uuid = id::UUID::fromBytes(operation->uuid().value());
2489  CHECK_SOME(uuid);
2490 
2491  CHECK(!operations.contains(uuid.get()))
2492  << "Duplicate operation '" << operation->info().id()
2493  << "' (uuid: " << uuid->toString() << ") "
2494  << "of framework " << frameworkId;
2495 
2496  operations.put(uuid.get(), operation);
2497 
2498  if (operation->info().has_id()) {
2499  operationUUIDs.put(operation->info().id(), uuid.get());
2500  }
2501 
2502  if (!protobuf::isSpeculativeOperation(operation->info()) &&
2503  !protobuf::isTerminalState(operation->latest_status().state())) {
2504  Try<Resources> consumed =
2505  protobuf::getConsumedResources(operation->info());
2506  CHECK_SOME(consumed);
2507 
2508  CHECK(operation->has_slave_id())
2509  << "External resource provider is not supported yet";
2510 
2511  const SlaveID& slaveId = operation->slave_id();
2512 
2513  totalUsedResources += consumed.get();
2514  usedResources[slaveId] += consumed.get();
2515 
2516  // It's possible that we're not tracking the role from the
2517  // resources in the operation for this framework if the role is
2518  // absent from the framework's set of roles. In this case, we
2519  // track the role's allocation for this framework.
2520  foreachkey (const std::string& role, consumed->allocations()) {
2521  if (!isTrackedUnderRole(role)) {
2522  trackUnderRole(role);
2523  }
2524  }
2525  }
2526  }
2527 
2528  void recoverResources(Operation* operation)
2529  {
2530  CHECK(operation->has_slave_id())
2531  << "External resource provider is not supported yet";
2532 
2533  const SlaveID& slaveId = operation->slave_id();
2534 
2535  if (protobuf::isSpeculativeOperation(operation->info())) {
2536  return;
2537  }
2538 
2539  Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
2540  CHECK_SOME(consumed);
2541 
2542  CHECK(totalUsedResources.contains(consumed.get()))
2543  << "Tried to recover resources " << consumed.get()
2544  << " which do not seem used";
2545 
2546  CHECK(usedResources[slaveId].contains(consumed.get()))
2547  << "Tried to recover resources " << consumed.get() << " of agent "
2548  << slaveId << " which do not seem used";
2549 
2550  totalUsedResources -= consumed.get();
2551  usedResources[slaveId] -= consumed.get();
2552  if (usedResources[slaveId].empty()) {
2553  usedResources.erase(slaveId);
2554  }
2555 
2556  // If we are no longer subscribed to the role to which these
2557  // resources are being returned to, and we have no more resources
2558  // allocated to us for that role, stop tracking the framework
2559  // under the role.
2560  foreachkey (const std::string& role, consumed->allocations()) {
2561  auto allocatedToRole = [&role](const Resource& resource) {
2562  return resource.allocation_info().role() == role;
2563  };
2564 
2565  if (roles.count(role) == 0 &&
2566  totalUsedResources.filter(allocatedToRole).empty()) {
2567  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2568  untrackUnderRole(role);
2569  }
2570  }
2571  }
2572 
2573  void removeOperation(Operation* operation)
2574  {
2575  Try<id::UUID> uuid = id::UUID::fromBytes(operation->uuid().value());
2576  CHECK_SOME(uuid);
2577 
2578  CHECK(operations.contains(uuid.get()))
2579  << "Unknown operation '" << operation->info().id()
2580  << "' (uuid: " << uuid->toString() << ") "
2581  << "of framework " << operation->framework_id();
2582 
2583  if (!protobuf::isSpeculativeOperation(operation->info()) &&
2584  !protobuf::isTerminalState(operation->latest_status().state())) {
2585  recoverResources(operation);
2586  }
2587 
2588  operations.erase(uuid.get());
2589  }
2590 
2591  const FrameworkID id() const { return info.id(); }
2592 
2593  // Update fields in 'info' using those in 'newInfo'. Currently this
2594  // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
2595  // 'webui_url', 'capabilities', and 'labels'.
2596  void update(const FrameworkInfo& newInfo)
2597  {
2598  // We only merge 'info' from the same framework 'id'.
2599  CHECK_EQ(info.id(), newInfo.id());
2600 
2601  // Save the old list of roles for later.
2602  std::set<std::string> oldRoles = roles;
2603 
2604  // TODO(jmlvanre): Merge other fields as per design doc in
2605  // MESOS-703.
2606 
2607  info.clear_role();
2608  info.clear_roles();
2609 
2610  if (newInfo.has_role()) {
2611  info.set_role(newInfo.role());
2612  }
2613 
2614  if (newInfo.roles_size() > 0) {
2615  info.mutable_roles()->CopyFrom(newInfo.roles());
2616  }
2617 
2619 
2620  if (newInfo.user() != info.user()) {
2621  LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user()
2622  << "' for framework " << id() << ". Check MESOS-703";
2623  }
2624 
2625  info.set_name(newInfo.name());
2626 
2627  if (newInfo.has_failover_timeout()) {
2628  info.set_failover_timeout(newInfo.failover_timeout());
2629  } else {
2630  info.clear_failover_timeout();
2631  }
2632 
2633  if (newInfo.checkpoint() != info.checkpoint()) {
2634  LOG(WARNING) << "Cannot update FrameworkInfo.checkpoint to '"
2635  << stringify(newInfo.checkpoint()) << "' for framework "
2636  << id() << ". Check MESOS-703";
2637  }
2638 
2639  if (newInfo.has_hostname()) {
2640  info.set_hostname(newInfo.hostname());
2641  } else {
2642  info.clear_hostname();
2643  }
2644 
2645  if (newInfo.principal() != info.principal()) {
2646  LOG(WARNING) << "Cannot update FrameworkInfo.principal to '"
2647  << newInfo.principal() << "' for framework " << id()
2648  << ". Check MESOS-703";
2649  }
2650 
2651  if (newInfo.has_webui_url()) {
2652  info.set_webui_url(newInfo.webui_url());
2653  } else {
2654  info.clear_webui_url();
2655  }
2656 
2657  if (newInfo.capabilities_size() > 0) {
2658  info.mutable_capabilities()->CopyFrom(newInfo.capabilities());
2659  } else {
2660  info.clear_capabilities();
2661  }
2662  capabilities = protobuf::framework::Capabilities(info.capabilities());
2663 
2664  if (newInfo.has_labels()) {
2665  info.mutable_labels()->CopyFrom(newInfo.labels());
2666  } else {
2667  info.clear_labels();
2668  }
2669 
2670  const std::set<std::string>& newRoles = roles;
2671 
2672  const std::set<std::string> removedRoles = [&]() {
2673  std::set<std::string> result = oldRoles;
2674  foreach (const std::string& role, newRoles) {
2675  result.erase(role);
2676  }
2677  return result;
2678  }();
2679 
2680  foreach (const std::string& role, removedRoles) {
2681  auto allocatedToRole = [&role](const Resource& resource) {
2682  return resource.allocation_info().role() == role;
2683  };
2684 
2685  // Stop tracking the framework under this role if there are
2686  // no longer any resources allocated to it.
2687  if (totalUsedResources.filter(allocatedToRole).empty()) {
2688  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2689  untrackUnderRole(role);
2690  }
2691  }
2692 
2693  const std::set<std::string> addedRoles = [&]() {
2694  std::set<std::string> result = newRoles;
2695  foreach (const std::string& role, oldRoles) {
2696  result.erase(role);
2697  }
2698  return result;
2699  }();
2700 
2701  foreach (const std::string& role, addedRoles) {
2702  // NOTE: It's possible that we're already tracking this framework
2703  // under the role because a framework can unsubscribe from a role
2704  // while it still has resources allocated to the role.
2705  if (!isTrackedUnderRole(role)) {
2706  trackUnderRole(role);
2707  }
2708  }
2709  }
2710 
// Switches this framework to a PID-based connection by storing
// `newPid` in `pid`.
2711  void updateConnection(const process::UPID& newPid)
2712  {
2713  // Cleanup the HTTP connection if this is a downgrade from HTTP
2714  // to PID. Note that the connection may already be closed.
// NOTE(review): the body of the `if` below is empty in this listing
// (the listing numbers skip a line), so as written `http` is left
// untouched here. Presumably the call that closes the HTTP connection
// was lost -- confirm against the original header.
2715  if (http.isSome()) {
2717  }
2718 
2719  // TODO(benh): unlink(oldPid);
2720  pid = newPid;
2721  }
2722 
// Switches this framework to the HTTP connection `newHttp`, clearing
// `pid` first if one is set.
2723  void updateConnection(const HttpConnection& newHttp)
2724  {
2725  if (pid.isSome()) {
2726  // Wipe the PID if this is an upgrade from PID to HTTP.
2727  // TODO(benh): unlink(oldPid);
2728  pid = None();
2729  } else if (http.isSome()) {
2730  // Cleanup the old HTTP connection.
2731  // Note that master creates a new HTTP connection for every
2732  // subscribe request, so 'newHttp' should always be different
2733  // from 'http'.
// NOTE(review): this branch contains no statement in this listing (the
// listing numbers skip a line). As written, an already-set `http`
// reaches the `CHECK_NONE(http)` below unchanged. Presumably the call
// that closes the old connection was lost -- confirm against the
// original header.
2735  }
2736 
// `http` is required to be unset before the new connection is stored.
2737  CHECK_NONE(http);
2738 
2739  http = newHttp;
2740  }
2741 
2742  // Closes the HTTP connection and stops the heartbeat.
2743  //
2744  // TODO(vinod): Currently `state` variable is set separately
2745  // from this method. We need to make sure these are in sync.
2747  {
2748  CHECK_SOME(http);
2749 
2750  if (connected() && !http.get().close()) {
2751  LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
2752  }
2753 
2754  http = None();
2755 
2757 
2758  terminate(heartbeater.get().get());
2759  wait(heartbeater.get().get());
2760 
2761  heartbeater = None();
2762  }
2763 
// Builds a HEARTBEAT scheduler event, assigns `heartbeater` and spawns
// it as a process. Requires `http` to be set.
//
// NOTE(review): several lines of this method are missing from this
// listing (the listing numbers skip 2766, 2775 and 2779), which leaves
// the `heartbeater = ...` statement below incomplete. Restore them
// from the original header before relying on this text.
2764  void heartbeat()
2765  {
2767  CHECK_SOME(http);
2768 
2769  // TODO(vinod): Make heartbeat interval configurable and include
2770  // this information in the SUBSCRIBED response.
2771  scheduler::Event event;
2772  event.set_type(scheduler::Event::HEARTBEAT);
2773 
// The visible arguments are a label built from the framework ID, the
// heartbeat event and the HTTP connection; the constructed type and
// any trailing argument are not visible here.
2774  heartbeater =
2776  "framework " + stringify(info.id()),
2777  event,
2778  http.get(),
2780 
2781  process::spawn(heartbeater.get().get());
2782  }
2783 
2784  bool active() const { return state == ACTIVE; }
2785  bool connected() const { return state == ACTIVE || state == INACTIVE; }
2786  bool recovered() const { return state == RECOVERED; }
2787 
2788  bool isTrackedUnderRole(const std::string& role) const;
2789  void trackUnderRole(const std::string& role);
2790  void untrackUnderRole(const std::string& role);
2791 
2792  Master* const master;
2793 
2794  FrameworkInfo info;
2795 
2796  std::set<std::string> roles;
2797 
2799 
2800  // Frameworks can either be connected via HTTP or by message passing
2801  // (scheduler driver). At most one of `http` and `pid` will be set
2802  // according to the last connection made by the framework; neither
2803  // field will be set if the framework is in state `RECOVERED`.
2806 
2808 
2812 
2813  // Tasks that have not yet been launched because they are currently
2814  // being authorized.
2816 
2817  // TODO(bmahler): Make this private to enforce that `addTask()` and
2818  // `removeTask()` are used, and provide a const view into the tasks.
2820 
2821  // Tasks launched by this framework that have reached a terminal
2822  // state and have had all their updates acknowledged. We only keep a
2823  // fixed-size cache to avoid consuming too much memory. We use
2824  // boost::circular_buffer rather than BoundedHashMap because there
2825  // can be multiple completed tasks with the same task ID.
2826  boost::circular_buffer<process::Owned<Task>> completedTasks;
2827 
2828  // When an agent is marked unreachable, tasks running on it are stored
2829  // here. We only keep a fixed-size cache to avoid consuming too much memory.
2830  // NOTE: Non-partition-aware unreachable tasks in this map are marked
2831  // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
2833 
2834  hashset<Offer*> offers; // Active offers for framework.
2835 
2836  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
2837 
2838  // TODO(bmahler): Make this private to enforce that `addExecutor()`
2839  // and `removeExecutor()` are used, and provide a const view into
2840  // the executors.
2842 
2843  // Pending operations or terminal operations that have
2844  // unacknowledged status updates.
2846 
2847  // The map from the framework-specified operation ID to the
2848  // corresponding internal operation UUID.
2850 
2851  // NOTE: For the used and offered resources below, we keep the
2852  // total as well as partitioned by SlaveID.
2853  // We expose the total resources via the HTTP endpoint, and we
2854  // keep a running total of the resources because looping over the
2855  // slaves to sum the resources has led to perf issues (MESOS-1862).
2856  // We keep the resources partitioned by SlaveID because non-scalar
2857  // resources can be lost when summing them up across multiple
2858  // slaves (MESOS-2373).
2859  //
2860  // Also note that keeping the totals is safe even though it yields
2861  // incorrect results for non-scalar resources.
2862  // (1) For overlapping set items / ranges across slaves, these
2863  // will get added N times but only represented once.
2864  // (2) When an initial subtraction occurs (N-1), the resource is
2865  // no longer represented. (This is the source of the bug).
2866  // (3) When any further subtractions occur (N-(1+M)), the
2867  // Resources simply ignores the subtraction since there's
2868  // nothing to remove, so this is safe for now.
2869 
2870  // TODO(mpark): Strip the non-scalar resources out of the totals
2871  // in order to avoid reporting incorrect statistics (MESOS-2623).
2872 
2873  // Active task / executor / operation resources.
2875 
2876  // Note that we maintain multiple copies of each shared resource in
2877  // `usedResources` as they are used by multiple tasks.
2879 
2880  // Offered resources.
2883 
2884  // This is only set for HTTP frameworks.
2887 
2888 private:
2889  Framework(Master* const _master,
2890  const Flags& masterFlags,
2891  const FrameworkInfo& _info,
2892  State state,
2893  const process::Time& time)
2894  : master(_master),
2895  info(_info),
2896  roles(protobuf::framework::getRoles(_info)),
2897  capabilities(_info.capabilities()),
2898  state(state),
2899  registeredTime(time),
2900  reregisteredTime(time),
2901  completedTasks(masterFlags.max_completed_tasks_per_framework),
2902  unreachableTasks(masterFlags.max_unreachable_tasks_per_framework)
2903  {
2904  foreach (const std::string& role, roles) {
2905  // NOTE: It's possible that we're already being tracked under the role
2906  // because a framework can unsubscribe from a role while it still has
2907  // resources allocated to the role.
2908  if (!isTrackedUnderRole(role)) {
2909  trackUnderRole(role);
2910  }
2911  }
2912  }
2913 
2914  Framework(const Framework&); // No copying.
2915  Framework& operator=(const Framework&); // No assigning.
2916 };
2917 
2918 
2919 inline std::ostream& operator<<(
2920  std::ostream& stream,
2921  const Framework& framework)
2922 {
2923  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
2924  // updated on framework failover (MESOS-1784).
2925  stream << framework.id() << " (" << framework.info.name() << ")";
2926 
2927  if (framework.pid.isSome()) {
2928  stream << " at " << framework.pid.get();
2929  }
2930 
2931  return stream;
2932 }
2933 
2934 
2935 // Information about an active role.
// Holds a role name together with the frameworks registered under it,
// keyed by framework ID.
//
// NOTE(review): two lines of this struct are missing from this listing
// (the listing numbers skip 2952 and 2980); see the notes below.
2936 struct Role
2937 {
// A role cannot be default-constructed; it always carries a name.
2938  Role() = delete;
2939 
2940  Role(const std::string& _role) : role(_role) {}
2941 
// Registers `framework` under its ID, replacing any existing entry
// stored under the same ID.
2942  void addFramework(Framework* framework)
2943  {
2944  frameworks[framework->id()] = framework;
2945  }
2946 
// Removes the entry stored under the ID of `framework`, if any.
2947  void removeFramework(Framework* framework)
2948  {
2949  frameworks.erase(framework->id());
2950  }
2951 
// NOTE(review): the signature line of the following method is missing
// from this listing. Its body sums, over all registered frameworks, the
// used and offered resources whose allocation role equals `role`, and
// returns the result as `Resources`.
2953  {
2954  Resources resources;
2955 
// Builds a predicate matching resources allocated to the given role;
// every inspected resource must have its allocation info set.
2956  auto allocatedTo = [](const std::string& role) {
2957  return [role](const Resource& resource) {
2958  CHECK(resource.has_allocation_info());
2959  return resource.allocation_info().role() == role;
2960  };
2961  };
2962 
2963  foreachvalue (Framework* framework, frameworks) {
2964  resources += framework->totalUsedResources.filter(allocatedTo(role));
2965  resources += framework->totalOfferedResources.filter(allocatedTo(role));
2966  }
2967 
2968  return resources;
2969  }
2970 
// Name of the role; fixed at construction.
2971  const std::string role;
2972 
2973  // NOTE: The dynamic role/quota relation is stored in and administrated
2974  // by the master. There is no direct representation of quota information
2975  // here to avoid duplication and to support that an operator can associate
2976  // quota with a role before the role is created. Such ordering of operator
2977  // requests prevents a race of premature unbounded allocation that setting
2978  // quota first is intended to contain.
2979 
// NOTE(review): the declaration of the `frameworks` member used by the
// methods above is not visible in this listing -- presumably it was on
// the skipped line; confirm against the original header.
2981 };
2982 
2983 } // namespace master {
2984 } // namespace internal {
2985 } // namespace mesos {
2986 
2987 #endif // __MASTER_HPP__
void _consume(process::MessageEvent &&event)
void recoverFramework(const FrameworkInfo &info, const std::set< std::string > &suppressedRoles)
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Try< Resources > getConsumedResources(const Offer::Operation &operation)
bool send(const Message &message)
Definition: master.hpp:311
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info, const process::UPID &_pid, const process::Time &time=process::Clock::now())
Definition: master.hpp:2173
void recoverResources(Operation *operation)
Definition: master.hpp:2528
Definition: nothing.hpp:16
hashmap< TaskID, Task * > tasks
Definition: master.hpp:2819
Definition: master.hpp:2936
Master *const master
Definition: master.hpp:2792
void removeOperation(Operation *operation)
ContentType
Definition: http.hpp:43
void removeExecutor(const FrameworkID &frameworkId, const ExecutorID &executorId)
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
void _removeSlave(Slave *slave, const process::Future< bool > &registrarResult, const std::string &removalCause, Option< process::metrics::Counter > reason=None())
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:100
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
Subscriber(const HttpConnection &_http, const Option< process::http::authentication::Principal > _principal)
Definition: master.hpp:1956
ProcessBase(const std::string &id="")
void finalize() override
Invoked when a process is terminated.
void launchTasks(const process::UPID &from, const FrameworkID &frameworkId, const std::vector< TaskInfo > &tasks, const Filters &filters, const std::vector< OfferID > &offerIds)
SlaveInfo info
Definition: master.hpp:188
bool connected() const
Definition: master.hpp:2785
void removeSlave(Slave *slave, const std::string &message, Option< process::metrics::Counter > reason=None())
~Framework()
Definition: master.hpp:2198
Role(const std::string &_role)
Definition: master.hpp:2940
Definition: try.hpp:34
Slave(Master *const _master, SlaveInfo _info, const process::UPID &_pid, const MachineID &_machineId, const std::string &_version, std::vector< SlaveInfo::Capability > _capabilites, const process::Time &_registeredTime, std::vector< Resource > _checkpointedResources, const Option< id::UUID > &resourceVersion, std::vector< ExecutorInfo > executorInfos=std::vector< ExecutorInfo >(), std::vector< Task > tasks=std::vector< Task >())
const SlaveID id
Definition: master.hpp:187
void exceededCapacity(const process::MessageEvent &event, const Option< std::string > &principal, uint64_t capacity)
hashset< Offer * > offers
Definition: master.hpp:251
process::Future< Nothing > recover()
friend struct Subscriber
Definition: master.hpp:1732
Option< process::Timer > reregistrationTimer
Definition: master.hpp:217
bool connected
Definition: master.hpp:204
#define CHECK_NONE(expression)
Definition: check.hpp:48
void updateSlave(UpdateSlaveMessage &&message)
Task * getTask(const TaskID &taskId)
Definition: master.hpp:2205
Resources totalResources
Definition: master.hpp:277
void addFramework(Framework *framework, const std::set< std::string > &suppressedRoles)
constexpr Duration DEFAULT_HEARTBEAT_INTERVAL
Definition: constants.hpp:52
Definition: protobuf_utils.hpp:251
hashmap< SlaveID, Resources > offeredResources
Definition: master.hpp:2882
protobuf::framework::Capabilities capabilities
Definition: master.hpp:2798
bool hasExecutor(const SlaveID &slaveId, const ExecutorID &executorId)
Definition: master.hpp:2404
std::set< std::string > getRoles(const FrameworkInfo &frameworkInfo)
void deactivateFramework(const process::UPID &from, const FrameworkID &frameworkId)
Try< Nothing > activateRecoveredFramework(Framework *framework, const FrameworkInfo &frameworkInfo, const Option< process::UPID > &pid, const Option< HttpConnection > &http, const std::set< std::string > &suppressedRoles)
Resources filter(const lambda::function< bool(const Resource &)> &predicate) const
v1::AgentID evolve(const SlaveID &slaveId)
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
InverseOffer * getInverseOffer(Master *master, const OfferID &offerId)
Try< Nothing > machines(const google::protobuf::RepeatedPtrField< MachineID > &ids)
Performs the following checks on a list of machines:
void addUnreachableTask(const Task &task)
Definition: master.hpp:2331
process::Future< bool > authorizeTask(const TaskInfo &task, Framework *framework)
void offer(const FrameworkID &frameworkId, const hashmap< std::string, hashmap< SlaveID, Resources >> &resources)
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2583
Definition: resources.hpp:79
Resources totalUsedResources
Definition: master.hpp:2874
void exited(const process::UPID &pid) override
Invoked when a linked process has exited.
Slave * getSlave(Master *master, const SlaveID &slaveId)
void _registerSlave(const process::UPID &pid, RegisterSlaveMessage &&registerSlaveMessage, const Option< process::http::authentication::Principal > &principal, const process::Future< bool > &authorized)
void removeExecutor(const SlaveID &slaveId, const ExecutorID &executorId)
Definition: master.hpp:2442
Option< HttpConnection > http
Definition: master.hpp:2804
bool contains(const Elem &elem) const
Definition: hashset.hpp:102
void unregisterFramework(const process::UPID &from, const FrameworkID &frameworkId)
void update(const FrameworkInfo &newInfo)
Definition: master.hpp:2596
Definition: flags.hpp:42
Definition: registrar.hpp:91
void addFramework(Framework *framework)
Definition: master.hpp:2942
void executorMessage(const process::UPID &from, const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, const std::string &data)
Definition: files.hpp:73
void unregisterSlave(const process::UPID &from, const SlaveID &slaveId)
void addCompletedTask(Task &&task)
Definition: master.hpp:2321
void addOperation(Operation *operation)
Operation
Definition: cgroups.hpp:441
Future< Nothing > redirect(int_fd from, Option< int_fd > to, size_t chunk=4096, const std::vector< lambda::function< void(const std::string &)>> &hooks={})
Redirect output from the 'from' file descriptor to the 'to' file descriptor (or /dev/null if 'to' is ...
void updateConnection(const HttpConnection &newHttp)
Definition: master.hpp:2723
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void __reregisterSlave(const process::UPID &pid, ReregisterSlaveMessage &&incomingMessage, const process::Future< bool > &readmit)
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info)
Definition: master.hpp:2193
Definition: duration.hpp:32
void removeOperation(Operation *operation)
Definition: master.hpp:2573
void killTask(const process::UPID &from, const FrameworkID &frameworkId, const TaskID &taskId)
void resourceRequest(const process::UPID &from, const FrameworkID &frameworkId, const std::vector< Request > &requests)
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
bool isPending() const
Definition: future.hpp:1224
Try< Nothing > update(const SlaveInfo &info, const std::string &_version, const std::vector< SlaveInfo::Capability > &_capabilites, const Resources &_checkpointedResources, const Option< id::UUID > &resourceVersion)
void lostCandidacy(const process::Future< Nothing > &lost)
void heartbeat()
Definition: master.hpp:2764
std::ostream & operator<<(std::ostream &stream, const Slave &slave)
Definition: master.hpp:290
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
void removeInverseOffer(InverseOffer *inverseOffer, bool rescind=false)
void _exited(Framework *framework)
bool isSome() const
Definition: option.hpp:115
Definition: event.hpp:214
process::Future< Nothing > closed() const
Definition: master.hpp:324
Definition: http.hpp:518
Definition: json.hpp:154
void trackUnderRole(const std::string &role)
hashmap< FrameworkID, hashmap< TaskID, Task * > > tasks
Definition: master.hpp:240
void updateConnection(const process::UPID &newPid)
Definition: master.hpp:2711
void removeOffer(Offer *offer)
Definition: master.hpp:2375
void detected(const process::Future< Option< MasterInfo >> &_leader)
multihashmap< FrameworkID, TaskID > killedTasks
Definition: master.hpp:244
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2584
void __registerSlave(const process::UPID &pid, RegisterSlaveMessage &&registerSlaveMessage, const process::Future< bool > &admit)
void registerSlave(const process::UPID &from, RegisterSlaveMessage &&registerSlaveMessage)
void updateUnavailability(const MachineID &machineId, const Option< Unavailability > &unavailability)
Task * getTask(const FrameworkID &frameworkId, const TaskID &taskId) const
Definition: hashmap.hpp:38
void failoverFramework(Framework *framework, const process::UPID &newPid)
process::Future< Nothing > _recover(const Registry &registry)
hashmap< ResourceProviderID, ResourceProviderInfo > resourceProviders
Definition: master.hpp:282
void removeTask(Task *task, bool unreachable=false)
bool contains(const std::string &s, const std::string &substr)
Definition: strings.hpp:406
#define CHECK_SOME(expression)
Definition: check.hpp:44
hashmap< std::string, MessageHandler > message
Definition: process.hpp:443
Definition: master.hpp:299
bool active() const
Definition: master.hpp:2784
void exitedExecutor(const process::UPID &from, const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, int32_t status)
Resources checkpointedResources
Definition: master.hpp:270
SlaveObserver * observer
Definition: master.hpp:279
bool isTerminalState(const TaskState &state)
Definition: owned.hpp:25
Try< Nothing > unavailability(const Unavailability &unavailability)
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
void addOffer(Offer *offer)
BoundedHashMap< TaskID, process::Owned< Task > > unreachableTasks
Definition: master.hpp:2832
Option< Error > validateFrameworkAuthentication(const FrameworkInfo &frameworkInfo, const process::UPID &from)
void markGone(Slave *slave, const TimeInfo &goneTime)
bool recovered() const
Definition: master.hpp:2786
Result< std::vector< Filter< Classifier > > > filters(const std::string &_link, const Handle &parent)
Definition: internal.hpp:776
process::Time registeredTime
Definition: master.hpp:200
bool active
Definition: master.hpp:209
hashmap< id::UUID, Operation * > operations
Definition: master.hpp:2845
Definition: http.hpp:340
An abstraction for contending to be a leading master.
Definition: contender.hpp:40
std::set< std::string > roles
Definition: master.hpp:2796
void reregisterSlave(const process::UPID &from, ReregisterSlaveMessage &&incomingMessage)
process::UPID pid
Definition: master.hpp:192
Definition: uuid.hpp:35
process::Future< Nothing > apply(Slave *slave, const Offer::Operation &operation)
Definition: protobuf_utils.hpp:439
void addSlave(Slave *slave, std::vector< Archive::Framework > &&completedFrameworks)
process::http::Pipe::Writer writer
Definition: master.hpp:329
bool write(std::string s)
Option< process::Time > reregisteredTime
Definition: master.hpp:201
Future< Nothing > readerClosed() const
void apply(const std::vector< ResourceConversion > &conversions)
void consume(process::MessageEvent &&event) override
process::Time reregisteredTime
Definition: master.hpp:2810
Master *const master
Definition: master.hpp:186
friend struct SlavesWriter
Definition: master.hpp:1731
void addOffer(Offer *offer)
Definition: master.hpp:2367
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
void put(const Key &key, const Value &value)
Definition: hashmap.hpp:104
void disconnect(Framework *framework)
Option< Error > quotaInfo(const mesos::quota::QuotaInfo &quotaInfo)
const T & get() const &
Definition: option.hpp:118
void removeExecutor(Slave *slave, const FrameworkID &frameworkId, const ExecutorID &executorId)
const std::string role
Definition: master.hpp:2971
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info, const HttpConnection &_http, const process::Time &time=process::Clock::now())
Definition: master.hpp:2183
void removeOffer(Offer *offer, bool rescind=false)
process::Future< bool > authorizeDestroyVolume(const Offer::Operation::Destroy &destroy, const Option< process::http::authentication::Principal > &principal)
Authorizes a DESTROY operation.
void updateSlaveFrameworks(Slave *slave, const std::vector< FrameworkInfo > &frameworks)
Definition: protobuf.hpp:100
process::Time registeredTime
Definition: master.hpp:2809
process::Future< Nothing > destroy(const std::string &hierarchy, const std::string &cgroup="/")
#define foreachvalue(VALUE, ELEMS)
Definition: foreach.hpp:77
bool isTrackedUnderRole(const std::string &role) const
void removeInverseOffer(InverseOffer *inverseOffer)
Definition: master.hpp:2396
void addExecutor(const FrameworkID &frameworkId, const ExecutorInfo &executorInfo)
Operation * getOperation(const id::UUID &uuid) const
Definition: whitelist_watcher.hpp:37
void removeFramework(Framework *framework)
Definition: master.hpp:2947
MasterInfo info() const
Definition: master.hpp:559
void inverseOfferTimeout(const OfferID &inverseOfferId)
bool isCompletedFramework(const FrameworkID &frameworkId)
hashmap< Option< ResourceProviderID >, id::UUID > resourceVersions
Definition: master.hpp:281
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: time.hpp:23
void offerTimeout(const OfferID &offerId)
void schedulerMessage(const process::UPID &from, const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, const std::string &data)
virtual void initialize() override
Invoked when a process gets spawned.
Definition: master.hpp:358
FrameworkInfo info
Definition: master.hpp:2794
ContentType contentType
Definition: master.hpp:330
void statusUpdateAcknowledgement(const process::UPID &from, const SlaveID &slaveId, const FrameworkID &frameworkId, const TaskID &taskId, const std::string &uuid)
Try< std::vector< Entry > > list(const std::string &hierarchy, const std::string &cgroup)
void initialize() override
Invoked when a process gets spawned.
id::UUID streamId
Definition: master.hpp:331
HttpConnection http
Definition: master.hpp:2008
void send(const Message &message)
Definition: master.hpp:2303
Definition: boundedhashmap.hpp:27
Basic model of an allocator: resources are allocated to a framework in the form of offers...
Definition: allocator.hpp:55
void _markUnreachable(const SlaveInfo &slave, const TimeInfo &unreachableTime, bool duringMasterFailover, const std::string &message, bool registrarResult)
void removeInverseOffer(InverseOffer *inverseOffer)
void agentReregisterTimeout(const SlaveID &slaveId)
void registerFramework(const process::UPID &from, const FrameworkInfo &frameworkInfo)
Offer * getOffer(const OfferID &offerId) const
void send(const process::Shared< mesos::master::Event > &event, const process::Owned< AuthorizationAcceptor > &authorizeRole, const process::Owned< AuthorizationAcceptor > &authorizeFramework, const process::Owned< AuthorizationAcceptor > &authorizeTask, const process::Owned< AuthorizationAcceptor > &authorizeExecutor, const Option< process::Shared< FrameworkInfo >> &frameworkInfo, const Option< process::Shared< Task >> &task)
void recoverResources(Task *task)
Definition: master.hpp:2271
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
#define flags
Definition: decoder.hpp:18
process::Future< bool > markUnreachable(const SlaveInfo &slave, bool duringMasterFailover, const std::string &message)
bool empty() const
Definition: resources.hpp:388
Resources addTask(const TaskInfo &task, Framework *framework, Slave *slave)
void contended(const process::Future< process::Future< Nothing >> &candidacy)
Definition: none.hpp:27
InverseOffer * getInverseOffer(const OfferID &inverseOfferId) const
friend void * schedule(void *)
static Try< UUID > fromBytes(const std::string &s)
Definition: uuid.hpp:49
void updateOperation(Operation *operation, const UpdateOperationStatusMessage &update, bool convertResources=true)
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:2836
void updateTask(Task *task, const StatusUpdate &update)
void removeTask(Task *task, bool unreachable)
Definition: master.hpp:2341
friend struct Slave
Definition: master.hpp:1730
void updateFramework(Framework *framework, const FrameworkInfo &frameworkInfo, const std::set< std::string > &suppressedRoles)
Object protobuf(const google::protobuf::Message &message)
Definition: protobuf.hpp:836
void addInverseOffer(InverseOffer *inverseOffer)
void _failoverFramework(Framework *framework)
const MachineID machineId
Definition: master.hpp:190
void set(const Key &key, const Value &value)
Definition: boundedhashmap.hpp:39
void frameworkFailoverTimeout(const FrameworkID &frameworkId, const process::Time &reregisteredTime)
void ___reregisterSlave(const process::UPID &pid, ReregisterSlaveMessage &&incomingMessage, const process::Future< bool > &updated)
hashmap< OperationID, id::UUID > operationUUIDs
Definition: master.hpp:2849
void reregisterFramework(const process::UPID &from, const FrameworkInfo &frameworkInfo, bool failover)
boost::circular_buffer< process::Owned< Task > > completedTasks
Definition: master.hpp:2826
Definition: master.hpp:117
hashmap< uint16_t, std::string > * statuses
Resources allocatedResources() const
Definition: master.hpp:2952
#define foreachkey(KEY, ELEMS)
Definition: foreach.hpp:74
HttpConnection(const process::http::Pipe::Writer &_writer, ContentType _contentType, id::UUID _streamId)
Definition: master.hpp:301
void recoverResources(Task *task)
constexpr size_t MAX_REMOVED_SLAVES
Definition: constants.hpp:84
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:254
bool close()
Definition: master.hpp:319
void authenticate(const process::UPID &from, const process::UPID &pid)
An abstraction of a Master detector which can be used to detect the leading master from a group...
Definition: detector.hpp:38
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
void addExecutor(const SlaveID &slaveId, const ExecutorInfo &executorInfo)
Definition: master.hpp:2411
process::Future< bool > authorizeUnreserveResources(const Offer::Operation::Unreserve &unreserve, const Option< process::http::authentication::Principal > &principal)
Authorizes an UNRESERVE operation.
void _reconcileTasks(Framework *framework, const std::vector< TaskStatus > &statuses)
Definition: event.hpp:103
void statusUpdate(StatusUpdate update, const process::UPID &pid)
process::Future< bool > authorizeSlave(const SlaveInfo &slaveInfo, const Option< process::http::authentication::Principal > &principal)
void inverseOffer(const FrameworkID &frameworkId, const hashmap< SlaveID, UnavailableResources > &resources)
hashmap< TaskID, TaskInfo > pendingTasks
Definition: master.hpp:2815
process::Future< bool > authorizeFramework(const FrameworkInfo &frameworkInfo)
Heartbeater(const std::string &_logMessage, const Message &_heartbeatMessage, const HttpConnection &_http, const Duration &_interval, const Option< Duration > &_delay=None())
Definition: master.hpp:345
void recoveredSlavesTimeout(const Registry &registry)
virtual void lost(const UPID &)
Invoked when a linked process can no longer be monitored.
Definition: process.hpp:133
Definition: master.hpp:342
bool hasExecutor(const FrameworkID &frameworkId, const ExecutorID &executorId) const
Given an encoding function for individual records, this provides encoding from typed records into "Re...
Definition: recordio.hpp:57
Option< process::Owned< Heartbeater< scheduler::Event, v1::scheduler::Event > > > heartbeater
Definition: master.hpp:2886
Option< process::UPID > pid
Definition: master.hpp:2805
Master(mesos::allocator::Allocator *allocator, Registrar *registrar, Files *files, mesos::master::contender::MasterContender *contender, mesos::master::detector::MasterDetector *detector, const Option< Authorizer * > &authorizer, const Option< std::shared_ptr< process::RateLimiter >> &slaveRemovalLimiter, const Flags &flags=Flags())
void _reregisterSlave(const process::UPID &pid, ReregisterSlaveMessage &&incomingMessage, const Option< process::http::authentication::Principal > &principal, const process::Future< bool > &authorized)
void authenticationTimeout(process::Future< Option< std::string >> future)
Definition: metrics.hpp:38
std::string toString() const
Definition: uuid.hpp:87
bool isSpeculativeOperation(const Offer::Operation &operation)
State
Definition: master.hpp:2153
Offer * getOffer(Master *master, const OfferID &offerId)
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
hashmap< FrameworkID, Framework * > frameworks
Definition: master.hpp:2980
Subscriber & operator=(const Subscriber &)=delete
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
process::Time unregisteredTime
Definition: master.hpp:2811
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
void addOperation(Operation *operation)
Definition: master.hpp:2482
std::string version
Definition: master.hpp:195
URI http(const std::string &host, const std::string &path="/", const Option< int > &port=None(), const Option< std::string > &query=None(), const Option< std::string > &fragment=None(), const Option< std::string > &user=None(), const Option< std::string > &password=None())
Creates an http URI with the given parameters.
Definition: http.hpp:35
hashmap< FrameworkID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:224
void reviveOffers(const process::UPID &from, const FrameworkID &frameworkId, const std::vector< std::string > &role)
hashset< Offer * > offers
Definition: master.hpp:2834
std::string stringify(int flags)
Definition: owned.hpp:35
void fileAttached(const process::Future< Nothing > &result, const std::string &path)
void addTask(Task *task)
Definition: master.hpp:2214
void reconcileTasks(const process::UPID &from, const FrameworkID &frameworkId, const std::vector< TaskStatus > &statuses)
void closeHttpConnection()
Definition: master.hpp:2746
Definition: master.hpp:2151
void untrackUnderRole(const std::string &role)
bool contains(const Resources &that) const
protobuf::slave::Capabilities capabilities
Definition: master.hpp:198
hashmap< SlaveID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:2841
Definition: process.hpp:493
void sendSlaveLost(const SlaveInfo &slaveInfo)
process::Owned< Heartbeater< mesos::master::Event, v1::master::Event > > heartbeater
Definition: master.hpp:2010
hashmap< id::UUID, Operation * > operations
Definition: master.hpp:248
void reconcileKnownSlave(Slave *slave, const std::vector< ExecutorInfo > &executors, const std::vector< Task > &tasks)
bool contains(const Key &key) const
Definition: hashmap.hpp:86
void forward(const StatusUpdate &update, const process::UPID &acknowledgee, Framework *framework)
Nothing _agentReregisterTimeout(const SlaveID &slaveId)
void removeFramework(Framework *framework)
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
const Option< process::http::authentication::Principal > principal
Definition: master.hpp:2011
hashmap< SlaveID, Resources > usedResources
Definition: master.hpp:2878
Resources totalOfferedResources
Definition: master.hpp:2881
Definition: master.hpp:392
Resources offeredResources
Definition: master.hpp:261
hashmap< FrameworkID, hashmap< TaskID, TaskInfo > > pendingTasks
Definition: master.hpp:230
const FrameworkID id() const
Definition: master.hpp:2591
process::Future< bool > authorizeReserveResources(const Offer::Operation::Reserve &reserve, const Option< process::http::authentication::Principal > &principal)
Authorizes a RESERVE operation.
void submitScheduler(const std::string &name)
void removeOperation(Operation *operation)
void deactivate(Framework *framework, bool rescind)
const T & get() const
Definition: try.hpp:73
constexpr const char * name
Definition: shell.hpp:41
hashmap< FrameworkID, Resources > usedResources
Definition: master.hpp:259
void throttled(process::MessageEvent &&event, const Option< std::string > &principal)
void addOperation(Framework *framework, Slave *slave, Operation *operation)
void __removeSlave(Slave *slave, const std::string &message, const Option< TimeInfo > &unreachableTime)
process::Future< bool > authorizeCreateVolume(const Offer::Operation::Create &create, const Option< process::http::authentication::Principal > &principal)
Authorizes a CREATE operation.
void removeOffer(Offer *offer)
void updateOperationStatus(const UpdateOperationStatusMessage &update)
Try< std::vector< Value > > time(const std::string &hierarchy, const std::string &cgroup)
Framework * getFramework(const FrameworkID &frameworkId) const
void addInverseOffer(InverseOffer *inverseOffer)
Definition: master.hpp:2389
State state
Definition: master.hpp:2807
void _authenticate(const process::UPID &pid, const process::Future< Option< std::string >> &future)