25 #include <boost/iostreams/stream.hpp>
27 #include <graphlab/rpc/dc.hpp>
28 #include <graphlab/rpc/dc_buffered_stream_send2.hpp>
29 #include <graphlab/util/branch_hints.hpp>
33 void dc_buffered_stream_send2::send_data(
procid_t target,
34 unsigned char packet_type_mask,
35 char* data,
size_t len) {
36 size_t actual_data_len = len -
sizeof(packet_hdr) -
sizeof(
size_t);
38 if ((packet_type_mask & CONTROL_PACKET) == 0) {
39 if (packet_type_mask & (STANDARD_CALL)) {
40 dc->inc_calls_sent(target);
43 writebuffer_totallen.inc(actual_data_len +
sizeof(packet_hdr));
46 packet_hdr* hdr =
reinterpret_cast<packet_hdr*
>(data +
sizeof(size_t));
47 memset(hdr, 0,
sizeof(packet_hdr));
49 hdr->len = len -
sizeof(packet_hdr) -
sizeof(
size_t);
50 hdr->src = dc->procid();
51 hdr->sequentialization_key = dc->get_sequentialization_key();
52 hdr->packet_type_mask = packet_type_mask;
55 sendqueue.enqueue(data);
57 size_t sqsize = approx_send_queue_size;
58 approx_send_queue_size = sqsize + 1;
59 if (sqsize == 256) comm->trigger_send_timeout(target,
false);
60 else if ((packet_type_mask &
61 (CONTROL_PACKET | WAIT_FOR_REPLY | REPLY_PACKET))) {
62 comm->trigger_send_timeout(target,
true);
66 void dc_buffered_stream_send2::flush() {
67 comm->trigger_send_timeout(target,
true);
70 void dc_buffered_stream_send2::copy_and_send_data(
procid_t target,
71 unsigned char packet_type_mask,
72 char* data,
size_t len) {
73 char* c = (
char*)malloc(
sizeof(
size_t) +
sizeof(packet_hdr) + len);
74 memcpy(c +
sizeof(
size_t) +
sizeof(packet_hdr), data, len);
75 send_data(target, packet_type_mask, c, len +
sizeof(
size_t) +
sizeof(packet_hdr));
// Moves everything currently on the send queue into `outdata` as one block:
// a newly allocated block header followed by one iovec per queued buffer.
//
// outdata: iovec buffer that receives the block header and packet entries.
// Returns 0 when the write-buffer byte counter reads zero; on the main path
// returns the packet bytes written plus sizeof(block_header_type).
79 size_t dc_buffered_stream_send2::get_outgoing_data(circular_iovec_buffer& outdata) {
// Fast exit: the byte counter says nothing is buffered.
81 if (writebuffer_totallen.value == 0)
return 0;
// Claim every queued buffer in one call; a NULL head is handled separately
// (presumably meaning nothing was queued -- TODO confirm).
84 char* sendqueue_head = sendqueue.dequeue_all();
85 if (sendqueue_head == NULL) {
89 approx_send_queue_size = 0;
// The approximate queue counter was reset just above. real_send_len
// accumulates the header+payload bytes handed to outdata in the loop below.
90 size_t real_send_len = 0;
// The block header is heap-allocated and referenced by pointer from outdata.
// NOTE(review): it is created with `new`; how and where it is released is
// not visible in this function -- confirm the release matches.
93 block_header_type* blockheader =
new block_header_type;
// Register the (not yet filled in) block header as the first entry.
96 iovec blockheader_iovec;
97 blockheader_iovec.iov_base =
reinterpret_cast<void*
>(blockheader);
98 blockheader_iovec.iov_len =
sizeof(block_header_type);
99 outdata.write(blockheader_iovec);
// Walk the dequeued list until the queue reports its end marker.
102 while(!sendqueue.end_of_dequeue_list(sendqueue_head)) {
// tofree spans the whole buffer from its start; tosend skips the leading
// sizeof(size_t) bytes so that it begins at the packet header.
104 iovec tosend, tofree;
105 tofree.iov_base = sendqueue_head;
106 tosend.iov_base = sendqueue_head +
sizeof(size_t);
// The packet header sits right after the leading size_t slot; its len field
// holds the payload size, so header + payload is what gets sent.
108 packet_hdr* hdr =
reinterpret_cast<packet_hdr*
>(sendqueue_head +
sizeof(size_t));
109 tosend.iov_len = hdr->len +
sizeof(packet_hdr);
110 tofree.iov_len = tosend.iov_len +
sizeof(size_t);
111 outdata.write(tosend, tofree);
112 real_send_len += tosend.iov_len;
// End of list is detected by end_of_dequeue_list(), not by NULL, so a NULL
// next link is waited out here (presumably an enqueue that has not finished
// linking yet -- TODO confirm). The x86 "pause" is a spin-wait hint; the
// "memory" clobber keeps the compiler from caching the link across spins.
114 while(__unlikely__(inplace_lf_queue::get_next(sendqueue_head) == NULL)) {
115 asm volatile(
"pause\n": : :
"memory");
// Advance to the next queued buffer.
117 sendqueue_head = inplace_lf_queue::get_next(sendqueue_head);
// Fill in the block header now that the total is known. It was registered
// with outdata by pointer earlier, so this write is seen through that entry.
// The stored value counts packet bytes only, not the block header itself.
122 (*blockheader) = real_send_len;
// These bytes are no longer pending in the write buffer.
123 writebuffer_totallen.dec(real_send_len);
// Bytes made available to the caller: packets plus the block header.
124 return real_send_len +
sizeof(block_header_type);