GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dc_stream_receive.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef DC_STREAM_RECEIVE_HPP
25 #define DC_STREAM_RECEIVE_HPP
26 #include <boost/type_traits/is_base_of.hpp>
27 #include <graphlab/rpc/circular_char_buffer.hpp>
28 #include <graphlab/rpc/dc_internal_types.hpp>
29 #include <graphlab/rpc/dc_types.hpp>
30 #include <graphlab/rpc/dc_receive.hpp>
31 #include <graphlab/parallel/atomic.hpp>
32 #include <graphlab/parallel/pthread_tools.hpp>
34 namespace graphlab {
35 class distributed_control;
36 
37 namespace dc_impl {
38 
39 /**
40  * \internal
41  \ingroup rpc
42  Receiver processor for the dc class.
43  The job of the receiver is to take as input a byte stream
44  (as received from the socket) and cut it up into meaningful chunks.
45  This can be thought of as a receiving end of a multiplexor.
46 
47  This is the default unbuffered receiver.
48 */
49 class dc_stream_receive: public dc_receive{
50  public:
51 
52  dc_stream_receive(distributed_control* dc, procid_t associated_proc):
53  header_read(0), writebuffer(NULL),
54  write_buffer_written(0), dc(dc), associated_proc(associated_proc)
55  { }
56 
57  private:
58 
59  size_t header_read;
60  block_header_type cur_chunk_header;
61  char* writebuffer;
62  size_t write_buffer_written;
63 
64  /// pointer to the owner
65  distributed_control* dc;
66 
67  procid_t associated_proc;
68 
69  void shutdown();
70 
71 
72  char* get_buffer(size_t& retbuflength);
73 
74 
75  char* advance_buffer(char* c, size_t wrotelength,
76  size_t& retbuflength);
77 
78 };
79 
80 
81 } // namespace dc_impl
82 } // namespace graphlab
83 #endif
84