Linux Kernel  3.7.1
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
r2net.c
Go to the documentation of this file.
1 /*
2  * r2net.c
3  *
4  * Copyright (c) 2011-2012, Dan Magenheimer, Oracle Corp.
5  *
6  * Ramster_r2net provides an interface between zcache and r2net.
7  *
8  * FIXME: support more than two nodes
9  */
10 
11 #include <linux/list.h>
12 #include "tcp.h"
13 #include "nodemanager.h"
14 #include "../tmem.h"
15 #include "../zcache.h"
16 #include "ramster.h"
17 
/*
 * Compile-time switch: enables the extra pr_err() diagnostic in the
 * async-get reply handler and the ramster_check_irq_counts() helper, both
 * guarded by #ifdef RAMSTER_TESTING further down.
 */
18 #define RAMSTER_TESTING
19 
/* Key value passed with every r2net_send_message_vec() call in this file. */
20 #define RMSTR_KEY 0x77347734
21 
/*
 * NOTE(review): the enumerator list is absent from this listing. The
 * RMSTR_TMEM_* message type constants used below are presumably declared
 * in this enum -- confirm against the real file.
 */
22 enum {
31 };
32 
/*
 * R2NET_MAX_PAYLOAD_BYTES minus the size of the struct tmem_xhandle that
 * precedes the page data in the messages handled below; used as the upper
 * bound on that data.
 */
33 #define RMSTR_R2NET_MAX_LEN \
34  (R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
35 
36 #include "tcp_internal.h"
37 
/*
 * Target node pointer and its node number. Both are written by the setter
 * that follows and read by r2net_remote_async_get() and r2net_remote_put().
 */
38 static struct r2nm_node *r2net_target_node;
39 static int r2net_target_nodenum;
40 
/*
 * Record the node numbered node_num as the target node.
 *
 * Looks the node up with r2nm_get_node_by_num(). On success the pointer is
 * stored in r2net_target_node, the number in r2net_target_nodenum, and 0 is
 * returned. When the lookup yields NULL the function returns -1.
 *
 * NOTE(review): the declaration line of this function is not present in
 * this listing; only the body is visible.
 * NOTE(review): r2nm_node_put() is called right after the pointer is
 * stored, so r2net_target_node is kept without a reference of its own --
 * confirm the node cannot go away while it is the target.
 * NOTE(review): the lookup result is assigned straight to the global, so a
 * failed lookup overwrites r2net_target_node with NULL while
 * r2net_target_nodenum keeps its previous value.
 */
42 {
43  int ret = -1;
44 
45  r2net_target_node = r2nm_get_node_by_num(node_num);
46  if (r2net_target_node != NULL) {
47  r2net_target_nodenum = node_num;
48  r2nm_node_put(r2net_target_node);
49  ret = 0;
50  }
51  return ret;
52 }
53 
54 /* FIXME following buffer should be per-cpu, protected by preempt_disable */
55 static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
56 
/*
 * Handler for incoming async-get requests.
 *
 * Copies the struct tmem_xhandle out of msg->buf, writes it to the start of
 * ramster_async_get_buf, and asks zcache_get_page() to place the page data
 * right after it. *ret_data is pointed at the start of that buffer and the
 * return value is the data size plus sizeof(struct tmem_xhandle); a failed
 * get is reported as a data size of zero.
 *
 * NOTE(review): msgtype is used below but its declaration is not present in
 * this listing, and the statement that should follow the "handle specially"
 * comment also appears to be missing -- check against the real file.
 * NOTE(review): the reply buffer is a single static array, so concurrent
 * invocations would share it (see the FIXME above).
 */
57 static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
58  u32 len, void *data, void **ret_data)
59 {
60  char *pdata;
61  struct tmem_xhandle xh;
62  int found;
63  size_t size = RMSTR_R2NET_MAX_LEN;
65  bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
66  unsigned long flags;
67 
68  xh = *(struct tmem_xhandle *)msg->buf;
/*
 * xh_data_size comes straight from the received message.
 * NOTE(review): an oversized value ends in BUG() rather than being rejected.
 */
69  if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
70  BUG();
/* reply layout: the handle first, then the page data */
71  pdata = ramster_async_get_buf;
72  *(struct tmem_xhandle *)pdata = xh;
73  pdata += sizeof(struct tmem_xhandle);
/*
 * zcache_get_page() runs with local interrupts disabled; its last argument
 * is 1 for get-and-free requests and -1 otherwise.
 */
74  local_irq_save(flags);
75  found = zcache_get_page(xh.client_id, xh.pool_id, &xh.oid, xh.index,
76  pdata, &size, true, get_and_free ? 1 : -1);
77  local_irq_restore(flags);
78  if (found < 0) {
79  /* a zero size indicates the get failed */
80  size = 0;
81  }
82  if (size > RMSTR_R2NET_MAX_LEN)
83  BUG();
/* step back over the handle so the reply starts at the buffer's beginning */
84  *ret_data = pdata - sizeof(struct tmem_xhandle);
85  /* now make caller (r2net_process_message) handle specially */
87  return size + sizeof(struct tmem_xhandle);
88 }
89 
90 static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
91  u32 len, void *data, void **ret_data)
92 {
93  char *in = (char *)msg->buf;
94  int datalen = len - sizeof(struct r2net_msg);
95  int ret = -1;
96  struct tmem_xhandle *xh = (struct tmem_xhandle *)in;
97 
98  in += sizeof(struct tmem_xhandle);
99  datalen -= sizeof(struct tmem_xhandle);
100  BUG_ON(datalen < 0 || datalen > PAGE_SIZE);
101  ret = ramster_localify(xh->pool_id, &xh->oid, xh->index,
102  in, datalen, xh->extra);
103 #ifdef RAMSTER_TESTING
104  if (ret == -EEXIST)
105  pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
106 #endif
107  return ret;
108 }
109 
/*
 * Handler for incoming put messages (ephemeral or persistent).
 *
 * msg->buf holds a struct tmem_xhandle followed by datalen bytes of page
 * data. zcache_autocreate_pool() is called first for the handle's client
 * and pool, then the data is handed to zcache_put_page() with local
 * interrupts disabled; that call's result is returned.
 *
 * NOTE(review): the first line of the function signature is missing from
 * this listing.
 * NOTE(review): datalen is derived from len with no range check here --
 * confirm the caller guarantees len covers the message header plus handle.
 */
