27 #include <graphlab/rpc/dc.hpp>
28 #include <graphlab/rpc/dc_internal_types.hpp>
29 #include <graphlab/rpc/dc_stream_receive.hpp>
37 char* dc_stream_receive::get_buffer(
size_t& retbuflength) {
38 if (header_read <
sizeof(block_header_type)) {
39 retbuflength =
sizeof(block_header_type) - header_read;
40 return (reinterpret_cast<char*>(&cur_chunk_header) + header_read);
43 retbuflength = cur_chunk_header - write_buffer_written;
44 return writebuffer + write_buffer_written;
49 char* dc_stream_receive::advance_buffer(
char* c,
size_t wrotelength,
50 size_t& retbuflength) {
51 if (header_read !=
sizeof(block_header_type)) {
53 header_read += wrotelength;
54 ASSERT_LE(header_read,
sizeof(block_header_type));
56 if (header_read <
sizeof(block_header_type)) {
58 retbuflength =
sizeof(block_header_type) - header_read;
59 return (reinterpret_cast<char*>(&cur_chunk_header) + header_read);
64 ASSERT_TRUE(writebuffer == NULL);
65 writebuffer = (
char*)malloc(cur_chunk_header);
66 retbuflength = cur_chunk_header;
67 write_buffer_written = 0;
74 ASSERT_EQ(header_read,
sizeof(block_header_type));
75 write_buffer_written += wrotelength;
76 if (write_buffer_written < cur_chunk_header) {
77 retbuflength = cur_chunk_header - write_buffer_written;
78 return writebuffer + write_buffer_written;
84 dc->deferred_function_call_chunk(writebuffer, cur_chunk_header, associated_proc);
86 write_buffer_written = 0;
88 return get_buffer(retbuflength);
93 void dc_stream_receive::shutdown() { }