GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dc_tcp_comm.cpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #include <sys/socket.h>
25 #include <netinet/in.h>
26 #include <arpa/inet.h>
27 #include <netdb.h>
28 #include <unistd.h>
29 #include <fcntl.h>
30 #include <signal.h>
31 #include <netinet/tcp.h>
32 #include <ifaddrs.h>
33 #include <poll.h>
34 
35 #include <event2/event.h>
36 #include <event2/thread.h>
37 
38 #include <limits>
39 #include <vector>
40 #include <string>
41 #include <map>
42 
43 #include <boost/lexical_cast.hpp>
44 #include <boost/bind.hpp>
46 #include <graphlab/rpc/dc_tcp_comm.hpp>
47 #include <graphlab/rpc/dc_internal_types.hpp>
48 
49 #define compile_barrier() asm volatile("": : :"memory")
50 
51 #include <graphlab/macros_def.hpp>
52 
53 //#define COMM_DEBUG
54 namespace graphlab {
55 
56  namespace dc_impl {
57 
58  void dc_tcp_comm::init(const std::vector<std::string> &machines,
59  const std::map<std::string,std::string> &initopts,
60  procid_t curmachineid,
61  std::vector<dc_receive*> receiver_,
62  std::vector<dc_send*> sender_) {
63 
64  curid = curmachineid;
65  ASSERT_LT(machines.size(), std::numeric_limits<procid_t>::max());
66  nprocs = (procid_t)(machines.size());
67  receiver = receiver_;
68  sender = sender_;
69 
70  // insert machines into the address map
71  all_addrs.resize(nprocs);
72  portnums.resize(nprocs);
73  assert(triggered_timeouts.size() >= nprocs);
74  triggered_timeouts.clear();
75  // fill all the socks
76  sock.resize(nprocs);
77  for (size_t i = 0;i < nprocs; ++i) {
78  sock[i].id = i;
79  sock[i].owner = this;
80  sock[i].outsock = -1;
81  sock[i].insock = -1;
82  sock[i].inevent = NULL;
83  sock[i].outevent = NULL;
84  sock[i].wouldblock = false;
85  sock[i].data.msg_name = NULL;
86  sock[i].data.msg_namelen = 0;
87  sock[i].data.msg_control = NULL;
88  sock[i].data.msg_controllen = 0;
89  sock[i].data.msg_flags = 0;
90  sock[i].data.msg_iovlen = 0;
91  sock[i].data.msg_iov = NULL;
92  }
93  // parse the machines list, and extract the relevant address information
94  for (size_t i = 0;i < machines.size(); ++i) {
95  // extract the port number
96  size_t pos = machines[i].find(":");
97  ASSERT_NE(pos, std::string::npos);
98  std::string address = machines[i].substr(0, pos);
99  size_t port = boost::lexical_cast<size_t>(machines[i].substr(pos+1));
100 
101  struct hostent* ent = gethostbyname(address.c_str());
102  ASSERT_EQ(ent->h_length, 4);
103  uint32_t addr = *reinterpret_cast<uint32_t*>(ent->h_addr_list[0]);
104 
105  all_addrs[i] = addr;
106  ASSERT_LT(port, 65536);
107  portnums[i] = (uint16_t)(port);
108  }
109  network_bytessent = 0;
110  buffered_len = 0;
111  // if sock handle is set
112  std::map<std::string, std::string>::const_iterator iter =
113  initopts.find("__sockhandle__");
114  if (iter != initopts.end()) {
115  open_listening(atoi(iter->second.c_str()));
116  } else {
117  open_listening();
118  }
119  for(size_t i = 0;i < nprocs; ++i) connect(i);
120  // wait for all incoming connections
121  insock_lock.lock();
122  size_t prevconnected = -1;
123  while(1) {
124  size_t connected = 0;
125  for (size_t i = 0;i < sock.size(); ++i) {
126  connected += (sock[i].insock != -1);
127  }
128  if (connected == sock.size()) {
129  break;
130  }
131  if (prevconnected != connected) {
132  logstream(LOG_INFO) << curmachineid << ": Waiting for " << sock.size() - connected
133  << " more hosts..." << std::endl;
134  }
135  prevconnected = connected;
136  insock_cond.wait(insock_lock);
137  }
138  insock_lock.unlock();
139 
140  // everyone is connected.
141  // Construct the eventbase
142  construct_events();
143  // we reserve the last 2 cores for communication
144  inthreads.launch(boost::bind(&dc_tcp_comm::receive_loop, this, inevbase), thread::cpu_count() - 2);
145  outthreads.launch(boost::bind(&dc_tcp_comm::send_loop, this, outevbase), thread::cpu_count() - 1);
146  is_closed = false;
147  }
148 
149  void dc_tcp_comm::construct_events() {
150  int ret = evthread_use_pthreads();
151  if (ret < 0) logstream(LOG_FATAL) << "Unable to initialize libevent with pthread support!" << std::endl;
152  // number of evs to create.
153  outevbase = event_base_new();
154  if (!outevbase) logstream(LOG_FATAL) << "Unable to construct libevent base" << std::endl;
155  send_all_timeout.owner = this;
156  send_all_timeout.send_all = true;
157  send_triggered_timeout.owner = this;
158  send_triggered_timeout.send_all = false;
159  send_all_event = event_new(outevbase, -1, EV_TIMEOUT | EV_PERSIST, on_send_event, &(send_all_timeout));
160  assert(send_all_event != NULL);
161  struct timeval t = {0, 10000};
162  event_add(send_all_event, &t);
163  send_triggered_event = event_new(outevbase, -1, EV_TIMEOUT | EV_PERSIST, on_send_event, &(send_triggered_timeout));
164  assert(send_triggered_event != NULL);
165 
166  inevbase = event_base_new();
167  if (!inevbase) logstream(LOG_FATAL) << "Unable to construct libevent base" << std::endl;
168 
169 
170  //register all event objects
171  for (size_t i = 0;i < sock.size(); ++i) {
172  sock[i].inevent = event_new(inevbase, sock[i].insock, EV_READ | EV_PERSIST | EV_ET,
173  on_receive_event, &(sock[i]));
174  if (sock[i].inevent == NULL) {
175  logstream(LOG_FATAL) << "Unable to register socket read event" << std::endl;
176  }
177 
178  sock[i].outevent = event_new(outevbase, sock[i].outsock, EV_WRITE | EV_PERSIST | EV_ET,
179  on_send_event, &(sock[i]));
180  if (sock[i].outevent == NULL) {
181  logstream(LOG_FATAL) << "Unable to register socket write event" << std::endl;
182  }
183 
184  event_add(sock[i].inevent, NULL);
185  //struct timeval t = {0, 10};
186  event_add(sock[i].outevent, NULL);
187  }
188  }
189 
190  void dc_tcp_comm::trigger_send_timeout(procid_t target, bool urgent) {
191  if (!urgent) {
192  if (sock[target].wouldblock == false &&
193  triggered_timeouts.get(target) == false) {
194  triggered_timeouts.set_bit(target);
195  event_active(send_triggered_event, EV_TIMEOUT, 1);
196  }
197  }
198  else {
199  process_sock(&(sock[target]));
200  }
201  }
202 
203 
204  void dc_tcp_comm::close() {
205  if (is_closed) return;
206  logstream(LOG_INFO) << "Closing listening socket" << std::endl;
207  // close the listening socket
208  if (listensock > 0) {
209  ::close(listensock);
210  listensock = -1;
211  }
212  // shutdown the listening thread
213  listenthread.join();
214 
215  // clear the outevent loop
216  event_base_loopbreak(outevbase);
217  outthreads.join();
218  for (size_t i = 0;i < sock.size(); ++i) {
219  event_free(sock[i].outevent);
220  }
221  event_free(send_triggered_event);
222  event_free(send_all_event);
223  event_base_free(outevbase);
224 
225 
226  logstream(LOG_INFO) << "Closing outgoing sockets" << std::endl;
227  // close all outgoing sockets
228  for (size_t i = 0;i < sock.size(); ++i) {
229  if (sock[i].outsock > 0) {
230  ::close(sock[i].outsock);
231  sock[i].outsock = -1;
232  }
233  }
234 
235  // clear the inevent loop
236  event_base_loopbreak(inevbase);
237  inthreads.join();
238  for (size_t i = 0;i < sock.size(); ++i) {
239  event_free(sock[i].inevent);
240  }
241  event_base_free(inevbase);
242 
243 
244  logstream(LOG_INFO) << "Closing incoming sockets" << std::endl;
245  // close all incoming sockets
246  for (size_t i = 0;i < sock.size(); ++i) {
247  if (sock[i].insock > 0) {
248  ::close(sock[i].insock);
249  sock[i].insock = -1;
250  }
251  }
252  is_closed = true;
253  }
254 
255 
256  bool dc_tcp_comm::send_till_block(socket_info& sockinfo) {
257  sockinfo.wouldblock = false;
258  // while there is still data to be sent
259  BEGIN_TRACEPOINT(tcp_send_call);
260  while(!sockinfo.outvec.empty()) {
261  sockinfo.outvec.fill_msghdr(sockinfo.data);
262  ssize_t ret = sendmsg(sockinfo.outsock, &sockinfo.data, 0);
263  if (ret < 0) {
264  END_TRACEPOINT(tcp_send_call);
265  if (errno == EWOULDBLOCK || errno == EAGAIN) {
266  sockinfo.wouldblock = true;
267  return false;
268  }
269  else {
270  logstream(LOG_FATAL) << "send error: " << strerror(errno) << std::endl;
271  return false;
272  }
273  }
274 
275 #ifdef COMM_DEBUG
276  logstream(LOG_INFO) << ret << " bytes --> " << sockinfo.id << std::endl;
277 #endif
278  network_bytessent.inc(ret);
279  sockinfo.outvec.sent(ret);
280  }
281  END_TRACEPOINT(tcp_send_call);
282  return true;
283  }
284 
285  int dc_tcp_comm::sendtosock(int sockfd, const char* buf, size_t len) {
286  size_t numsent = 0;
287  BEGIN_TRACEPOINT(tcp_send_call);
288  while (numsent < len) {
289  ssize_t ret = ::send(sockfd, buf + numsent, len - numsent, 0);
290  if (ret < 0) {
291  logstream(LOG_ERROR) << "send error: " << strerror(errno) << std::endl;
292  END_TRACEPOINT(tcp_send_call);
293  return errno;
294  }
295  numsent += ret;
296  }
297  END_TRACEPOINT(tcp_send_call);
298  return 0;
299  }
300 
301  void dc_tcp_comm::set_tcp_no_delay(int fd) {
302  int flag = 1;
303  int result = setsockopt(fd, /* socket affected */
304  IPPROTO_TCP, /* set option at TCP level */
305  TCP_NODELAY, /* name of option */
306  (char *) &flag,
307  sizeof(int));
308  if (result < 0) {
309  logstream(LOG_WARNING)
310  << "Unable to disable Nagle. Performance may be signifantly reduced"
311  << std::endl;
312  }
313  // set nonblocking
314  }
315 
316  void dc_tcp_comm::set_non_blocking(int fd) {
317  int flag = fcntl(fd, F_GETFL);
318  if (flag < 0) {
319  logstream(LOG_FATAL) << "Unable to get socket flags" << std::endl;
320  }
321  flag |= O_NONBLOCK;
322  if (fcntl(fd, F_SETFL, flag) < 0) {
323  logstream(LOG_FATAL) << "Unable to set socket as non-blocking" << std::endl;
324  }
325 
326  }
327 
328 
329  void dc_tcp_comm::new_socket(int newsock, sockaddr_in* otheraddr,
330  procid_t id) {
331  // figure out the address of the incoming connection
332  uint32_t addr = *reinterpret_cast<uint32_t*>(&(otheraddr->sin_addr));
333  // locate the incoming address in the list
334  logstream(LOG_INFO) << "Incoming connection from "
335  << inet_ntoa(otheraddr->sin_addr) << std::endl;
336  ASSERT_LT(id, all_addrs.size());
337  ASSERT_EQ(all_addrs[id], addr);
338  insock_lock.lock();
339  ASSERT_EQ(sock[id].insock, -1);
340  sock[id].insock = newsock;
341  insock_cond.signal();
342  insock_lock.unlock();
343  logstream(LOG_INFO) << "Proc " << procid() << " accepted connection "
344  << "from machine " << id << std::endl;
345  }
346 
347 
348 
349  void dc_tcp_comm::open_listening(int sockhandle) {
350  // open listening socket
351  if (sockhandle == 0) {
352  listensock = socket(AF_INET, SOCK_STREAM, 0);
353  // uninteresting boiler plate. Set the port number and socket type
354  sockaddr_in my_addr;
355  my_addr.sin_family = AF_INET;
356  my_addr.sin_port = htons(portnums[curid]);
357  my_addr.sin_addr.s_addr = INADDR_ANY;
358  memset(&(my_addr.sin_zero), '\0', 8);
359  logstream(LOG_INFO) << "Proc " << procid() << " Bind on "
360  << portnums[curid] << "\n";
361  if (bind(listensock, (sockaddr*)&my_addr, sizeof(my_addr)) < 0)
362  {
363  logstream(LOG_FATAL) << "bind: " << strerror(errno) << "\n";
364  ASSERT_TRUE(0);
365  }
366  }
367  else {
368  listensock = sockhandle;
369  }
370  logstream(LOG_INFO) << "Proc " << procid()
371  << " listening on " << portnums[curid] << "\n";
372  ASSERT_EQ(0, listen(listensock, 128));
373  // spawn a thread which loops around accept
374  listenthread.launch(boost::bind(&dc_tcp_comm::accept_handler, this));
375  } // end of open_listening
376 
377  void dc_tcp_comm::connect(size_t target) {
378  if (sock[target].outsock != -1) {
379  return;
380  } else {
381  int newsock = socket(AF_INET, SOCK_STREAM, 0);
382  set_tcp_no_delay(newsock);
383  sockaddr_in serv_addr;
384  serv_addr.sin_family = AF_INET;
385  // set the target port
386  serv_addr.sin_port = htons(portnums[target]);
387  // set the target address
388  serv_addr.sin_addr = *(struct in_addr*)&(all_addrs[target]);
389  memset(&(serv_addr.sin_zero), '\0', 8);
390  // Connect!
391  logstream(LOG_INFO) << "Trying to connect from "
392  << curid << " -> " << target
393  << " on port " << portnums[target] << "\n";
394  logger(LOG_INFO, "Destination IP = %s", inet_ntoa(serv_addr.sin_addr));
395  // retry 10 times at 1 second intervals
396  bool success = false;
397  for (size_t i = 0;i < 10; ++i) {
398  if (::connect(newsock, (sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) {
399  logstream(LOG_INFO)
400  << "connect " << curid << " to " << target << ": "
401  << strerror(errno) << ". Retrying...\n";
402  timer::sleep(1);
403  // posix says that
404  /* If connect() fails, the state of the socket is unspecified.
405  Conforming applications should close the file descriptor and
406  create a new socket before attempting to reconnect. */
407  ::close(newsock);
408  newsock = socket(AF_INET, SOCK_STREAM, 0);
409  set_tcp_no_delay(newsock);
410  } else {
411  // send my machine id
412  sendtosock(newsock, reinterpret_cast<char*>(&curid), sizeof(curid));
413  set_non_blocking(newsock);
414  success = true;
415  break;
416  }
417  }
418  if (!success) {
419  logstream(LOG_FATAL) << "Failed to establish connection" << std::endl;
420  }
421  // remember the socket
422  sock[target].outsock = newsock;
423  logstream(LOG_INFO) << "connection from " << curid << " to " << target
424  << " established." << std::endl;
425  }
426  } // end of connect
427 
428 
429 
430 
431 
432 ////////////////////////////////////////////////////////////////////////////
433 // These functions run in separate threads //
434 ////////////////////////////////////////////////////////////////////////////
435 
436  // waits for incoming connections
437  void dc_tcp_comm::accept_handler() {
438  pollfd pf;
439  pf.fd = listensock;
440  pf.events = POLLIN;
441  pf.revents = 0;
442  size_t numsocks_connected = 0;
443  logstream(LOG_INFO) << "Listening thread launched." << std::endl;
444  while(numsocks_connected < sock.size()) {
445  // wait for incoming event
446  poll(&pf, 1, 1000);
447  // if we have a POLLIN, we have an incoming socket request
448  if (pf.revents & POLLIN) {
449  logstream(LOG_INFO) << "Accepting...." << std::endl;
450  // accept the socket
451  sockaddr_in their_addr;
452  socklen_t namelen = sizeof(sockaddr_in);
453  int newsock = accept(listensock, (sockaddr*)&their_addr, &namelen);
454  logstream(LOG_INFO) << "Accepted" << std::endl;
455  if (newsock < 0) {
456  break;
457  }
458  // set the socket options and inform the
459  set_tcp_no_delay(newsock);
460  // before accepting the socket, get the machine number
461  procid_t remotemachineid = (procid_t)(-1);
462  ssize_t msglen = 0;
463  while(msglen != sizeof(procid_t)) {
464  int retval = recv(newsock, (char*)(&remotemachineid) + msglen,
465  sizeof(procid_t) - msglen, 0);
466  if (retval < 0) {
467  if (errno == EWOULDBLOCK || errno == EAGAIN) {
468  continue;
469  }
470  else {
471  logstream(LOG_FATAL) << "error: " << errno << " receive error: " << strerror(errno) << std::endl;
472  }
473  }
474  else if (retval > 0) {
475  msglen += retval;
476  }
477  else if (retval == 0) {
478  std::cout << "error: connection dropped." << std::endl;
479  ::close(newsock);
480  newsock = -1;
481  break;
482  }
483  }
484  if (newsock != -1) {
485  // register the new socket
486  set_non_blocking(newsock);
487  new_socket(newsock, &their_addr, remotemachineid);
488  ++numsocks_connected;
489  }
490  }
491  if (listensock == -1) {
492  // the owner has closed
493  break;
494  }
495  }
496  logstream(LOG_INFO) << "Listening thread quitting" << std::endl;
497  } // end of run
498 
499 
500  // libevent receive handler
501  void on_receive_event(int fd, short ev, void* arg) {
502  dc_tcp_comm::socket_info* sockinfo = (dc_tcp_comm::socket_info*)(arg);
503  dc_tcp_comm* comm = sockinfo->owner;
504  if (ev & EV_READ) {
505  // get a direct pointer to my receiver
506  dc_receive* receiver = comm->receiver[sockinfo->id];
507 
508  size_t buflength;
509  char *c = receiver->get_buffer(buflength);
510  while(1) {
511  ssize_t msglen = recv(fd, c, buflength, 0);
512  if (msglen < 0) {
513  if (errno == EAGAIN || errno == EWOULDBLOCK) break;
514  else {
515  logstream(LOG_FATAL) << "receive error: " << strerror(errno) << std::endl;
516  break;
517  }
518  }
519  else if (msglen == 0) {
520  // socket closed
521  break;
522  }
523  else if (msglen > 0) {
524  comm->network_bytesreceived.inc(msglen);
525  #ifdef COMM_DEBUG
526  logstream(LOG_INFO) << msglen << " bytes <-- "
527  << sockinfo->id << std::endl;
528  #endif
529  c = receiver->advance_buffer(c, msglen, buflength);
530  }
531  }
532  }
533  }
534 
535  void dc_tcp_comm::receive_loop(struct event_base* ev) {
536  logstream(LOG_INFO) << "Receive loop Started" << std::endl;
537  int ret = event_base_dispatch(ev);
538  if (ret != 0) {
539  logstream(LOG_FATAL) << "Receive loop Quit with " << ret << std::endl;
540  }
541  else {
542  logstream(LOG_INFO) << "Receive loop Stopped" << std::endl;
543  }
544  }
545 
546 
547  void dc_tcp_comm::check_for_new_data(dc_tcp_comm::socket_info& sockinfo) {
548  buffered_len.inc(sender[sockinfo.id]->get_outgoing_data(sockinfo.outvec));
549  }
550 
551 
552  inline void process_sock(dc_tcp_comm::socket_info* sockinfo) {
553  if (sockinfo->m.try_lock()) {
554  dc_tcp_comm* comm = sockinfo->owner;
555  // get a direct pointer to my receiver
556  if (sockinfo->wouldblock == false) {
557  comm->check_for_new_data(*sockinfo);
558  if (!sockinfo->outvec.empty()) {
559  comm->send_till_block(*sockinfo);
560  }
561  }
562  sockinfo->m.unlock();
563  }
564  }
565 
566  // libevent receive handler
567  void on_send_event(int fd, short ev, void* arg) {
568  if (ev & EV_WRITE) {
569  dc_tcp_comm::socket_info* sockinfo = (dc_tcp_comm::socket_info*)(arg);
570  sockinfo->wouldblock = false;
571  process_sock(sockinfo);
572  }
573  else if (ev & EV_TIMEOUT) {
574  dc_tcp_comm::timeout_event* te = (dc_tcp_comm::timeout_event*)(arg);
575  dc_tcp_comm* comm = te->owner;
576  if (te->send_all == false) {
577  // this is a triggered event
578  foreach(uint32_t i, comm->triggered_timeouts) {
579  comm->triggered_timeouts.clear_bit(i);
580  dc_tcp_comm::socket_info* sockinfo = &(comm->sock[i]);
581  process_sock(sockinfo);
582  }
583  } else {
584  // send all event
585  for(uint32_t i = 0;i < comm->sock.size(); ++i) {
586  dc_tcp_comm::socket_info* sockinfo = &(comm->sock[i]);
587  process_sock(sockinfo);
588  }
589  }
590  }
591  }
592 
593 
594  void dc_tcp_comm::send_loop(struct event_base* ev) {
595  logstream(LOG_INFO) << "Send loop Started" << std::endl;
596  int ret = event_base_dispatch(ev);
597  if (ret != 0) {
598  logstream(LOG_FATAL) << "Send loop Quit with " << ret << std::endl;
599  }
600  else {
601  logstream(LOG_INFO) << "Send loop Stopped" << std::endl;
602  }
603  }
604  }; // end of namespace dc_impl
605 }; // end of namespace graphlab
606