111  u32 len, void *data, void **ret_data)
112 {
113  struct tmem_xhandle *xh;
114  char *p = (char *)msg->buf;
115  int datalen = len - sizeof(struct r2net_msg) -
116  sizeof(struct tmem_xhandle);
117  u16 msgtype = be16_to_cpu(msg->msg_type);
/* the message type alone decides whether the put is ephemeral */
118  bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH);
119  unsigned long flags;
120  int ret;
121 
122  xh = (struct tmem_xhandle *)p;
/* skip the handle; p now points at the page data */
123  p += sizeof(struct tmem_xhandle);
124  zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);
125  local_irq_save(flags);
126  ret = zcache_put_page(xh->client_id, xh->pool_id, &xh->oid, xh->index,
127  p, datalen, true, ephemeral);
128  local_irq_restore(flags);
129  return ret;
130 }
131 
/*
 * Handler for incoming page-flush messages.
 *
 * Interprets the start of msg->buf as a struct tmem_xhandle and calls
 * zcache_flush_page() for the page it names. The flush result is
 * deliberately discarded and 0 is always returned.
 *
 * NOTE(review): the first line of the function signature is missing from
 * this listing.
 */
133  u32 len, void *data, void **ret_data)
134 {
135  struct tmem_xhandle *xh;
136  char *p = (char *)msg->buf;
137 
138  xh = (struct tmem_xhandle *)p;
/* p is advanced past the handle but is not read again in this function */
139  p += sizeof(struct tmem_xhandle);
140  (void)zcache_flush_page(xh->client_id, xh->pool_id,
141  &xh->oid, xh->index);
142  return 0;
143 }
144 
/*
 * Handler for incoming object-flush messages.
 *
 * Interprets the start of msg->buf as a struct tmem_xhandle and calls
 * zcache_flush_object() for the object it names. The flush result is
 * deliberately discarded and 0 is always returned.
 *
 * NOTE(review): the first line of the function signature is missing from
 * this listing.
 */
