24 #ifndef GRAPHLAB_DC_HPP
25 #define GRAPHLAB_DC_HPP
27 #include <boost/iostreams/stream.hpp>
28 #include <boost/function.hpp>
29 #include <graphlab/parallel/pthread_tools.hpp>
30 #include <graphlab/parallel/thread_pool.hpp>
31 #include <graphlab/util/resizing_array_sink.hpp>
32 #include <graphlab/util/blocking_queue.hpp>
33 #include <graphlab/util/dense_bitset.hpp>
34 #include <graphlab/serialization/serialization_includes.hpp>
36 #include <graphlab/rpc/dc_types.hpp>
37 #include <graphlab/rpc/dc_internal_types.hpp>
39 #include <graphlab/rpc/dc_receive.hpp>
40 #include <graphlab/rpc/dc_send.hpp>
41 #include <graphlab/rpc/dc_comm_base.hpp>
42 #include <graphlab/rpc/dc_dist_object_base.hpp>
44 #include <graphlab/rpc/is_rpc_call.hpp>
45 #include <graphlab/rpc/function_call_issue.hpp>
46 #include <graphlab/rpc/function_broadcast_issue.hpp>
47 #include <graphlab/rpc/request_issue.hpp>
48 #include <graphlab/rpc/reply_increment_counter.hpp>
49 #include <graphlab/rpc/function_ret_type.hpp>
50 #include <graphlab/rpc/dc_compile_parameters.hpp>
51 #include <graphlab/util/tracepoint.hpp>
52 #include <graphlab/rpc/distributed_event_log.hpp>
53 #include <boost/preprocessor.hpp>
54 #include <graphlab/rpc/function_arg_types_def.hpp>
124 class dc_buffered_stream_send2;
125 class dc_stream_receive;
202 struct function_call_block{
203 function_call_block() {}
205 function_call_block(
char* data,
size_t len,
206 unsigned char packet_mask):
207 data(data), len(len), packet_mask(packet_mask){}
211 unsigned char packet_mask;
215 void init(
const std::vector<std::string> &machines,
216 const std::string &initstring,
218 size_t numhandlerthreads,
222 dc_impl::dc_comm_base* comm;
225 std::vector<dc_impl::dc_receive*> receivers;
226 std::vector<dc_impl::dc_send*> senders;
230 std::vector<atomic<size_t> > fcall_handler_active;
231 std::vector<mutex> fcall_handler_blockers;
233 struct fcallqueue_entry {
234 std::vector<function_call_block> calls;
237 atomic<size_t>* chunk_ref_counter;
242 std::vector<blocking_queue<fcallqueue_entry*> > fcallqueue;
245 atomic<size_t> fcallqueue_length;
248 std::vector<void*> registered_objects;
249 std::vector<dc_impl::dc_dist_object_base*> registered_rmi_instance;
252 dc_services* distributed_services;
261 std::vector<atomic<size_t> > global_calls_sent;
262 std::vector<atomic<size_t> > global_calls_received;
264 std::vector<atomic<size_t> > global_bytes_received;
266 std::vector<boost::function<void(void)> > deletion_callbacks;
269 friend class dc_impl::dc_stream_receive;
270 friend class dc_impl::dc_buffered_stream_send2;
276 std::map<std::string, std::string> parse_options(std::string initstring);
278 volatile inline size_t num_registered_objects() {
279 return registered_objects.size();
284 DECLARE_TRACER(dc_receive_queuing);
285 DECLARE_TRACER(dc_receive_multiplexing);
286 DECLARE_TRACER(dc_call_dispatch);
288 DECLARE_EVENT(EVENT_NETWORK_BYTES);
289 DECLARE_EVENT(EVENT_RPC_CALLS);
317 return localnumprocs;
328 deletion_callbacks.push_back(deleter);
// Boost.PP per-index helpers used by the interface generators in this class.
// Each accepts the (z, n, data) triple that BOOST_PP_ENUM / BOOST_PP_REPEAT
// hand to their macro argument; only the index `n` is used.
//   GENARGS(z, n, _)  ->  const Tn &in   (n-th parameter declaration)
//   GENI(z, n, _)     ->  in             (n-th argument name)
//   GENT(z, n, _)     ->  Tn             (n-th template type name)
//   GENARC(z, n, _)   ->  arc << in;     (streams the n-th argument into `arc`)
#define GENARGS(z, n, unused) BOOST_PP_CAT(const T, n) BOOST_PP_CAT(&i, n)
#define GENI(z, n, unused) BOOST_PP_CAT(i, n)
#define GENT(z, n, unused) BOOST_PP_CAT(T, n)
#define GENARC(z, n, unused) arc << BOOST_PP_CAT(i, n);
413 #define RPC_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
414 template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
415 void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
416 ASSERT_LT(target, senders.size()); \
417 BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
418 <F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
419 ::exec(senders[target], BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
// Stamp out the point-to-point call interface. BOOST_PP_REPEAT(6, ...) invokes
// RPC_INTERFACE_GENERATOR for N = 0..5, so each name below gets overloads
// taking 0 to 5 extra arguments. Each tuple is
//   (member function name, issue-class prefix, packet flags);
// the generated template member is
//   name(procid_t target, F remote_function, const T0 &i0, ...)
// which ASSERT_LTs target < senders.size() and then forwards to
//   <prefix>N<F, T0, ...>::exec(senders[target], flags, target, remote_function, i0, ...).
// remote_call: plain call, flags = STANDARD_CALL.
425 BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (
remote_call, dc_impl::remote_call_issue, STANDARD_CALL) )
// reply_remote_call: same path, additionally tagged REPLY_PACKET.
426 BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (reply_remote_call,dc_impl::remote_call_issue, STANDARD_CALL | REPLY_PACKET) )
// control_call: same path, additionally tagged CONTROL_PACKET.
427 BOOST_PP_REPEAT(6, RPC_INTERFACE_GENERATOR, (control_call, dc_impl::remote_call_issue, (STANDARD_CALL | CONTROL_PACKET)) )
430 #define BROADCAST_INTERFACE_GENERATOR(Z,N,FNAME_AND_CALL) \
431 template<typename Iterator, typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
432 void BOOST_PP_TUPLE_ELEM(3,0,FNAME_AND_CALL) (Iterator target_begin, Iterator target_end, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
433 if (target_begin == target_end) return; \
434 BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,FNAME_AND_CALL),N) \
435 <Iterator, F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
436 ::exec(senders, BOOST_PP_TUPLE_ELEM(3,2,FNAME_AND_CALL), target_begin, target_end, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
// Stamp out the multi-target overloads of remote_call for N = 0..5 extra
// arguments. Each generated template member is
//   remote_call(Iterator target_begin, Iterator target_end, F remote_function, const T0 &i0, ...)
// It returns immediately when target_begin == target_end. Otherwise it forwards
// the whole range, together with the full `senders` vector and STANDARD_CALL, to
//   dc_impl::remote_broadcast_issueN<Iterator, F, T0, ...>::exec(...).
439 BOOST_PP_REPEAT(6, BROADCAST_INTERFACE_GENERATOR, (
remote_call, dc_impl::remote_broadcast_issue, STANDARD_CALL) )
441 #define REQUEST_INTERFACE_GENERATOR(Z,N,ARGS) \
442 template<typename F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, typename T)> \
443 BOOST_PP_TUPLE_ELEM(3,0,ARGS) (procid_t target, F remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENARGS ,_) ) { \
444 ASSERT_LT(target, senders.size()); \
445 return BOOST_PP_CAT( BOOST_PP_TUPLE_ELEM(3,1,ARGS),N) \
446 <F BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM_PARAMS(N, T)> \
447 ::exec(senders[target], BOOST_PP_TUPLE_ELEM(3,2,ARGS), target, remote_function BOOST_PP_COMMA_IF(N) BOOST_PP_ENUM(N,GENI ,_) ); \
// Stamp out the blocking request interface for N = 0..5 extra arguments.
// Here the first tuple element is "return-type name": the generator pastes it
// directly in front of the parameter list. __GLRPC_FRESULT is not defined in
// this view -- presumably it names the result type of invoking F (likely from
// function_arg_types_def.hpp; NOTE(review): confirm).
// Each generated member ASSERT_LTs target < senders.size() and returns
//   dc_impl::remote_request_issueN<F, T0, ...>::exec(senders[target], flags, target, remote_function, i0, ...).
// remote_request: flags = STANDARD_CALL | WAIT_FOR_REPLY.
453 BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (
typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type
remote_request, dc_impl::remote_request_issue, STANDARD_CALL | WAIT_FOR_REPLY) )
// control_request: as remote_request, additionally tagged CONTROL_PACKET.
454 BOOST_PP_REPEAT(6, REQUEST_INTERFACE_GENERATOR, (typename dc_impl::function_ret_type<__GLRPC_FRESULT>::type control_request, dc_impl::remote_request_issue, (STANDARD_CALL | WAIT_FOR_REPLY | CONTROL_PACKET)) )
// The generator macros are only needed to produce the overloads above.
458 #undef RPC_INTERFACE_GENERATOR
459 #undef BROADCAST_INTERFACE_GENERATOR
460 #undef REQUEST_INTERFACE_GENERATOR
471 #if DOXYGEN_DOCUMENTATION
571 void remote_call(Iterator machine_begin, Iterator machine_end, Fn fn, ...);
// Internal call-handling helpers. Only the declarations are visible here; the
// notes below are inferred from names and parameter types and must be
// confirmed against the implementation file.
// NOTE(review): presumably executes one incoming call whose `len` bytes start
// at `data`, received from `source`, with `packet_type_mask` carrying the
// packet flags (STANDARD_CALL / CONTROL_PACKET / ...) -- confirm.
627 void exec_function_call(
procid_t source,
unsigned char packet_type_mask,
const char* data,
const size_t len);
// Takes one fcallqueue_entry (a vector of function_call_block records plus a
// shared chunk_ref_counter). NOTE(review): presumably runs each call in it --
// confirm who releases the underlying buffer.
635 void process_fcall_block(fcallqueue_entry &fcallblock);
// NOTE(review): presumably hands a received buffer of `len` bytes from `src`
// off to the handler threads instead of running it inline; document
// ownership of `buf` once confirmed.
643 void deferred_function_call_chunk(
char* buf,
size_t len,
procid_t src);
// NOTE(review): presumably the main loop of handler thread `id` (the class
// holds one blocking_queue per index in `fcallqueue`) -- confirm.
649 void fcallhandler_loop(
size_t id);
// The next four helpers take a (threadid, total_threadid) pair; its exact
// meaning is defined in the implementation. NOTE(review): document once confirmed.
661 void stop_handler_threads(
size_t threadid,
size_t total_threadid);
// Non-blocking variant of stop_handler_threads (by name; confirm).
672 void stop_handler_threads_no_wait(
size_t threadid,
size_t total_threadid);
683 void handle_incoming_calls(
size_t threadid,
size_t total_threadid);
693 void start_handler_threads(
size_t threadid,
size_t total_threadid);
696 size_t recv_queue_length()
const {
697 return fcallqueue_length.value;
700 size_t send_queue_length()
const {
701 return comm->send_queue_length();
709 global_calls_sent[
procid].inc();
712 inline void inc_calls_received(
procid_t procid) {
714 if (!full_barrier_in_effect) {
715 size_t t = global_calls_received[
procid].inc();
716 if (full_barrier_in_effect) {
717 if (t == calls_to_receive[procid]) {
719 if (procs_complete.
set_bit(procid) ==
false) {
723 full_barrier_lock.
lock();
724 if (num_proc_recvs_incomplete.dec() == 0) {
725 full_barrier_cond.
signal();
727 full_barrier_lock.
unlock();
736 if (global_calls_received[procid].inc() == calls_to_receive[procid]) {
738 if (procs_complete.
set_bit(procid) ==
false) {
742 full_barrier_lock.
lock();
743 if (num_proc_recvs_incomplete.dec() == 0) {
744 full_barrier_cond.
signal();
746 full_barrier_lock.
unlock();
756 for (
size_t i = 0;i <
numprocs(); ++i) {
757 ctr += global_calls_sent[i].value;
765 for (
size_t i = 0;i <
numprocs(); ++i) {
766 ctr += global_calls_sent[i].value;
768 return double(ctr)/(1024 * 1024);
776 for (
size_t i = 0;i <
numprocs(); ++i) {
777 ctr += global_calls_received[i].value;
787 for (
size_t i = 0;i < senders.size(); ++i) ret += senders[i]->
bytes_sent();
795 return comm->network_bytes_sent();
802 return double(comm->network_bytes_sent()) / (1024 * 1024);
812 for (
size_t i = 0;i < global_bytes_received.size(); ++i) {
813 ret += global_bytes_received[i].value;
821 inline size_t register_object(
void* v, dc_impl::dc_dist_object_base *rmiinstance) {
822 ASSERT_NE(v, (
void*)NULL);
823 registered_objects.push_back(v);
824 registered_rmi_instance.push_back(rmiinstance);
825 return registered_objects.size() - 1;
829 inline void* get_registered_object(
size_t id) {
830 while(__builtin_expect((
id >= num_registered_objects()), 0)) sched_yield();
831 while (__builtin_expect(registered_objects[
id] == NULL, 0)) sched_yield();
832 return registered_objects[id];
836 inline dc_impl::dc_dist_object_base* get_rmi_instance(
size_t id) {
837 while(
id >= num_registered_objects()) sched_yield();
838 ASSERT_NE(registered_rmi_instance[
id], (
void*)NULL);
839 return registered_rmi_instance[id];
843 inline void clear_registered_object(
size_t id) {
844 registered_objects[id] = (
void*)NULL;
845 registered_rmi_instance[id] = NULL;
895 template <
typename U>
936 template <
typename U>
984 template <
typename U>
985 inline void broadcast(U& data,
bool originator,
bool control =
false);
1030 template <
typename U>
1031 inline void gather(std::vector<U>& data,
procid_t sendto,
bool control =
false);
1067 template <
typename U>
1068 inline void all_gather(std::vector<U>& data,
bool control =
false);
1095 template <
typename U>
1096 inline void all_reduce(U& data,
bool control =
false);
1138 template <
typename U,
typename PlusEqual>
1139 inline void all_reduce2(U& data, PlusEqual plusequal,
bool control =
false);
1187 else return nullstrm;
1195 else return nullstrm;
1199 mutex full_barrier_lock;
1201 std::vector<size_t> calls_to_receive;
1205 volatile bool full_barrier_in_effect;
1209 atomic<size_t> num_proc_recvs_incomplete;
1214 mutable boost::iostreams::stream<boost::iostreams::null_sink> nullstrm;
1221 struct collected_statistics {
1224 size_t network_bytessent;
1225 collected_statistics(): callssent(0), bytessent(0), network_bytessent(0) { }
1226 void save(oarchive &oarc)
const {
1227 oarc << callssent << bytessent << network_bytessent;
1229 void load(iarchive &iarc) {
1230 iarc >> callssent >> bytessent >> network_bytessent;
// REGISTER_RPC(dc, f): expands to dc.register_rpc<F*, f>(name), where F is the
// type of the function `f` and `name` is std::string of the literal source
// spelling of `f` (via BOOST_PP_STRINGIZE).
//  - `dc` is parenthesized so expression arguments such as `*dc_ptr` expand to
//    (*dc_ptr).register_rpc<...>(...) rather than *(dc_ptr.register_rpc<...>(...)).
//  - __typeof__ is the GCC/Clang alternate spelling of `typeof`. It behaves
//    identically in GNU modes but, unlike the plain keyword, is also accepted
//    under strict ISO modes (-std=c++NN). This header already depends on GCC
//    extensions such as __builtin_expect.
#define REGISTER_RPC(dc, f) (dc).register_rpc<__typeof__(f)*, f>(std::string(BOOST_PP_STRINGIZE(f)))
1247 #include <graphlab/rpc/function_arg_types_undef.hpp>
1248 #include <graphlab/rpc/function_call_dispatch.hpp>
1249 #include <graphlab/rpc/request_dispatch.hpp>
1250 #include <graphlab/rpc/dc_dist_object.hpp>
1251 #include <graphlab/rpc/dc_services.hpp>
1253 namespace graphlab {
1255 template <
typename U>
1257 distributed_services->send_to(target, t, control);
1260 template <
typename U>
1262 distributed_services->recv_from(source, t, control);
1265 template <
typename U>
1267 distributed_services->broadcast(data, originator, control);
1270 template <
typename U>
1272 distributed_services->gather(data, sendto, control);
1275 template <
typename U>
1277 distributed_services->all_gather(data, control);
1280 template <
typename U>
1282 distributed_services->all_reduce(data, control);
1286 template <
typename U,
typename PlusEqual>
1288 distributed_services->all_reduce2(data, plusequal, control);
1294 extern procid_t get_last_dc_procid();
1301 #include <graphlab/util/mpi_tools.hpp>