GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dc_buffered_stream_send2.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef DC_BUFFERED_STREAM_SEND2_HPP
25 #define DC_BUFFERED_STREAM_SEND2_HPP
26 #include <iostream>
27 #include <boost/function.hpp>
28 #include <boost/bind.hpp>
29 #include <boost/type_traits/is_base_of.hpp>
30 #include <graphlab/rpc/dc_internal_types.hpp>
31 #include <graphlab/rpc/dc_types.hpp>
32 #include <graphlab/rpc/dc_comm_base.hpp>
33 #include <graphlab/rpc/dc_send.hpp>
34 #include <graphlab/parallel/pthread_tools.hpp>
35 #include <graphlab/util/inplace_lf_queue.hpp>
37 namespace graphlab {
38 class distributed_control;
39 
40 namespace dc_impl {
41 
42 
43 /**
44  * \internal
45  \ingroup rpc
46 Sender for the dc class.
47  The job of the sender is to take as input data blocks of
48  pieces which should be sent to a single destination socket.
49  This can be thought of as a sending end of a multiplexor.
50  This class performs buffered transmissions using an blocking
51  queue with one call per queue entry.
52  A seperate thread is used to transmit queue entries. Rudimentary
53  write combining is used to decrease transmission overhead.
54  This is typically the best performing sender.
55 
56  This can be enabled by passing "buffered_queued_send=yes"
57  in the distributed control initstring.
58 
59  dc_buffered_stream_send22 is similar, but does not perform write combining.
60 
61 */
62 
63 class dc_buffered_stream_send2: public dc_send{
64  public:
65  dc_buffered_stream_send2(distributed_control* dc,
66  dc_comm_base *comm,
67  procid_t target) :
68  dc(dc), comm(comm), target(target),
69  writebuffer_totallen(0) {
70  writebuffer_totallen.value = 0;
71  approx_send_queue_size = 0;
72  }
73 
74  ~dc_buffered_stream_send2() {
75  }
76 
77 
78 
79 
80  /** Called to send data to the target. The caller transfers control of
81  the pointer. The caller MUST ensure that the data be prefixed
82  with sizeof(size_t) + sizeof(packet_hdr) extra bytes at the start for
83  placement of the packet header. */
84  void send_data(procid_t target,
85  unsigned char packet_type_mask,
86  char* data, size_t len);
87 
88 
89  /** Sends the data but without transferring control of the pointer.
90  The function will make a copy of the data before sending it.
91  Unlike send_data, no padding is necessary. */
92  void copy_and_send_data(procid_t target,
93  unsigned char packet_type_mask,
94  char* data, size_t len);
95 
96  size_t get_outgoing_data(circular_iovec_buffer& outdata);
97 
98 
99  inline size_t bytes_sent() {
100  return bytessent.value;
101  }
102 
103  size_t send_queue_length() const {
104  return writebuffer_totallen.value;
105  }
106 
107  void flush();
108 
109  private:
110  /// pointer to the owner
111  distributed_control* dc;
112  dc_comm_base *comm;
113  procid_t target;
114 
115  atomic<size_t> writebuffer_totallen;
116 
117  inplace_lf_queue sendqueue;
118 
119  atomic<size_t> bytessent;
120  volatile size_t approx_send_queue_size;
121 
122  mutex lock;
123 };
124 
125 
126 
127 } // namespace dc_impl
128 } // namespace graphlab
129 #endif // DC_BUFFERED_STREAM_SEND_EXPQUEUE_HPP
130