24 #ifndef DC_BUFFERED_STREAM_SEND2_HPP
25 #define DC_BUFFERED_STREAM_SEND2_HPP
27 #include <boost/function.hpp>
28 #include <boost/bind.hpp>
29 #include <boost/type_traits/is_base_of.hpp>
30 #include <graphlab/rpc/dc_internal_types.hpp>
31 #include <graphlab/rpc/dc_types.hpp>
32 #include <graphlab/rpc/dc_comm_base.hpp>
33 #include <graphlab/rpc/dc_send.hpp>
34 #include <graphlab/parallel/pthread_tools.hpp>
35 #include <graphlab/util/inplace_lf_queue.hpp>
// Forward declaration: the visible code in this header only refers to
// distributed_control through pointers (constructor argument and `dc` member).
38 class distributed_control;
// dc_send implementation that keeps a lock-free queue of pending data
// (`sendqueue`) together with counters for queued and sent bytes.
// NOTE(review): this view of the class is incomplete -- several original lines
// (remaining constructor parameters, closing braces, access specifiers) are
// not visible here, so the comments below describe only what is shown.
63 class dc_buffered_stream_send2:
public dc_send{
// Constructor: stores `dc`, `comm` and `target` in the members of the same
// name and starts with an empty write buffer (writebuffer_totallen == 0) and
// approx_send_queue_size == 0.
// NOTE(review): the `comm` and `target` parameters/members are declared on
// lines not visible here; `bytessent` is not explicitly initialised in the
// visible initialiser list -- presumably atomic<> default-constructs to 0,
// TODO confirm.
65 dc_buffered_stream_send2(distributed_control* dc,
68 dc(dc), comm(comm), target(target),
69 writebuffer_totallen(0) {
// NOTE(review): looks redundant with the writebuffer_totallen(0) initialiser
// above -- confirm against atomic<>'s constructor before removing.
70 writebuffer_totallen.value = 0;
71 approx_send_queue_size = 0;
// Destructor (body not visible in this view).
74 ~dc_buffered_stream_send2() {
// Tail of a member-function declaration whose name and leading parameters are
// on lines not visible here; it takes a packet type mask, a data pointer and
// a byte length. NOTE(review): presumably the send_data counterpart of
// copy_and_send_data below -- verify against the full header.
85 unsigned char packet_type_mask,
86 char* data,
size_t len);
// Declaration only (defined elsewhere). Takes the destination process id, a
// packet type mask, and a buffer of `len` bytes at `data`.
// NOTE(review): the name suggests the buffer is copied before being queued,
// so the caller would keep ownership of `data` -- TODO confirm in the
// implementation.
92 void copy_and_send_data(
procid_t target,
93 unsigned char packet_type_mask,
94 char* data,
size_t len);
// Declaration only (defined elsewhere). Receives a circular_iovec_buffer by
// reference and returns a size_t. NOTE(review): presumably fills `outdata`
// with pending buffers and returns the byte count -- verify.
96 size_t get_outgoing_data(circular_iovec_buffer& outdata);
// Returns the current value of the `bytessent` counter.
99 inline size_t bytes_sent() {
100 return bytessent.value;
// Returns the current value of `writebuffer_totallen`.
103 size_t send_queue_length()
const {
104 return writebuffer_totallen.value;
// Pointer handed to the constructor; not owned as far as the visible code
// shows (the destructor body is not visible here).
111 distributed_control* dc;
// Counter reported by send_queue_length(); zeroed in the constructor.
115 atomic<size_t> writebuffer_totallen;
// Lock-free queue member (type from graphlab/util/inplace_lf_queue.hpp).
117 inplace_lf_queue sendqueue;
// Counter reported by bytes_sent().
119 atomic<size_t> bytessent;
// Zeroed in the constructor. NOTE(review): declared `volatile`, not atomic;
// `volatile` does not provide inter-thread synchronisation -- presumably only
// an approximate hint, as the name suggests. Confirm its readers/writers.
120 volatile size_t approx_send_queue_size;
129 #endif // DC_BUFFERED_STREAM_SEND2_HPP