146  u32 len, void *data, void **ret_data)
147 {
148  struct tmem_xhandle *xh;
149  char *p = (char *)msg->buf;
150 
151  xh = (struct tmem_xhandle *)p;
/* p is advanced past the handle but is not read again in this function */
152  p += sizeof(struct tmem_xhandle);
153  (void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
154  return 0;
155 }
156 
157 int r2net_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
158  size_t expect_size, uint8_t expect_cksum,
159  void *extra)
160 {
161  int nodenum, ret = -1, status;
162  struct r2nm_node *node = NULL;
163  struct kvec vec[1];
164  size_t veclen = 1;
165  u32 msg_type;
166  struct r2net_node *nn;
167 
168  node = r2nm_get_node_by_num(remotenode);
169  if (node == NULL)
170  goto out;
171  xh->client_id = r2nm_this_node(); /* which node is getting */
172  xh->xh_data_cksum = expect_cksum;
173  xh->xh_data_size = expect_size;
174  xh->extra = extra;
175  vec[0].iov_len = sizeof(*xh);
176  vec[0].iov_base = xh;
177 
178  node = r2net_target_node;
179  if (!node)
180  goto out;
181 
182  nodenum = r2net_target_nodenum;
183 
184  r2nm_node_get(node);
185  nn = r2net_nn_from_num(nodenum);
186  if (nn->nn_persistent_error || !nn->nn_sc_valid) {
187  ret = -ENOTCONN;
188  r2nm_node_put(node);
189  goto out;
190  }
191 
192  if (free)
194  else
195  msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
196  ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
197  vec, veclen, remotenode, &status);
198  r2nm_node_put(node);
199  if (ret < 0) {
200  if (ret == -ENOTCONN || ret == -EHOSTDOWN)
201  goto out;
202  if (ret == -EAGAIN)
203  goto out;
204  /* FIXME handle bad message possibilities here? */
205  pr_err("UNTESTED ret<0 in ramster_remote_async_get: ret=%d\n",
206  ret);
207  }
208  ret = status;
209 out:
210  return ret;
211 }
212 
213 #ifdef RAMSTER_TESTING
214 /* leave me here to see if it catches a weird crash */
215 static void ramster_check_irq_counts(void)
216 {
217  static int last_hardirq_cnt, last_softirq_cnt, last_preempt_cnt;
218  int cur_hardirq_cnt, cur_softirq_cnt, cur_preempt_cnt;
219 
220  cur_hardirq_cnt = hardirq_count() >> HARDIRQ_SHIFT;
221  if (cur_hardirq_cnt > last_hardirq_cnt) {
222  last_hardirq_cnt = cur_hardirq_cnt;
223  if (!(last_hardirq_cnt&(last_hardirq_cnt-1)))
224  pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
225  last_hardirq_cnt);
226  }
227  cur_softirq_cnt = softirq_count() >> SOFTIRQ_SHIFT;
228  if (cur_softirq_cnt > last_softirq_cnt) {
229  last_softirq_cnt = cur_softirq_cnt;
230  if (!(last_softirq_cnt&(last_softirq_cnt-1)))
231  pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
232  last_softirq_cnt);
233  }
234  cur_preempt_cnt = preempt_count() & PREEMPT_MASK;
235  if (cur_preempt_cnt > last_preempt_cnt) {
236  last_preempt_cnt = cur_preempt_cnt;
237  if (!(last_preempt_cnt&(last_preempt_cnt-1)))
238  pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
239  last_preempt_cnt);
240  }
241 }
242 #endif
243 
244 int r2net_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
245  bool ephemeral, int *remotenode)
246 {
247  int nodenum, ret = -1, status;
248  struct r2nm_node *node = NULL;
249  struct kvec vec[2];
250  size_t veclen = 2;
251  u32 msg_type;
252  struct r2net_node *nn;
253 
254  BUG_ON(size > RMSTR_R2NET_MAX_LEN);
255  xh->client_id = r2nm_this_node(); /* which node is putting */
256  vec[0].iov_len = sizeof(*xh);
257  vec[0].iov_base = xh;
258  vec[1].iov_len = size;
259  vec[1].iov_base = data;
260 
261  node = r2net_target_node;
262  if (!node)
263  goto out;
264 
265  nodenum = r2net_target_nodenum;
266 
267  r2nm_node_get(node);
268 
269  nn = r2net_nn_from_num(nodenum);
270  if (nn->nn_persistent_error || !nn->nn_sc_valid) {
271  ret = -ENOTCONN;
272  r2nm_node_put(node);
273  goto out;
274  }
275 
276  if (ephemeral)
277  msg_type = RMSTR_TMEM_PUT_EPH;
278  else
279  msg_type = RMSTR_TMEM_PUT_PERS;
280 #ifdef RAMSTER_TESTING
281  /* leave me here to see if it catches a weird crash */
282  ramster_check_irq_counts();
283 #endif
284 
285  ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen,
286  nodenum, &status);
287  if (ret < 0)
288  ret = -1;
289  else {
290  ret = status;
291  *remotenode = nodenum;
292  }
293 
294  r2nm_node_put(node);
295 out:
296  return ret;
297 }
298 
/*
 * Send a page-flush request for the page named by xh to node remotenode.
 *
 * Looks the node up (BUG if it does not exist), stamps xh->client_id with
 * this node's number, sends the handle as a single-element kvec, drops the
 * node reference and returns ret.
 *
 * NOTE(review): two lines are absent from this listing between the kvec
 * setup and the node put, including the first line of the call whose
 * trailing arguments are "vec, veclen, remotenode, &status". Presumably
 * that call assigns ret -- confirm against the real file.
 * NOTE(review): status is passed as an out-parameter but is not otherwise
 * read in the visible code.
 */
