GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
circular_iovec_buffer.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_RPC_CIRCULAR_IOVEC_BUFFER_HPP
25 #define GRAPHLAB_RPC_CIRCULAR_IOVEC_BUFFER_HPP
26 #include <vector>
27 #include <sys/socket.h>
28 
29 namespace graphlab{
30 namespace dc_impl {
31 
32 /**
33  * \ingroup rpc
34  * \internal
35  * A circular buffer which maintains a parallel sequence of iovecs.
36  * One sequence is basic iovecs
37  * The other sequence is used for storing the original unomidifed pointers
38  * This is minimally checked. length must be a power of 2
39  */
40 struct circular_iovec_buffer {
41  inline circular_iovec_buffer(size_t len = 4096) {
42  v.resize(4096);
43  parallel_v.resize(4096);
44  head = 0;
45  tail = 0;
46  numel = 0;
47  }
48 
49  inline bool empty() const {
50  return numel == 0;
51  }
52 
53  size_t size() const {
54  return numel;
55  }
56 
57 
58  void reserve(size_t _n) {
59  if (_n <= v.size()) return;
60  size_t originalsize = v.size();
61  size_t n = v.size();
62  // must be a power of 2
63  while (n <= _n) n *= 2;
64 
65  v.resize(n);
66  parallel_v.resize(n);
67  if (head >= tail && numel > 0) {
68  // there is a loop around
69  // we need to fix the shift
70  size_t newtail = originalsize;
71  for (size_t i = 0;i < tail; ++i) {
72  v[newtail] = v[i];
73  parallel_v[newtail] = parallel_v[i];
74  ++newtail;
75  }
76  tail = newtail;
77  }
78  }
79 
80  inline void write(const std::vector<iovec>& other, size_t nwrite) {
81  reserve(numel + nwrite);
82  for (size_t i = 0;i < nwrite; ++i) {
83  v[tail] = other[i];
84  parallel_v[tail] = other[i];
85  tail = (tail + 1) & (v.size() - 1);
86  }
87  numel += nwrite;
88 
89  }
90 
91  /**
92  * Writes an entry into the buffer, resizing the buffer if necessary.
93  * This buffer will take over all iovec pointers and free them when done
94  */
95  inline void write(const iovec &entry) {
96  if (numel == v.size()) {
97  reserve(2 * numel);
98  }
99 
100  v[tail] = entry;
101  parallel_v[tail] = entry;
102  tail = (tail + 1) & (v.size() - 1); ++numel;
103  }
104 
105 
106  /**
107  * Writes an entry into the buffer, resizing the buffer if necessary.
108  * This buffer will take over all iovec pointers and free them when done.
109  * This version of write allows the iovec that is sent to be different from the
110  * iovec that is freed. (for instance, what is sent could be subarray of
111  * what is to be freed.
112  */
113  inline void write(const iovec &entry, const iovec& actual_ptr_entry) {
114  if (numel == v.size()) {
115  reserve(2 * numel);
116  }
117 
118  v[tail] = actual_ptr_entry;
119  parallel_v[tail] = entry;
120  tail = (tail + 1) & (v.size() - 1); ++numel;
121  }
122 
123 
124  /**
125  * Erases a single iovec from the head and free the pointer
126  */
127  inline void erase_from_head_and_free() {
128  free(v[head].iov_base);
129  head = (head + 1) & (v.size() - 1);
130  --numel;
131  }
132 
133  /**
134  * Fills a msghdr for unsent data.
135  */
136  void fill_msghdr(struct msghdr& data) {
137  data.msg_iov = &(parallel_v[head]);
138  if (head < tail) {
139  data.msg_iovlen = tail - head;
140  }
141  else {
142  data.msg_iovlen = v.size() - head;
143  }
144  data.msg_iovlen = std::min<size_t>(IOV_MAX, data.msg_iovlen);
145  }
146 
147  /**
148  * Advances the head as if some amount of data was sent.
149  */
150  void sent(size_t len) {
151  while(len > 0) {
152  size_t curv_sent_len = std::min(len, parallel_v[head].iov_len);
153  parallel_v[head].iov_len -= curv_sent_len;
154  parallel_v[head].iov_base = (char*)(parallel_v[head].iov_base) + curv_sent_len;
155  len -= curv_sent_len;
156  if (parallel_v[head].iov_len == 0) {
157  erase_from_head_and_free();
158  }
159  }
160  }
161 
162  std::vector<struct iovec> v;
163  std::vector<struct iovec> parallel_v;
164  size_t head;
165  size_t tail;
166  size_t numel;
167 };
168 
169 }
170 }
171 
172 #endif