GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
circular_char_buffer.cpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <new>
#include <graphlab/logger/assertions.hpp>
#include <graphlab/rpc/circular_char_buffer.hpp>
28 
29 namespace graphlab {
30 
31  circular_char_buffer::circular_char_buffer(std::streamsize initial) {
32  initial = std::max<size_t>((size_t)initial, 4);
33  // allocate something to start with
34  buffer = (char*)malloc(initial);
35  head = 0;
36  tail = 0;
37  bufsize = initial;
38  len = 0;
39  }
40 
41  circular_char_buffer::circular_char_buffer(const circular_char_buffer &src) {
42  // allocate minimum of 4 bytes
43  bufsize = std::max<size_t>(src.size(), 4);
44  buffer = (char*)malloc(bufsize);
45 
46  // copy the buffer in src
47  src.peek(buffer, src.size());
48  // set the lengths
49  len = src.size();
50  tail = src.size();
51  head = 0;
52  if (tail == bufsize) tail = 0;
53  }
54 
55  circular_char_buffer&
56  circular_char_buffer::operator=(const circular_char_buffer& src) {
57  // reset head and tail
58  clear();
59  // make sure we have enough room
60  reserve(src.size());
61  src.peek(buffer, src.size());
62  len = src.size();
63  tail = src.size();
64  if (tail == bufsize) tail = 0;
65  return *this;
66  }
67 
68  void circular_char_buffer::reserve(std::streamsize s) {
69  // minimum of 4 bytes. disallow reserve of 0
70  if (s <= 4) s = 4;
71  // do nothing if s is smaller than the current buffer size
72  if (s <= bufsize) return;
73 
74  // do a reallocation
75  buffer = (char*)realloc((void*)buffer, s);
76 
77  // now, we need to be careful to reposition the head and tail
78  // there are 2 cases
79 
80  // case 1: no loop around,
81  // Easiest case. do nothing. just update bufsize
82  // case 2: we have a loop around,
83  // copy the left side of the loop around to the end.
84  if (tail >= head) {
85  bufsize = s;
86  }
87  else {
88  // how much excess space do we have now?
89  size_t excess = (size_t)s - bufsize;
90  // move up to excess bytes to the end
91  size_t movebytes = std::min<size_t>(tail, excess);
92  memcpy(buffer + bufsize, buffer, movebytes);
93  // move the remaining bytes to the start of the buffer
94  memmove(buffer, buffer+movebytes, tail - movebytes);
95  // update buftail
96  // if movebytes == tail, then tail has been wiped out
97  // and it is no longer a looparound
98  bufsize = s;
99  tail = (head + len) % bufsize;
100 
101  }
102  consistency_check();
103  }
104 
105  void circular_char_buffer::squeeze() {
106  // squeeze to a minimum of 4 bytes
107  if (bufsize <= 4) return;
108  // 2 cases
109  // case 1: no loop around
110  // case 2: loop around. Easiest solution is to allocate a new buffer
111  if (tail >= head) {
112  if (head > 0) memmove(buffer, buffer+head, len);
113  std::streamsize efflen = std::max(len + 1, std::streamsize(4));
114  buffer = (char*)realloc((void*)buffer, efflen);
115  head = 0;
116  tail = len;
117  bufsize = efflen;
118  }
119  else {
120  // allocate a new buffer
121  std::streamsize efflen = std::max(len + 1, std::streamsize(4));
122  char *newbuf = (char*)malloc(efflen);
123  // read into this buffer
124  peek(newbuf, len);
125  // free the old buffer
126  free(buffer);
127  buffer = newbuf;
128  head = 0;
129  tail = len;
130  bufsize = efflen;
131  }
132  consistency_check();
133  }
134 
135 
136  void circular_char_buffer::align() {
137  // squeeze to a minimum of 4 bytes
138  if (bufsize <= 4) return;
139  // 2 cases
140  // case 1: no loop around
141  // case 2: loop around. Easiest solution is to allocate a new buffer
142  if (tail >= head) {
143  if (head > 0) memmove(buffer, buffer+head, len);
144  head = 0;
145  tail = len;
146  }
147  else {
148  // allocate a new buffer
149  char *newbuf = (char*)malloc(bufsize);
150  // read into this buffer
151  peek(newbuf, len);
152  // free the old buffer
153  free(buffer);
154  buffer = newbuf;
155  head = 0;
156  tail = len;
157  }
158  }
159 
160  bool circular_char_buffer::align_requires_alloc() {
161  // squeeze to a minimum of 4 bytes
162  if (bufsize <= 4) return false;
163  // 2 cases
164  // case 1: no loop around
165  // case 2: loop around. Easiest solution is to allocate a new buffer
166  if (tail >= head) {
167  return false;
168  }
169  else {
170  return true;
171  }
172  }
173  std::streamsize circular_char_buffer::peek(char* c,
174  std::streamsize clen) const {
175  std::streamsize readlen = std::min(clen, len);
176  // eliminate the case where head == tail, but buffer is empty
177  if (readlen == 0) return 0;
178 
179  // first copy from head to end of buffer
180  std::streamsize firstcopy = std::min(bufsize - head, readlen);
181  memcpy(c, buffer+head, firstcopy);
182  if (firstcopy == readlen) return readlen;
183  // second copy from beginning of buffer to tail
184  std::streamsize secondcopy = std::min(tail, readlen - firstcopy);
185  memcpy(c+firstcopy, buffer, secondcopy);
186  consistency_check();
187 
188  return readlen;
189 
190  }
191 
192  std::streamsize circular_char_buffer::skip(std::streamsize clen) {
193  std::streamsize readlen = std::min(clen, len);
194  head += readlen;
195  if (head >= bufsize) head -= bufsize;
196  len -= readlen;
197  consistency_check();
198  return readlen;
199  }
200 
201  std::streamsize circular_char_buffer::read(char* c,
202  std::streamsize clen) {
203  if (len == 0) return -1;
204  std::streamsize readlen = peek(c, clen);
205  skip(readlen);
206  consistency_check();
207 
208  return readlen;
209  }
210 
211  std::streamsize circular_char_buffer::peek(std::string &c,
212  std::streamsize clen) const{
213  c.clear();
214  c.resize(clen);
215  return peek(const_cast<char*>(c.c_str()), clen);
216  }
217 
218  std::streamsize circular_char_buffer::read(std::string &c,
219  std::streamsize clen) {
220  c.clear();
221  c.resize(clen);
222  return read(const_cast<char*>(c.c_str()), clen);
223  }
224 
225 
226  std::streamsize circular_char_buffer::write(const char* c,
227  std::streamsize clen) {
228  // if we do not have enough capacity.
229  // make sure we have enough capacity
230  reserve(clen + len + 1);
231  len += clen;
232 
233  std::streamsize firstcopy = std::min(clen, bufsize - tail);
234  memcpy(buffer + tail, c, firstcopy);
235  tail += firstcopy;
236  if (tail == bufsize) tail = 0;
237  if (firstcopy == clen) {
238  consistency_check();
239  return clen;
240  }
241 
242  std::streamsize secondcopy = clen - firstcopy;
243  memcpy(buffer, c + firstcopy, secondcopy);
244  tail += secondcopy;
245  consistency_check();
246  return clen;
247  }
248 
249  void circular_char_buffer::clear() {
250  head = 0; tail = 0; len = 0;
251  }
252 
253  circular_char_buffer::~circular_char_buffer() {
254  free(buffer);
255  }
256 
257  std::streamsize circular_char_buffer::introspective_read(char* &s) {
258  if (len == 0) {
259  s = NULL;
260  return 0;
261  }
262  s = buffer + head;
263  // how much we do read?
264  // we can go up to the end of the buffer, or until a looparound
265  // case 1: no looparound
266  // case 2: looparound
267  std::streamsize readlen = 0;
268  if (tail > head) {
269  readlen = tail - head;
270  }
271  else {
272  readlen = bufsize - head;
273  }
274  skip(readlen);
275  return readlen;
276  }
277 
278  std::streamsize circular_char_buffer::introspective_read(char* &s, std::streamsize clen) {
279  if (len == 0) {
280  s = NULL;
281  return 0;
282  }
283  s = buffer + head;
284  // how much we do read?
285  // we can go up to the end of the buffer, or until a looparound
286  // case 1: no looparound
287  // case 2: looparound
288  std::streamsize readlen = 0;
289  if (tail > head) {
290  readlen = tail - head;
291  }
292  else {
293  readlen = bufsize - head;
294  }
295  if (clen < readlen) readlen = clen;
296 
297  skip(readlen);
298  return readlen;
299  }
300 
301 
302  std::streamsize circular_char_buffer::introspective_write(char* &s) {
303  s = buffer + tail;
304  if (tail >= head) {
305  // case 1. no looparound.
306  return bufsize - tail - (head==0);
307  }
308  else {
309  // case 2 looparound
310  return head - tail - 1;
311  }
312  }
313 
314  void circular_char_buffer::advance_write(std::streamsize bytes) {
315  tail += bytes;
316  if (tail >= bufsize) tail -= bufsize;
317  len += bytes;
318  consistency_check();
319  }
320 
321 };
322