299 int r2net_remote_flush(struct tmem_xhandle *xh, int remotenode)
300 {
301  int ret = -1, status;
302  struct r2nm_node *node = NULL;
303  struct kvec vec[1];
304  size_t veclen = 1;
305 
306  node = r2nm_get_node_by_num(remotenode);
307  BUG_ON(node == NULL);
308  xh->client_id = r2nm_this_node(); /* which node is flushing */
309  vec[0].iov_len = sizeof(*xh);
310  vec[0].iov_base = xh;
/* must not be called from softirq context */
312  BUG_ON(in_softirq());
314  vec, veclen, remotenode, &status);
/* release the reference taken by the lookup above */
315  r2nm_node_put(node);
316  return ret;
317 }
318 
/*
 * Send an object-flush request for the object named by xh to node
 * remotenode.
 *
 * Looks the node up (BUG if it does not exist), stamps xh->client_id with
 * this node's number, sends the handle as a single-element kvec, drops the
 * node reference and returns ret.
 *
 * NOTE(review): the first line of the call whose trailing arguments are
 * "vec, veclen, remotenode, &status" is absent from this listing.
 * Presumably that call assigns ret -- confirm against the real file.
 * NOTE(review): status is passed as an out-parameter but is not otherwise
 * read in the visible code.
 */
319 int r2net_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
320 {
321  int ret = -1, status;
322  struct r2nm_node *node = NULL;
323  struct kvec vec[1];
324  size_t veclen = 1;
325 
326  node = r2nm_get_node_by_num(remotenode);
327  BUG_ON(node == NULL);
328  xh->client_id = r2nm_this_node(); /* which node is flobjing */
329  vec[0].iov_len = sizeof(*xh);
330  vec[0].iov_base = xh;
332  vec, veclen, remotenode, &status);
/* release the reference taken by the lookup above */
333  r2nm_node_put(node);
334  return ret;
335 }
336 
337 /*
338  * Handler registration
339  */
340 
341 static LIST_HEAD(r2net_unreg_list);
342 
343 static void r2net_unregister_handlers(void)
344 {
345  r2net_unregister_handler_list(&r2net_unreg_list);
346 }
347 
/*
 * Register this file's message handlers.
 *
 * Seven registrations are attempted in sequence, each passing
 * &r2net_unreg_list. The first non-zero status jumps to bail, where
 * r2net_unregister_handlers() is called and an error is logged. Returns
 * the last status, which is 0 when every registration succeeded.
 *
 * Handlers visible in this listing: the async-get request handler is
 * registered twice and the async-get reply handler once.
 *
 * NOTE(review): the declaration line of this function and the leading
 * lines of every registration call (the call itself, message type and
 * the other leading arguments) are absent from this listing; only
 * the trailing arguments remain. Check the real file before editing.
 */
349 {
350  int status;
351 
355  NULL, NULL, &r2net_unreg_list);
356  if (status)
357  goto bail;
358 
362  NULL, NULL, &r2net_unreg_list);
363  if (status)
364  goto bail;
365 
368  ramster_remote_async_get_request_handler,
369  NULL, NULL,
370  &r2net_unreg_list);
371  if (status)
372  goto bail;
373 
376  ramster_remote_async_get_request_handler,
377  NULL, NULL,
378  &r2net_unreg_list);
379  if (status)
380  goto bail;
381 
384  ramster_remote_async_get_reply_handler,
385  NULL, NULL,
386  &r2net_unreg_list);
387  if (status)
388  goto bail;
389 
393  NULL, NULL,
394  &r2net_unreg_list);
395  if (status)
396  goto bail;
397 
401  NULL, NULL,
402  &r2net_unreg_list);
403  if (status)
404  goto bail;
405 
406  pr_info("ramster: r2net handlers registered\n");
407 
/* reached on success too; the cleanup only runs when status is non-zero */
408 bail:
409  if (status) {
410  r2net_unregister_handlers();
411  pr_err("ramster: couldn't register r2net handlers\n");
412  }
413  return status;
414 }