24 #include <sys/socket.h>
25 #include <netinet/in.h>
26 #include <arpa/inet.h>
31 #include <netinet/tcp.h>
35 #include <event2/event.h>
36 #include <event2/thread.h>
43 #include <boost/lexical_cast.hpp>
44 #include <boost/bind.hpp>
46 #include <graphlab/rpc/dc_tcp_comm.hpp>
47 #include <graphlab/rpc/dc_internal_types.hpp>
49 #define compile_barrier() asm volatile("": : :"memory")
51 #include <graphlab/macros_def.hpp>
// Initializes TCP communication among machines.size() processes.
//
// Each entry of `machines` is parsed as "host:port". The host is resolved with
// gethostbyname(), and the address must be IPv4 (h_length == 4). The port is
// range-checked and stored in portnums.
// Every socket_info in `sock` is reset: no libevent events, not blocked, and
// an empty msghdr.
// If initopts has a "__sockhandle__" entry, its value is converted with
// atoi() and passed to open_listening().
// connect() is then called for every process. The function then waits on
// insock_cond until every sock[i].insock has been filled in (!= -1).
// Finally receive_loop(inevbase) is launched on inthreads and
// send_loop(outevbase) on outthreads.
//
// @param machines  one "host:port" string per process.
// @param initopts  option map; only "__sockhandle__" is read here.
// @param receiver_ per-process receive handlers. NOTE(review): not referenced
//                  in the statements shown; presumably stored into `receiver`
//                  (which on_receive_event indexes by peer id) — confirm.
// @param sender_   per-process send handlers. NOTE(review): likewise,
//                  presumably stored into `sender` — confirm.
58 void dc_tcp_comm::init(
const std::vector<std::string> &machines,
59 const std::map<std::string,std::string> &initopts,
61 std::vector<dc_receive*> receiver_,
62 std::vector<dc_send*> sender_) {
// procid_t must be able to represent every machine index.
65 ASSERT_LT(machines.size(), std::numeric_limits<procid_t>::max());
66 nprocs = (
procid_t)(machines.size());
71 all_addrs.resize(nprocs);
72 portnums.resize(nprocs);
// triggered_timeouts must already have at least nprocs bits; here its bits are only cleared.
73 assert(triggered_timeouts.size() >= nprocs);
74 triggered_timeouts.clear();
// Reset per-socket state: no events yet, not blocked, and an empty msghdr.
// NOTE(review): the msghdr is presumably populated later by outvec.fill_msghdr().
77 for (
size_t i = 0;i < nprocs; ++i) {
82 sock[i].inevent = NULL;
83 sock[i].outevent = NULL;
84 sock[i].wouldblock =
false;
85 sock[i].data.msg_name = NULL;
86 sock[i].data.msg_namelen = 0;
87 sock[i].data.msg_control = NULL;
88 sock[i].data.msg_controllen = 0;
89 sock[i].data.msg_flags = 0;
90 sock[i].data.msg_iovlen = 0;
91 sock[i].data.msg_iov = NULL;
// Parse "host:port" for each machine.
94 for (
size_t i = 0;i < machines.size(); ++i) {
96 size_t pos = machines[i].find(
":");
97 ASSERT_NE(pos, std::string::npos);
98 std::string address = machines[i].substr(0, pos);
99 size_t port = boost::lexical_cast<
size_t>(machines[i].substr(pos+1));
// NOTE(review): gethostbyname() returns NULL when resolution fails, and the
// next statement dereferences `ent` without a NULL check. The function is also
// not required to be thread-safe. Consider getaddrinfo() with explicit error reporting.
101 struct hostent* ent = gethostbyname(address.c_str());
102 ASSERT_EQ(ent->h_length, 4);
// First resolved IPv4 address (h_addr_list entries are in network byte order).
103 uint32_t addr = *
reinterpret_cast<uint32_t*
>(ent->h_addr_list[0]);
// The port must fit in 16 bits before narrowing to uint16_t.
106 ASSERT_LT(port, 65536);
107 portnums[i] = (uint16_t)(port);
109 network_bytessent = 0;
// An externally supplied listening socket handle, if one was given.
112 std::map<std::string, std::string>::const_iterator iter =
113 initopts.find(
"__sockhandle__");
114 if (iter != initopts.end()) {
115 open_listening(atoi(iter->second.c_str()));
// Open an outgoing connection to every process.
119 for(
size_t i = 0;i < nprocs; ++i) connect(i);
// Wait until every incoming socket has been registered. new_socket() sets
// sock[id].insock and signals insock_cond.
// prevconnected starts at SIZE_MAX (-1 converted to size_t), so the first
// pass always logs progress.
122 size_t prevconnected = -1;
124 size_t connected = 0;
125 for (
size_t i = 0;i < sock.size(); ++i) {
126 connected += (sock[i].insock != -1);
128 if (connected == sock.size()) {
131 if (prevconnected != connected) {
132 logstream(
LOG_INFO) << curmachineid <<
": Waiting for " << sock.size() - connected
133 <<
" more hosts..." << std::endl;
135 prevconnected = connected;
136 insock_cond.wait(insock_lock);
138 insock_lock.unlock();
// NOTE(review): the second launch() argument is cpu_count() - 2 / - 1.
// If cpu_count() returns an unsigned value smaller than 2, this wraps around.
// Confirm the intended meaning (presumably a CPU affinity index).
144 inthreads.launch(boost::bind(&dc_tcp_comm::receive_loop,
this, inevbase),
thread::cpu_count() - 2);
145 outthreads.launch(boost::bind(&dc_tcp_comm::send_loop,
this, outevbase),
thread::cpu_count() - 1);
// Creates the libevent state used by the send and receive threads.
//
// outevbase carries:
//  * send_all_event: a persistent timeout firing every 10 ms, calling
//    on_send_event with send_all_timeout (service every socket);
//  * send_triggered_event: persistent, calling on_send_event with
//    send_triggered_timeout; it is fired manually with event_active()
//    from trigger_send_timeout();
//  * one edge-triggered persistent EV_WRITE event per outsock (on_send_event).
// inevbase carries one edge-triggered persistent EV_READ event per insock
// (on_receive_event).
// Failures to create the bases or the per-socket events are logged as LOG_FATAL.
149 void dc_tcp_comm::construct_events() {
// Enable libevent's pthread locking before any event_base is created.
150 int ret = evthread_use_pthreads();
151 if (ret < 0) logstream(
LOG_FATAL) <<
"Unable to initialize libevent with pthread support!" << std::endl;
153 outevbase = event_base_new();
154 if (!outevbase) logstream(
LOG_FATAL) <<
"Unable to construct libevent base" << std::endl;
// Timeout callbacks receive a timeout_event. In on_send_event, send_all
// chooses between "every socket" and "only sockets flagged in triggered_timeouts".
155 send_all_timeout.owner =
this;
156 send_all_timeout.send_all =
true;
157 send_triggered_timeout.owner =
this;
158 send_triggered_timeout.send_all =
false;
159 send_all_event = event_new(outevbase, -1, EV_TIMEOUT | EV_PERSIST, on_send_event, &(send_all_timeout));
// NOTE(review): assert() is compiled out under NDEBUG, so these NULL checks
// vanish in release builds. The per-socket events below use LOG_FATAL instead.
160 assert(send_all_event != NULL);
// {0 s, 10000 us} = a 10 ms period for the send-all sweep.
161 struct timeval t = {0, 10000};
162 event_add(send_all_event, &t);
163 send_triggered_event = event_new(outevbase, -1, EV_TIMEOUT | EV_PERSIST, on_send_event, &(send_triggered_timeout));
164 assert(send_triggered_event != NULL);
166 inevbase = event_base_new();
167 if (!inevbase) logstream(
LOG_FATAL) <<
"Unable to construct libevent base" << std::endl;
// Per-socket events. EV_ET makes them edge-triggered, so each callback must
// keep reading or writing until the socket reports EAGAIN.
171 for (
size_t i = 0;i < sock.size(); ++i) {
172 sock[i].inevent = event_new(inevbase, sock[i].insock, EV_READ | EV_PERSIST | EV_ET,
173 on_receive_event, &(sock[i]));
174 if (sock[i].inevent == NULL) {
175 logstream(
LOG_FATAL) <<
"Unable to register socket read event" << std::endl;
178 sock[i].outevent = event_new(outevbase, sock[i].outsock, EV_WRITE | EV_PERSIST | EV_ET,
179 on_send_event, &(sock[i]));
180 if (sock[i].outevent == NULL) {
181 logstream(
LOG_FATAL) <<
"Unable to register socket write event" << std::endl;
// Added without a timeout: these fire only on socket readiness.
184 event_add(sock[i].inevent, NULL);
186 event_add(sock[i].outevent, NULL);
// Asks the send side to flush pending data for process `target`.
//
// Deferred path: if the socket is not blocked and `target` is not already
// queued, its bit is set in triggered_timeouts and send_triggered_event is
// activated. on_send_event() then calls process_sock() for it on the thread
// dispatching outevbase (the send loop).
// Direct path: process_sock() runs on the calling thread.
// NOTE(review): presumably `urgent` selects the direct path — confirm the
// branch structure.
//
// @param target  process whose socket should be flushed.
// @param urgent  presumably true to flush immediately on the caller's thread.
190 void dc_tcp_comm::trigger_send_timeout(
procid_t target,
bool urgent) {
192 if (sock[target].wouldblock ==
false &&
193 triggered_timeouts.get(target) ==
false) {
194 triggered_timeouts.set_bit(target);
195 event_active(send_triggered_event, EV_TIMEOUT, 1);
199 process_sock(&(sock[target]));
// Shuts down the communication layer. Does nothing once is_closed is set.
// Order of shutdown:
//  1. the listening socket;
//  2. the outgoing event loop (loopbreak, then free the socket and timeout
//     events and the base);
//  3. the outgoing sockets;
//  4. the incoming event loop;
//  5. the incoming sockets.
// NOTE(review): event_base_loopbreak() only asks the loop to stop. Confirm the
// send/receive threads have actually exited (e.g. joined) before their events
// and bases are freed.
// NOTE(review): the `> 0` descriptor checks skip fd 0, while -1 is the "unset"
// value used elsewhere (connect(), new_socket()). `>= 0` or `!= -1` would be
// consistent.
204 void dc_tcp_comm::close() {
205 if (is_closed)
return;
206 logstream(
LOG_INFO) <<
"Closing listening socket" << std::endl;
208 if (listensock > 0) {
216 event_base_loopbreak(outevbase);
218 for (
size_t i = 0;i < sock.size(); ++i) {
219 event_free(sock[i].outevent);
221 event_free(send_triggered_event);
222 event_free(send_all_event);
223 event_base_free(outevbase);
226 logstream(
LOG_INFO) <<
"Closing outgoing sockets" << std::endl;
228 for (
size_t i = 0;i < sock.size(); ++i) {
229 if (sock[i].outsock > 0) {
230 ::close(sock[i].outsock);
231 sock[i].outsock = -1;
236 event_base_loopbreak(inevbase);
238 for (
size_t i = 0;i < sock.size(); ++i) {
239 event_free(sock[i].inevent);
241 event_base_free(inevbase);
244 logstream(
LOG_INFO) <<
"Closing incoming sockets" << std::endl;
246 for (
size_t i = 0;i < sock.size(); ++i) {
247 if (sock[i].insock > 0) {
248 ::close(sock[i].insock);
// Writes as much of sockinfo.outvec as the non-blocking outsock will accept.
// - wouldblock is cleared on entry.
// - wouldblock is set when sendmsg() fails with EWOULDBLOCK/EAGAIN; the
//   EV_WRITE callback (on_send_event) later clears it and retries.
// - Any other send error is logged as LOG_FATAL.
// - Each successful send is added to network_bytessent and consumed from
//   outvec via sent(ret).
// NOTE(review): the meaning of the bool result is not evident from these
// statements — document it once confirmed.
256 bool dc_tcp_comm::send_till_block(socket_info& sockinfo) {
257 sockinfo.wouldblock =
false;
259 BEGIN_TRACEPOINT(tcp_send_call);
260 while(!sockinfo.outvec.empty()) {
// Presumably points sockinfo.data's iovec at outvec's pending buffers — confirm.
261 sockinfo.outvec.fill_msghdr(sockinfo.data);
262 ssize_t ret = sendmsg(sockinfo.outsock, &sockinfo.data, 0);
264 END_TRACEPOINT(tcp_send_call);
// errno is only meaningful when sendmsg() returned -1.
265 if (errno == EWOULDBLOCK || errno == EAGAIN) {
266 sockinfo.wouldblock =
true;
270 logstream(
LOG_FATAL) <<
"send error: " << strerror(errno) << std::endl;
276 logstream(
LOG_INFO) << ret <<
" bytes --> " << sockinfo.id << std::endl;
278 network_bytessent.inc(ret);
279 sockinfo.outvec.sent(ret);
281 END_TRACEPOINT(tcp_send_call);
// Sends all `len` bytes of `buf` on `sockfd`, calling ::send() repeatedly until
// numsent reaches len. connect() uses it for the handshake before the socket
// is switched to non-blocking mode. Send errors are logged at LOG_ERROR.
// NOTE(review): the int result is presumably 0 on success and non-zero on
// error — confirm. The call in connect() ignores it.
285 int dc_tcp_comm::sendtosock(
int sockfd,
const char* buf,
size_t len) {
287 BEGIN_TRACEPOINT(tcp_send_call);
288 while (numsent < len) {
289 ssize_t ret = ::send(sockfd, buf + numsent, len - numsent, 0);
291 logstream(
LOG_ERROR) <<
"send error: " << strerror(errno) << std::endl;
292 END_TRACEPOINT(tcp_send_call);
297 END_TRACEPOINT(tcp_send_call);
// Disables Nagle's algorithm on `fd` via setsockopt(), presumably with
// IPPROTO_TCP / TCP_NODELAY from <netinet/tcp.h> — confirm.
// If the call fails, a warning about reduced performance is logged.
// NOTE(review): the logged message misspells "significantly" ("signifantly").
301 void dc_tcp_comm::set_tcp_no_delay(
int fd) {
303 int result = setsockopt(fd,
310 <<
"Unable to disable Nagle. Performance may be signifantly reduced"
// Puts `fd` into non-blocking mode. The status flags are read with F_GETFL and
// written back with F_SETFL, presumably after OR-ing in O_NONBLOCK — confirm.
// Either fcntl() failure is logged as LOG_FATAL.
316 void dc_tcp_comm::set_non_blocking(
int fd) {
317 int flag = fcntl(fd, F_GETFL);
319 logstream(
LOG_FATAL) <<
"Unable to get socket flags" << std::endl;
322 if (fcntl(fd, F_SETFL, flag) < 0) {
323 logstream(
LOG_FATAL) <<
"Unable to set socket as non-blocking" << std::endl;
// Records an accepted incoming connection from process `id`.
// It checks three things:
//  * `id` is in range;
//  * the peer's IPv4 address equals all_addrs[id];
//  * no incoming socket is registered for `id` yet.
// It then stores newsock in sock[id].insock and signals insock_cond, so that
// the wait loop in init() sees the new connection.
// NOTE(review): insock_lock is released here; the matching lock() is expected
// earlier in this function — confirm.
329 void dc_tcp_comm::new_socket(
int newsock, sockaddr_in* otheraddr,
332 uint32_t addr = *
reinterpret_cast<uint32_t*
>(&(otheraddr->sin_addr));
334 logstream(
LOG_INFO) <<
"Incoming connection from "
335 << inet_ntoa(otheraddr->sin_addr) << std::endl;
336 ASSERT_LT(
id, all_addrs.size());
337 ASSERT_EQ(all_addrs[
id], addr);
// Each peer may register its incoming socket only once.
339 ASSERT_EQ(sock[
id].insock, -1);
340 sock[id].insock = newsock;
341 insock_cond.signal();
342 insock_lock.unlock();
343 logstream(
LOG_INFO) <<
"Proc " << procid() <<
" accepted connection "
344 <<
"from machine " <<
id << std::endl;
// Prepares the listening socket and starts the accept thread.
//   sockhandle == 0: create a TCP socket and bind it to INADDR_ANY on
//                    portnums[curid]. A bind failure is logged as LOG_FATAL.
//   otherwise:       use sockhandle as an already-created socket.
// The socket is then put into listen mode (backlog 128, asserted to succeed),
// and accept_handler() is launched on listenthread.
// NOTE(review): 0 is the "create a new socket" sentinel, so file descriptor 0
// itself cannot be passed in.
349 void dc_tcp_comm::open_listening(
int sockhandle) {
351 if (sockhandle == 0) {
352 listensock = socket(AF_INET, SOCK_STREAM, 0);
355 my_addr.sin_family = AF_INET;
356 my_addr.sin_port = htons(portnums[curid]);
357 my_addr.sin_addr.s_addr = INADDR_ANY;
358 memset(&(my_addr.sin_zero),
'\0', 8);
359 logstream(
LOG_INFO) <<
"Proc " << procid() <<
" Bind on "
360 << portnums[curid] <<
"\n";
361 if (bind(listensock, (sockaddr*)&my_addr,
sizeof(my_addr)) < 0)
363 logstream(
LOG_FATAL) <<
"bind: " << strerror(errno) <<
"\n";
368 listensock = sockhandle;
370 logstream(
LOG_INFO) <<
"Proc " << procid()
371 <<
" listening on " << portnums[curid] <<
"\n";
372 ASSERT_EQ(0, listen(listensock, 128));
374 listenthread.launch(boost::bind(&dc_tcp_comm::accept_handler,
this));
// Opens the outgoing connection to process `target`, unless
// sock[target].outsock is already set.
// - Nagle is disabled on the socket.
// - The socket connects to all_addrs[target]:portnums[target], with up to 10
//   attempts. After a failed attempt a fresh socket is created.
// - Once connected, this process's id (curid) is sent as raw bytes in host
//   byte order. This is the handshake accept_handler() reads on the peer.
//   The socket is then switched to non-blocking mode.
// - If no attempt succeeds, a fatal error is logged.
// NOTE(review): confirm the failed socket is closed before `newsock` is
// reassigned; otherwise every retry leaks a descriptor.
377 void dc_tcp_comm::connect(
size_t target) {
378 if (sock[target].outsock != -1) {
381 int newsock = socket(AF_INET, SOCK_STREAM, 0);
382 set_tcp_no_delay(newsock);
383 sockaddr_in serv_addr;
384 serv_addr.sin_family = AF_INET;
386 serv_addr.sin_port = htons(portnums[target]);
388 serv_addr.sin_addr = *(
struct in_addr*)&(all_addrs[target]);
389 memset(&(serv_addr.sin_zero),
'\0', 8);
391 logstream(
LOG_INFO) <<
"Trying to connect from "
392 << curid <<
" -> " << target
393 <<
" on port " << portnums[target] <<
"\n";
394 logger(
LOG_INFO,
"Destination IP = %s", inet_ntoa(serv_addr.sin_addr));
396 bool success =
false;
// Bounded retry: at most 10 connect attempts.
397 for (
size_t i = 0;i < 10; ++i) {
398 if (::connect(newsock, (sockaddr*)&serv_addr,
sizeof(serv_addr)) < 0) {
400 <<
"connect " << curid <<
" to " << target <<
": "
401 << strerror(errno) <<
". Retrying...\n";
408 newsock = socket(AF_INET, SOCK_STREAM, 0);
409 set_tcp_no_delay(newsock);
// Handshake: tell the peer which process this connection comes from.
412 sendtosock(newsock, reinterpret_cast<char*>(&curid),
sizeof(curid));
413 set_non_blocking(newsock);
419 logstream(
LOG_FATAL) <<
"Failed to establish connection" << std::endl;
422 sock[target].outsock = newsock;
423 logstream(
LOG_INFO) <<
"connection from " << curid <<
" to " << target
424 <<
" established." << std::endl;
// Body of listenthread (see open_listening()). It accepts incoming connections
// until one has been registered for every entry of `sock`.
// Each time the listening socket is readable (pf.revents & POLLIN), it:
//  1. accepts the connection;
//  2. disables Nagle;
//  3. reads the remote machine id sent by the peer's connect() handshake,
//     accumulating partial reads via msglen;
//  4. makes the socket non-blocking and passes it to new_socket().
// listensock == -1 is also checked, presumably to leave the loop early on
// shutdown — confirm.
437 void dc_tcp_comm::accept_handler() {
442 size_t numsocks_connected = 0;
443 logstream(
LOG_INFO) <<
"Listening thread launched." << std::endl;
444 while(numsocks_connected < sock.size()) {
448 if (pf.revents & POLLIN) {
449 logstream(
LOG_INFO) <<
"Accepting...." << std::endl;
451 sockaddr_in their_addr;
452 socklen_t namelen =
sizeof(sockaddr_in);
453 int newsock = accept(listensock, (sockaddr*)&their_addr, &namelen);
454 logstream(
LOG_INFO) <<
"Accepted" << std::endl;
459 set_tcp_no_delay(newsock);
// Read the handshake (remote machine id); bytes already received are skipped via msglen.
464 int retval = recv(newsock, (
char*)(&remotemachineid) + msglen,
467 if (errno == EWOULDBLOCK || errno == EAGAIN) {
471 logstream(
LOG_FATAL) <<
"error: " << errno <<
" receive error: " << strerror(errno) << std::endl;
474 else if (retval > 0) {
477 else if (retval == 0) {
// NOTE(review): this error goes to std::cout instead of logstream like the rest of the file.
478 std::cout <<
"error: connection dropped." << std::endl;
486 set_non_blocking(newsock);
487 new_socket(newsock, &their_addr, remotemachineid);
488 ++numsocks_connected;
491 if (listensock == -1) {
496 logstream(
LOG_INFO) <<
"Listening thread quitting" << std::endl;
// libevent read callback for one incoming socket. `arg` is the socket_info
// registered in construct_events().
// The event is edge-triggered, so the socket is read repeatedly until recv()
// reports EAGAIN/EWOULDBLOCK. Each chunk is:
//  * received into a buffer from the peer's dc_receive (receiver[sockinfo->id]);
//  * counted in network_bytesreceived;
//  * handed to advance_buffer(), whose result becomes the next buffer
//    (presumably with buflength updated).
// Other recv() errors are logged as LOG_FATAL. msglen == 0 means the peer
// closed the connection.
501 void on_receive_event(
int fd,
short ev,
void* arg) {
502 dc_tcp_comm::socket_info* sockinfo = (dc_tcp_comm::socket_info*)(arg);
503 dc_tcp_comm* comm = sockinfo->owner;
506 dc_receive* receiver = comm->receiver[sockinfo->id];
509 char *c = receiver->get_buffer(buflength);
511 ssize_t msglen = recv(fd, c, buflength, 0);
// Socket drained: stop reading until the next edge.
513 if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
515 logstream(
LOG_FATAL) <<
"receive error: " << strerror(errno) << std::endl;
519 else if (msglen == 0) {
523 else if (msglen > 0) {
524 comm->network_bytesreceived.inc(msglen);
526 logstream(
LOG_INFO) << msglen <<
" bytes <-- "
527 << sockinfo->id << std::endl;
529 c = receiver->advance_buffer(c, msglen, buflength);
// Thread body launched on inthreads with inevbase. It dispatches incoming-socket
// events until event_base_dispatch() returns, e.g. after event_base_loopbreak()
// in close().
// event_base_dispatch() returns 0 on normal exit, -1 on error, and 1 when no
// events are pending. The fatal message is presumably logged only for an
// abnormal (non-zero) return — confirm.
535 void dc_tcp_comm::receive_loop(
struct event_base* ev) {
536 logstream(
LOG_INFO) <<
"Receive loop Started" << std::endl;
537 int ret = event_base_dispatch(ev);
539 logstream(
LOG_FATAL) <<
"Receive loop Quit with " << ret << std::endl;
542 logstream(
LOG_INFO) <<
"Receive loop Stopped" << std::endl;
// Moves any data queued in the dc_send for this socket's peer (sender[sockinfo.id])
// into sockinfo.outvec. The value returned by get_outgoing_data() (presumably
// a byte count) is added to buffered_len.
547 void dc_tcp_comm::check_for_new_data(dc_tcp_comm::socket_info& sockinfo) {
548 buffered_len.inc(sender[sockinfo.id]->get_outgoing_data(sockinfo.outvec));
// Sends pending data for one socket, unless another thread is already doing so.
// try_lock() makes concurrent callers (timeout sweeps, write-ready events,
// direct calls from trigger_send_timeout) skip the socket instead of waiting.
// While the socket is not marked as blocked, newly queued data is pulled into
// outvec and sent until the socket would block.
552 inline void process_sock(dc_tcp_comm::socket_info* sockinfo) {
553 if (sockinfo->m.try_lock()) {
554 dc_tcp_comm* comm = sockinfo->owner;
556 if (sockinfo->wouldblock ==
false) {
557 comm->check_for_new_data(*sockinfo);
558 if (!sockinfo->outvec.empty()) {
559 comm->send_till_block(*sockinfo);
562 sockinfo->m.unlock();
// libevent callback shared by the per-socket EV_WRITE events and the two
// timeout events (see construct_events()). The type behind `arg` depends on
// which event fired:
//  * Socket write readiness (presumably the `ev & EV_WRITE` branch): `arg` is
//    a socket_info. wouldblock is cleared and the socket is serviced. Socket
//    write events are added without a timeout, so they never report EV_TIMEOUT.
//  * EV_TIMEOUT: `arg` is a timeout_event.
//    - send_all == false (send_triggered_event, activated by
//      trigger_send_timeout): only sockets flagged in triggered_timeouts are
//      serviced, and each flag is cleared first.
//    - send_all == true (the 10 ms send_all_event): every socket is serviced.
// NOTE(review): bits are cleared while iterating triggered_timeouts. Confirm
// its iterator tolerates clear_bit() on the current element.
567 void on_send_event(
int fd,
short ev,
void* arg) {
569 dc_tcp_comm::socket_info* sockinfo = (dc_tcp_comm::socket_info*)(arg);
570 sockinfo->wouldblock =
false;
571 process_sock(sockinfo);
573 else if (ev & EV_TIMEOUT) {
574 dc_tcp_comm::timeout_event* te = (dc_tcp_comm::timeout_event*)(arg);
575 dc_tcp_comm* comm = te->owner;
576 if (te->send_all ==
false) {
578 foreach(uint32_t i, comm->triggered_timeouts) {
579 comm->triggered_timeouts.clear_bit(i);
580 dc_tcp_comm::socket_info* sockinfo = &(comm->sock[i]);
581 process_sock(sockinfo);
585 for(uint32_t i = 0;i < comm->sock.size(); ++i) {
586 dc_tcp_comm::socket_info* sockinfo = &(comm->sock[i]);
587 process_sock(sockinfo);
// Thread body launched on outthreads with outevbase. It dispatches the
// outgoing-socket write events and the send timeouts until
// event_base_dispatch() returns, e.g. after event_base_loopbreak() in close().
// event_base_dispatch() returns 0 on normal exit, -1 on error, and 1 when no
// events are pending. The fatal message is presumably logged only for an
// abnormal (non-zero) return — confirm.
594 void dc_tcp_comm::send_loop(
struct event_base* ev) {
595 logstream(
LOG_INFO) <<
"Send loop Started" << std::endl;
596 int ret = event_base_dispatch(ev);
598 logstream(
LOG_FATAL) <<
"Send loop Quit with " << ret << std::endl;
601 logstream(
LOG_INFO) <<
"Send loop Stopped" << std::endl;