24 #include <graphlab/util/safe_circular_char_buffer.hpp>
25 #include <graphlab/parallel/pthread_tools.hpp>
26 #include <graphlab/logger/assertions.hpp>
30 safe_circular_char_buffer::safe_circular_char_buffer(std::streamsize bufsize)
31 :bufsize(bufsize), head(0), tail(0), done(false), iswaiting(false){
32 ASSERT_GT(bufsize, 0);
33 buffer = (
char*)malloc((
size_t)bufsize);
36 safe_circular_char_buffer::~safe_circular_char_buffer() {
49 return (head == tail);
53 std::streamsize headval = head;
54 std::streamsize tailval = tail;
55 if (tailval >= headval)
return tailval - headval;
56 else if (headval > tailval)
return tailval + bufsize - headval;
61 return bufsize -
size() - 1;
65 write(
const char* c, std::streamsize clen) {
68 if (iswaiting && ret > 0) {
78 if (clen +
size() >= bufsize)
return 0;
87 std::streamsize firstcopy = std::min(clen, bufsize - tail);
88 memcpy(buffer + tail, c, (
size_t)firstcopy);
92 if (tail == bufsize) tail = 0;
94 if (firstcopy < clen) {
98 std::streamsize secondcopy = clen - firstcopy;
99 ASSERT_GT(secondcopy, 0);
101 memcpy(buffer, c + firstcopy, (
size_t)secondcopy);
112 if(
empty() || clen == 0) {
116 const std::streamsize curtail = tail;
123 std::streamsize available_readlen = 0;
124 const bool loop_around(curtail < head);
125 if (loop_around) available_readlen = bufsize - head;
126 else available_readlen = curtail - head;
127 ASSERT_GE(available_readlen, 0);
128 const std::streamsize actual_readlen =
129 std::min(available_readlen, clen);
130 ASSERT_GT(actual_readlen, 0);
131 return actual_readlen;
139 if (ret != 0)
return ret;
148 if (ret != 0)
return ret;
154 void safe_circular_char_buffer::
155 advance_head(
const std::streamsize advance_len) {
156 ASSERT_GE(advance_len, 0);
157 ASSERT_LE(advance_len,
size());
161 if (head >= bufsize) head -= bufsize;