GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
circular_char_buffer.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_CIRCULAR_CHAR_BUFFER_HPP
25 #define GRAPHLAB_CIRCULAR_CHAR_BUFFER_HPP
26 #include <string>
27 #include <iostream>
28 #include <graphlab/logger/assertions.hpp>
29 #include <boost/iostreams/stream.hpp>
30 #include <boost/iostreams/categories.hpp>
31 namespace graphlab {
32 
33  /**
34  * \ingroup rpc
35  * \internal
36  A self-resizing circular buffer of characters
37  */
38  class circular_char_buffer {
39  public:
40 
41  /// Creates a circular buffer with some initial capacity
42  circular_char_buffer(std::streamsize initialsize = 1024);
43  /// copy constructor
44  circular_char_buffer(const circular_char_buffer &src);
45  /// assignment operator
46  circular_char_buffer& operator=(const circular_char_buffer& src);
47 
48  /// destructors
49  ~circular_char_buffer();
50 
51  /// writes len bytes into the buffer
52  std::streamsize write(const char* c, std::streamsize clen);
53 
54  /** tries to peek up to 'len' bytes from the buffer.
55  the actual number of bytes read will be returned.
56  This is a non-destructive operation */
57  std::streamsize peek(char* c, std::streamsize clen) const;
58 
59  /** reads up to 'len' bytes from the buffer.
60  the actual number of bytes read will be returned.
61  This is a destructive operation */
62  std::streamsize read(char* c, std::streamsize clen);
63 
64 
65  /** tries to peek up to 'len' bytes from the buffer.
66  the actual number of bytes read will be returned.
67  This is a non-destructive operation. string overload of peek() */
68  std::streamsize peek(std::string &s, std::streamsize clen) const;
69 
70  /** reads up to 'len' bytes from the buffer. the actual number of
71  bytes read will be returned. This is a destructive
72  operation. string overload of read() */
73  std::streamsize read(std::string &s, std::streamsize clen);
74 
75  /** skip some number of input bytes. Returns the number of bytes
76  actually skipped*/
77  std::streamsize skip(std::streamsize clen);
78 
79  /** reserves at least s bytes of capacity. Tries to perform as few
80  memory copies as possible. No change is made if s is smaller
81  than the current capacity. */
82  void reserve(std::streamsize s);
83 
84  /** Squeezes out all empty capacity in the buffer so that
85  the capacity is the same as the length of the buffer */
86  void squeeze();
87 
88 
89  /** Rotates the buffer so that the head is at index 0.
90  buffer reserved size is preserved*/
91  void align();
92 
93 
94  /** Returns true if realignment requires a reallocation */
95  bool align_requires_alloc();
96 
97  /**
98  * Returns a pointer (through s) and a length of the read. This
99  * pointer is a direct pointer into the internal buffer of this
100  * datastructure. The pointer is valid as long as no other
101  * operations are performed on this structure. The length of the
102  * introspective_read may be less than the actual length of the
103  * buffer. Multiple calls to introspective_read may be necessary
104  * to read all data in the buffer. If the function returns 0, the
105  * buffer is empty.
106  */
107  std::streamsize introspective_read(char* &s);
108 
109  /**
110  * Returns a pointer (through s) and a length of the read. This
111  * pointer is a direct pointer into the internal buffer of this
112  * datastructure. The pointer is valid as long as no other
113  * operations are performed on this structure. The length of the
114  * introspective_read may be less than the number of bytes
115  * requested. Multiple calls to introspective_read may be
116  * necessary to read all data in the buffer. If the function
117  * returns 0, the buffer is empty.
118  */
119  std::streamsize introspective_read(char* &s, std::streamsize clen);
120 
121  /**
122  Returns a pointer to the next empty region of the buffer. The
123  return value is the maximum contigious length writable. When
124  writes complete, advance_write should be used to integrate the
125  written bytes
126  */
127  std::streamsize introspective_write(char* &s);
128 
129  void advance_write(std::streamsize bytes);
130 
131  inline void consistency_check() const {
132  /* ASSERT_GE(head, 0); ASSERT_GE(tail, 0);
133  ASSERT_LT(head, bufsize); ASSERT_LE(tail, bufsize);
134  if (tail > head) ASSERT_EQ(tail - head, len);
135  else if (head < tail) ASSERT_EQ(head + bufsize - tail, len);
136  else if (head == tail) ASSERT_EQ(len, 0); */
137  }
138 
139  /** clears the stream */
140  void clear();
141 
142  /** Gets the number of characters in the stream */
143  inline std::streamsize size() const {
144  return len;
145  }
146 
147  /** Gets the size of the buffer.
148  \note: The useable space is reserved_size() - 1 */
149  inline std::streamsize reserved_size() const {
150  return bufsize;
151  }
152  private:
153 
154  inline bool buffer_full() const {
155  return len == bufsize;
156  }
157 
158  char* buffer;
159  /**
160  * points to the head of the queue.
161  * Reader reads from here
162  */
163  std::streamsize head;
164 
165  /**
166  * points to one past the end of the queue.
167  * writer writes to here. if tail == head, buffer must be empty
168  */
169  std::streamsize tail;
170  std::streamsize bufsize; // current size of the buffer
171  std::streamsize len; // number of bytes stored in the buffer
172  };
173 
174  /**
175  A boost source device which can attach to a circular buffer
176  */
178  circular_char_buffer_source(circular_char_buffer &buf,
179  size_t maxlen = size_t(-1)):buf(buf), maxlen(maxlen) { }
180 
181  circular_char_buffer &buf;
182  size_t maxlen;
183  typedef char char_type;
184  struct category : public boost::iostreams::source_tag { };
185 
186  /** to satisfy the optimally buffered tag. Since this is an
187  in-memory buffer. the optimal buffer size (for any wrapping
188  stream) is 0. */
189  inline std::streamsize optimal_buffer_size() const { return 0; }
190 
191  inline std::streamsize read(char* s, std::streamsize n) {
192  if ((size_t)(n) > maxlen) n = (std::streamsize)maxlen;
193  maxlen -= (size_t)n;
194  if (n == 0) return -1;
195  else return buf.read(s, n);
196  }
197  };
198 
199  /**
200  A boost sink device which can attach to a circular buffer
201  */
203  circular_char_buffer_sink(circular_char_buffer &buf):buf(buf) { }
204 
205  circular_char_buffer &buf;
206  typedef char char_type;
207  struct category: public boost::iostreams::sink_tag,
208  public boost::iostreams::multichar_tag { };
209 
210  /** to satisfy the optimally buffered tag. Since this is an
211  in-memory buffer. the optimal buffer size is 0. */
212  inline std::streamsize optimal_buffer_size() const { return 0; }
213 
214  inline std::streamsize write(const char* s, std::streamsize n) {
215  return buf.write(s, n);
216  }
217  };
218 
219  struct circular_char_buffer_device {
220  circular_char_buffer_device(circular_char_buffer &buf):buf(buf) { }
221 
222  circular_char_buffer &buf;
223  typedef char char_type;
224  struct category : public boost::iostreams::bidirectional_device_tag,
225  public boost::iostreams::optimally_buffered_tag{ };
226 
227  /** to satisfy the optimally buffered tag. Since this is an
228  in-memory buffer. the optimal buffer size is 0. */
229  inline std::streamsize optimal_buffer_size() const { return 0; }
230 
231  inline std::streamsize write(const char* s, std::streamsize n) {
232  return buf.write(s, n);
233  }
234 
235  inline std::streamsize read(char* s, std::streamsize n) {
236  return buf.read(s, n);
237  }
238  };
239 
240 }
241 #endif
242