24 #ifndef DC_TCP_COMM_HPP
25 #define DC_TCP_COMM_HPP
27 #include <sys/socket.h>
28 #include <netinet/in.h>
34 #include <graphlab/parallel/pthread_tools.hpp>
35 #include <graphlab/rpc/dc_types.hpp>
36 #include <graphlab/rpc/dc_internal_types.hpp>
37 #include <graphlab/rpc/dc_comm_base.hpp>
38 #include <graphlab/rpc/circular_iovec_buffer.hpp>
39 #include <graphlab/util/tracepoint.hpp>
40 #include <graphlab/util/dense_bitset.hpp>
45 void on_receive_event(
int fd,
short ev,
void* arg);
46 void on_send_event(
int fd,
short ev,
void* arg);
55 class dc_tcp_comm:
public dc_comm_base {
58 DECLARE_TRACER(tcp_send_call);
60 inline dc_tcp_comm() {
62 INITIALIZE_TRACER(tcp_send_call,
"dc_tcp_comm: send syscall");
65 size_t capabilities()
const {
84 void init(
const std::vector<std::string> &machines,
85 const std::map<std::string,std::string> &initopts,
87 std::vector<dc_receive*> receiver,
88 std::vector<dc_send*> senders);
97 inline bool channel_active(
size_t target)
const {
98 return (sock[target].outsock != -1);
120 inline size_t network_bytes_sent()
const {
121 return network_bytessent.value;
127 inline size_t network_bytes_received()
const {
128 return network_bytesreceived.value;
131 inline size_t send_queue_length()
const {
132 size_t a = network_bytessent.value;
133 size_t b = buffered_len.value;
142 void send(
size_t target,
const char* buf,
size_t len);
144 void trigger_send_timeout(
procid_t target,
bool urgent);
148 void set_tcp_no_delay(
int fd);
150 void set_non_blocking(
int fd);
153 void new_socket(
int newsock, sockaddr_in* otheraddr,
procid_t remotemachineid);
158 void open_listening(
int sockhandle = 0);
162 void connect(
size_t target);
165 int sendtosock(
int sockfd,
const char* buf,
size_t len);
174 std::vector<uint32_t> all_addrs;
175 std::map<uint32_t, procid_t> addr2id;
176 std::vector<uint16_t> portnums;
178 std::vector<dc_receive*> receiver;
179 std::vector<dc_send*> sender;
180 atomic<size_t> buffered_len;
193 struct event* inevent;
194 struct event* outevent;
198 circular_iovec_buffer outvec;
203 conditional insock_cond;
205 struct timeout_event {
210 std::vector<socket_info> sock;
218 void send_all(socket_info& sockinfo);
219 bool send_till_block(socket_info& sockinfo);
220 void check_for_new_data(socket_info& sockinfo);
221 void construct_events();
226 atomic<size_t> network_bytessent;
227 atomic<size_t> network_bytesreceived;
230 thread_group inthreads;
231 void receive_loop(
struct event_base*);
233 friend void process_sock(socket_info* sockinfo);
234 friend void on_receive_event(
int fd,
short ev,
void* arg);
235 struct event_base* inevbase;
239 thread_group outthreads;
240 void send_loop(
struct event_base*);
241 friend void on_send_event(
int fd,
short ev,
void* arg);
242 struct event_base* outevbase;
243 struct event* send_triggered_event;
244 struct event* send_all_event;
245 timeout_event send_triggered_timeout;
246 timeout_event send_all_timeout;
248 fixed_dense_bitset<256> triggered_timeouts;
252 void accept_handler();
255 void process_sock(dc_tcp_comm::socket_info* sockinfo);