GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dc_buffered_stream_send2.cpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <new>

#include <boost/iostreams/stream.hpp>

#include <graphlab/rpc/dc.hpp>
#include <graphlab/rpc/dc_buffered_stream_send2.hpp>
#include <graphlab/util/branch_hints.hpp>
30 namespace graphlab {
31 namespace dc_impl {
32 
33  void dc_buffered_stream_send2::send_data(procid_t target,
34  unsigned char packet_type_mask,
35  char* data, size_t len) {
36  size_t actual_data_len = len - sizeof(packet_hdr) - sizeof(size_t);
37 
38  if ((packet_type_mask & CONTROL_PACKET) == 0) {
39  if (packet_type_mask & (STANDARD_CALL)) {
40  dc->inc_calls_sent(target);
41  }
42  }
43  writebuffer_totallen.inc(actual_data_len + sizeof(packet_hdr));
44 
45  // build the packet header
46  packet_hdr* hdr = reinterpret_cast<packet_hdr*>(data + sizeof(size_t));
47  memset(hdr, 0, sizeof(packet_hdr));
48 
49  hdr->len = len - sizeof(packet_hdr) - sizeof(size_t);
50  hdr->src = dc->procid();
51  hdr->sequentialization_key = dc->get_sequentialization_key();
52  hdr->packet_type_mask = packet_type_mask;
53 
54  //logstream(LOG_DEBUG) << " queueing to " << target << " message of length " << hdr->len << std::endl;
55  sendqueue.enqueue(data);
56 
57  size_t sqsize = approx_send_queue_size;
58  approx_send_queue_size = sqsize + 1;
59  if (sqsize == 256) comm->trigger_send_timeout(target, false);
60  else if ((packet_type_mask &
61  (CONTROL_PACKET | WAIT_FOR_REPLY | REPLY_PACKET))) {
62  comm->trigger_send_timeout(target, true);
63  }
64  }
65 
66  void dc_buffered_stream_send2::flush() {
67  comm->trigger_send_timeout(target, true);
68  }
69 
70  void dc_buffered_stream_send2::copy_and_send_data(procid_t target,
71  unsigned char packet_type_mask,
72  char* data, size_t len) {
73  char* c = (char*)malloc(sizeof(size_t) + sizeof(packet_hdr) + len);
74  memcpy(c + sizeof(size_t) + sizeof(packet_hdr), data, len);
75  send_data(target, packet_type_mask, c, len + sizeof(size_t) + sizeof(packet_hdr));
76  }
77 
78 
79  size_t dc_buffered_stream_send2::get_outgoing_data(circular_iovec_buffer& outdata) {
80  // fast exit if no buffer
81  if (writebuffer_totallen.value == 0) return 0;
82 
83  lock.lock();
84  char* sendqueue_head = sendqueue.dequeue_all();
85  if (sendqueue_head == NULL) {
86  lock.unlock();
87  return 0;
88  }
89  approx_send_queue_size = 0;
90  size_t real_send_len = 0;
91 
92  // construct the block msg header
93  block_header_type* blockheader = new block_header_type;
94  // now I don't really know what is the size of it yet.
95  // create a block header iovec
96  iovec blockheader_iovec;
97  blockheader_iovec.iov_base = reinterpret_cast<void*>(blockheader);
98  blockheader_iovec.iov_len = sizeof(block_header_type);
99  outdata.write(blockheader_iovec);
100 
101  size_t ctr = 0;
102  while(!sendqueue.end_of_dequeue_list(sendqueue_head)) {
103  ++ctr;
104  iovec tosend, tofree;
105  tofree.iov_base = sendqueue_head;
106  tosend.iov_base = sendqueue_head + sizeof(size_t);
107  // I need to read the length
108  packet_hdr* hdr = reinterpret_cast<packet_hdr*>(sendqueue_head + sizeof(size_t));
109  tosend.iov_len = hdr->len + sizeof(packet_hdr);
110  tofree.iov_len = tosend.iov_len + sizeof(size_t);
111  outdata.write(tosend, tofree);
112  real_send_len += tosend.iov_len;
113  // advance to the next list item
114  while(__unlikely__(inplace_lf_queue::get_next(sendqueue_head) == NULL)) {
115  asm volatile("pause\n": : :"memory");
116  }
117  sendqueue_head = inplace_lf_queue::get_next(sendqueue_head);
118  }
119  lock.unlock();
120 /* logstream(LOG_DEBUG) << "Sending: " << real_send_len << " bytes in "
121  << ctr << " messages to "<< target << std::endl; */
122  (*blockheader) = real_send_len;
123  writebuffer_totallen.dec(real_send_len);
124  return real_send_len + sizeof(block_header_type);
125  }
126 } // namespace dc_impl
127 } // namespace graphlab
128 
129