GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
safe_circular_char_buffer.cpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #include <graphlab/util/safe_circular_char_buffer.hpp>
25 #include <graphlab/parallel/pthread_tools.hpp>
26 #include <graphlab/logger/assertions.hpp>
27 
28 namespace graphlab {
29 
30  safe_circular_char_buffer::safe_circular_char_buffer(std::streamsize bufsize)
31  :bufsize(bufsize), head(0), tail(0), done(false), iswaiting(false){
32  ASSERT_GT(bufsize, 0);
33  buffer = (char*)malloc((size_t)bufsize);
34  }
35 
36  safe_circular_char_buffer::~safe_circular_char_buffer() {
37  free(buffer);
38  }
39 
41  mut.lock();
42  done = true;
43  cond.signal();
44  mut.unlock();
45  }
46 
47  // Head == tail implies empty
49  return (head == tail);
50  }
51 
52  std::streamsize safe_circular_char_buffer::size() const {
53  std::streamsize headval = head;
54  std::streamsize tailval = tail;
55  if (tailval >= headval) return tailval - headval;
56  else if (headval > tailval) return tailval + bufsize - headval;
57  return 0;
58  }
59 
60  std::streamsize safe_circular_char_buffer::free_space() const {
61  return bufsize - size() - 1;
62  }
63 
64  std::streamsize safe_circular_char_buffer::
65  write(const char* c, std::streamsize clen) {
66  mut.lock();
67  std::streamsize ret = write_unsafe(c, clen);
68  if (iswaiting && ret > 0) {
69  cond.signal();
70  }
71  mut.unlock();
72  return ret;
73  }
74 
75  std::streamsize safe_circular_char_buffer::
76  write_unsafe(const char* c, std::streamsize clen) {
77  // If the char array does not fit simply return
78  if (clen + size() >= bufsize) return 0;
79 
80  /// Adding c characters to the buffer
81  /// 0--------------H...body...T------------->Bufsize
82  /// 0--------------H...body...T--(Part A)--->Bufsize
83  /// T--(Part B)----H...body....ccccccccccccc>Bufsize
84  /// 0cccccccccccT--H...body....ccccccccccccc>Bufsize
85 
86  // First we copy the contents into Part A
87  std::streamsize firstcopy = std::min(clen, bufsize - tail);
88  memcpy(buffer + tail, c, (size_t)firstcopy);
89  // Move the tail to the end
90  tail += firstcopy;
91  // If tail moved to the end wrap around
92  if (tail == bufsize) tail = 0;
93  // If the copy is not complete
94  if (firstcopy < clen) {
95  // Assert: This only happens on wrape around
96  ASSERT_EQ(tail, 0);
97  // Determine what is left to be coppied
98  std::streamsize secondcopy = clen - firstcopy;
99  ASSERT_GT(secondcopy, 0);
100  // Do the copy and advance the pointer
101  memcpy(buffer, c + firstcopy, (size_t)secondcopy);
102  tail += secondcopy;
103 
104  }
105  return clen;
106  }
107 
108  std::streamsize safe_circular_char_buffer::
109  introspective_read(char* &s, std::streamsize clen) {
110  ASSERT_GT(clen,0);
111  // early termination check
112  if(empty() || clen == 0) {
113  s = NULL;
114  return 0;
115  }
116  const std::streamsize curtail = tail;
117 
118  s = buffer + head;
119  // how much we do read? we can go up to the end of the requested
120  // size or until a looparound
121  // case 1: no looparound |------H......T----->
122  // case 2: looparound |...T--H............>
123  std::streamsize available_readlen = 0;
124  const bool loop_around(curtail < head);
125  if (loop_around) available_readlen = bufsize - head;
126  else available_readlen = curtail - head;
127  ASSERT_GE(available_readlen, 0);
128  const std::streamsize actual_readlen =
129  std::min(available_readlen, clen);
130  ASSERT_GT(actual_readlen, 0);
131  return actual_readlen;
132  }
133 
134 
135  std::streamsize safe_circular_char_buffer::
136  blocking_introspective_read(char* &s, std::streamsize clen) {
137  // try to read
138  std::streamsize ret = introspective_read(s, clen);
139  if (ret != 0) return ret;
140  // if read failed. acquire the lock and try again
141  while(1) {
142  iswaiting = true;
143  mut.lock();
144  while (empty() && !done) cond.wait(mut);
145  iswaiting = false;
146  mut.unlock();
147  std::streamsize ret = introspective_read(s, clen);
148  if (ret != 0) return ret;
149  if (done) return 0;
150  }
151  }
152 
153 
154  void safe_circular_char_buffer::
155  advance_head(const std::streamsize advance_len) {
156  ASSERT_GE(advance_len, 0);
157  ASSERT_LE(advance_len, size());
158  // advance the head forward as far as possible
159  head += advance_len;
160  // If head wraps around move head to begginning and then offset
161  if (head >= bufsize) head -= bufsize;
162  } // end of advance head
163 
164 
165 
166 } // end of namespace
167 
168