GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dc_tcp_comm.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef DC_TCP_COMM_HPP
25 #define DC_TCP_COMM_HPP
26 
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 
30 #include <vector>
31 #include <string>
32 #include <map>
33 
34 #include <graphlab/parallel/pthread_tools.hpp>
35 #include <graphlab/rpc/dc_types.hpp>
36 #include <graphlab/rpc/dc_internal_types.hpp>
37 #include <graphlab/rpc/dc_comm_base.hpp>
38 #include <graphlab/rpc/circular_iovec_buffer.hpp>
39 #include <graphlab/util/tracepoint.hpp>
40 #include <graphlab/util/dense_bitset.hpp>
41 namespace graphlab {
42 namespace dc_impl {
43 
44 
// Event callbacks befriended by dc_tcp_comm (see its friend declarations).
// The (int fd, short flags, void* userdata) shape presumably matches the
// libevent callback convention used with the class's event/event_base
// members — TODO confirm against the implementation file.

/// Handles an event on an outgoing (sending) socket.
void on_send_event(int sockfd, short what, void* userdata);

/// Handles an event on an incoming (receiving) socket.
void on_receive_event(int sockfd, short what, void* userdata);
47 
48 /**
49  \ingroup rpc
50  \internal
51 TCP implementation of the communications subsystem.
52 Provides a single object interface to sending/receiving data streams to
53 a collection of machines.
54 */
55 class dc_tcp_comm:public dc_comm_base {
56  public:
57 
58  DECLARE_TRACER(tcp_send_call);
59 
60  inline dc_tcp_comm() {
61  is_closed = true;
62  INITIALIZE_TRACER(tcp_send_call, "dc_tcp_comm: send syscall");
63  }
64 
65  size_t capabilities() const {
66  return COMM_STREAM;
67  }
68 
69  /**
70  this fuction should pause until all communication has been set up
71  and returns the number of systems in the network.
72  After which, all other remaining public functions (numprocs(), send(), etc)
73  should operate normally. Every received message should immediate trigger the
74  attached receiver
75 
76  machines: a vector of strings where each string is of the form [IP]:[portnumber]
77  initopts: unused
78  curmachineid: The ID of the current machine. machines[curmachineid] will be
79  the listening address of this machine
80 
81  recvcallback: A function pointer to the receiving function. This function must be thread-safe
82  tag: An additional pointer passed to the receiving function.
83  */
84  void init(const std::vector<std::string> &machines,
85  const std::map<std::string,std::string> &initopts,
86  procid_t curmachineid,
87  std::vector<dc_receive*> receiver,
88  std::vector<dc_send*> senders);
89 
90  /** shuts down all sockets and cleans up */
91  void close();
92 
93  ~dc_tcp_comm() {
94  close();
95  }
96 
97  inline bool channel_active(size_t target) const {
98  return (sock[target].outsock != -1);
99  }
100 
101  /**
102  Returns the number of machines in the network.
103  Only valid after call to init()
104  */
105  inline procid_t numprocs() const {
106  return nprocs;
107  }
108 
109  /**
110  * Returns the current machine ID.
111  * Only valid after call to init()
112  */
113  inline procid_t procid() const {
114  return curid;
115  }
116 
117  /**
118  * Returns the total number of bytes sent
119  */
120  inline size_t network_bytes_sent() const {
121  return network_bytessent.value;
122  }
123 
124  /**
125  * Returns the total number of bytes received
126  */
127  inline size_t network_bytes_received() const {
128  return network_bytesreceived.value;
129  }
130 
131  inline size_t send_queue_length() const {
132  size_t a = network_bytessent.value;
133  size_t b = buffered_len.value;
134  return b - a;
135  }
136 
137  /**
138  Sends the string of length len to the target machine dest.
139  Only valid after call to init();
140  Establishes a connection if necessary
141  */
142  void send(size_t target, const char* buf, size_t len);
143 
144  void trigger_send_timeout(procid_t target, bool urgent);
145 
146  private:
147  /// Sets TCP_NO_DELAY on the socket passed in fd
148  void set_tcp_no_delay(int fd);
149 
150  void set_non_blocking(int fd);
151 
152  /// called when listener receives an incoming socket request
153  void new_socket(int newsock, sockaddr_in* otheraddr, procid_t remotemachineid);
154 
155  /** opens the listening sock and spawns a thread to listen on it.
156  * Uses sockhandle if non-zero
157  */
158  void open_listening(int sockhandle = 0);
159 
160 
161  /// constructs a connection to the target machine
162  void connect(size_t target);
163 
164  /// wrapper around the standard send. but loops till the buffer is all sent
165  int sendtosock(int sockfd, const char* buf, size_t len);
166 
167 
168  procid_t curid; /// if od the current processor
169  procid_t nprocs; /// number of processors
170  bool is_closed; /// whether this socket is closed
171 
172 
173  /// all_addrs[i] will contain the IP address of machine i
174  std::vector<uint32_t> all_addrs;
175  std::map<uint32_t, procid_t> addr2id;
176  std::vector<uint16_t> portnums;
177 
178  std::vector<dc_receive*> receiver;
179  std::vector<dc_send*> sender;
180  atomic<size_t> buffered_len;
181 
182 
183 
184 
185 
186  /// All information about stuff regarding a particular sock
187  /// Passed to the receive handler
188  struct socket_info{
189  size_t id; /// which machine this is connected to
190  dc_tcp_comm* owner; /// this object
191  int outsock; /// FD of the outgoing socket
192  int insock; /// FD of the incoming socket
193  struct event* inevent; /// event object for incoming information
194  struct event* outevent; /// event object for outgoing information
195  bool wouldblock;
196  mutex m;
197 
198  circular_iovec_buffer outvec; /// outgoing data
199  struct msghdr data;
200  };
201 
202  mutex insock_lock; /// locks the insock field in socket_info
203  conditional insock_cond; /// triggered when the insock field in socket_info changes
204 
205  struct timeout_event {
206  bool send_all;
207  dc_tcp_comm* owner;
208  };
209 
210  std::vector<socket_info> sock;
211 
212  /**
213  * Sends as much of the buffer inside the sockinfo as possible
214  * until the send call will block or all sends are complete.
215  * Returns true when the buffer has been completely sent
216  * If wouldblock returns true, the next call to send_till_block may block
217  */
218  void send_all(socket_info& sockinfo);
219  bool send_till_block(socket_info& sockinfo);
220  void check_for_new_data(socket_info& sockinfo);
221  void construct_events();
222 
223 
224 
225  // counters
226  atomic<size_t> network_bytessent;
227  atomic<size_t> network_bytesreceived;
228 
229  //////////// Receiving Sockets //////////////////////
230  thread_group inthreads;
231  void receive_loop(struct event_base*);
232 
233  friend void process_sock(socket_info* sockinfo);
234  friend void on_receive_event(int fd, short ev, void* arg);
235  struct event_base* inevbase;
236 
237 
238  //////////// Sending Sockets //////////////////////
239  thread_group outthreads;
240  void send_loop(struct event_base*);
241  friend void on_send_event(int fd, short ev, void* arg);
242  struct event_base* outevbase;
243  struct event* send_triggered_event;
244  struct event* send_all_event;
245  timeout_event send_triggered_timeout;
246  timeout_event send_all_timeout;
247 
248  fixed_dense_bitset<256> triggered_timeouts;
249  //////////// Listening Sockets //////////////////////
250  int listensock;
251  thread listenthread;
252  void accept_handler();
253 };
254 
255 void process_sock(dc_tcp_comm::socket_info* sockinfo);
256 
257 } // namespace dc_impl
258 } // namespace graphlab
259 #endif
260