GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dc_stream_receive.cpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 
25 #include <iostream>
26 
27 #include <graphlab/rpc/dc.hpp>
28 #include <graphlab/rpc/dc_internal_types.hpp>
29 #include <graphlab/rpc/dc_stream_receive.hpp>
30 
31 //#define DC_RECEIVE_DEBUG
32 namespace graphlab {
33 namespace dc_impl {
34 
35 
36 
37 char* dc_stream_receive::get_buffer(size_t& retbuflength) {
38  if (header_read < sizeof(block_header_type)) {
39  retbuflength = sizeof(block_header_type) - header_read;
40  return (reinterpret_cast<char*>(&cur_chunk_header) + header_read);
41  }
42  else {
43  retbuflength = cur_chunk_header - write_buffer_written;
44  return writebuffer + write_buffer_written;
45  }
46 }
47 
48 
49 char* dc_stream_receive::advance_buffer(char* c, size_t wrotelength,
50  size_t& retbuflength) {
51  if (header_read != sizeof(block_header_type)) {
52  // tcp is still writing into cur`writelen
53  header_read += wrotelength;
54  ASSERT_LE(header_read, sizeof(block_header_type));
55  // are we done reading the header?
56  if (header_read < sizeof(block_header_type)) {
57  // nope!
58  retbuflength = sizeof(block_header_type) - header_read;
59  return (reinterpret_cast<char*>(&cur_chunk_header) + header_read);
60  }
61  else {
62  // ok header is full. construct the return
63  // bufer and switch to it.
64  ASSERT_TRUE(writebuffer == NULL);
65  writebuffer = (char*)malloc(cur_chunk_header);
66  retbuflength = cur_chunk_header;
67  write_buffer_written = 0;
68  return writebuffer;
69  }
70  }
71  else {
72  // we read the entire header and is reading buffers now
73  // try to store the buffer and see if we are full yet.
74  ASSERT_EQ(header_read, sizeof(block_header_type));
75  write_buffer_written += wrotelength;
76  if (write_buffer_written < cur_chunk_header) {
77  retbuflength = cur_chunk_header - write_buffer_written;
78  return writebuffer + write_buffer_written;
79  }
80  }
81 
82  // if we reach here, we have an available block
83  // give away the buffer to dc
84  dc->deferred_function_call_chunk(writebuffer, cur_chunk_header, associated_proc);
85  writebuffer = NULL;
86  write_buffer_written = 0;
87  header_read = 0;
88  return get_buffer(retbuflength);
89 }
90 
91 
92 
93 void dc_stream_receive::shutdown() { }
94 
95 } // namespace dc_impl
96 } // namespace graphlab
97