Linux Kernel  3.7.1
svc_rdma_recvfrom.c
/*
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following
 * disclaimer in the documentation and/or other materials provided
 * with the distribution.
 *
 * Neither the name of the Network Appliance, Inc. nor the names of
 * its contributors may be used to endorse or promote products
 * derived from this software without specific prior written
 * permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <[email protected]>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *ctxt,
                               u32 byte_count)
{
        struct page *page;
        u32 bc;
        int sge_no;

        /* Swap the page in the SGE with the page in argpages */
        page = ctxt->pages[0];
        put_page(rqstp->rq_pages[0]);
        rqstp->rq_pages[0] = page;

        /* Set up the XDR head */
        rqstp->rq_arg.head[0].iov_base = page_address(page);
        rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
        rqstp->rq_arg.len = byte_count;
        rqstp->rq_arg.buflen = byte_count;

        /* Compute bytes past head in the SGL */
        bc = byte_count - rqstp->rq_arg.head[0].iov_len;

        /* If data remains, store it in the pagelist */
        rqstp->rq_arg.page_len = bc;
        rqstp->rq_arg.page_base = 0;
        rqstp->rq_arg.pages = &rqstp->rq_pages[1];
        sge_no = 1;
        while (bc && sge_no < ctxt->count) {
                page = ctxt->pages[sge_no];
                put_page(rqstp->rq_pages[sge_no]);
                rqstp->rq_pages[sge_no] = page;
                bc -= min(bc, ctxt->sge[sge_no].length);
                rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
                sge_no++;
        }
        rqstp->rq_respages = &rqstp->rq_pages[sge_no];

        /* We should never run out of SGE because the limit is defined to
         * support the max allowed RPC data length
         */
        BUG_ON(bc && (sge_no == ctxt->count));
        BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
               != byte_count);
        BUG_ON(rqstp->rq_arg.len != byte_count);

        /* If not all pages were used from the SGL, free the remaining ones */
        bc = sge_no;
        while (sge_no < ctxt->count) {
                page = ctxt->pages[sge_no++];
                put_page(page);
        }
        ctxt->count = bc;

        /* Set up tail */
        rqstp->rq_arg.tail[0].iov_base = NULL;
        rqstp->rq_arg.tail[0].iov_len = 0;
}

/* Encode a read-chunk-list as an array of IB SGE
 *
 * Assumptions:
 * - chunk[0]->position points to pages[0] at an offset of 0
 * - pages[] is not physically or virtually contiguous and consists of
 *   PAGE_SIZE elements.
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 *
 */
static int map_read_chunks(struct svcxprt_rdma *xprt,
                           struct svc_rqst *rqstp,
                           struct svc_rdma_op_ctxt *head,
                           struct rpcrdma_msg *rmsgp,
                           struct svc_rdma_req_map *rpl_map,
                           struct svc_rdma_req_map *chl_map,
                           int ch_count,
                           int byte_count)
{
        int sge_no;
        int sge_bytes;
        int page_off;
        int page_no;
        int ch_bytes;
        int ch_no;
        struct rpcrdma_read_chunk *ch;

        sge_no = 0;
        page_no = 0;
        page_off = 0;
        ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
        ch_no = 0;
        ch_bytes = ntohl(ch->rc_target.rs_length);
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->arg.pages = &head->pages[head->count];
        head->hdr_count = head->count; /* save count of hdr pages */
        head->arg.page_base = 0;
        head->arg.page_len = ch_bytes;
        head->arg.len = rqstp->rq_arg.len + ch_bytes;
        head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
        head->count++;
        chl_map->ch[0].start = 0;
        while (byte_count) {
                rpl_map->sge[sge_no].iov_base =
                        page_address(rqstp->rq_arg.pages[page_no]) + page_off;
                sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
                rpl_map->sge[sge_no].iov_len = sge_bytes;
                /*
                 * Don't bump head->count here because the same page
                 * may be used by multiple SGE.
                 */
                head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
                rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];

                byte_count -= sge_bytes;
                ch_bytes -= sge_bytes;
                sge_no++;
                /*
                 * If all bytes for this chunk have been mapped to an
                 * SGE, move to the next SGE
                 */
                if (ch_bytes == 0) {
                        chl_map->ch[ch_no].count =
                                sge_no - chl_map->ch[ch_no].start;
                        ch_no++;
                        ch++;
                        chl_map->ch[ch_no].start = sge_no;
                        ch_bytes = ntohl(ch->rc_target.rs_length);
                        /* If bytes remaining account for next chunk */
                        if (byte_count) {
                                head->arg.page_len += ch_bytes;
                                head->arg.len += ch_bytes;
                                head->arg.buflen += ch_bytes;
                        }
                }
                /*
                 * If this SGE consumed all of the page, move to the
                 * next page
                 */
                if ((sge_bytes + page_off) == PAGE_SIZE) {
                        page_no++;
                        page_off = 0;
                        /*
                         * If there are still bytes left to map, bump
                         * the page count
                         */
                        if (byte_count)
                                head->count++;
                } else
                        page_off += sge_bytes;
        }
        BUG_ON(byte_count != 0);
        return sge_no;
}

/* Map a read-chunk-list to an XDR and fast register the page-list.
 *
 * Assumptions:
 * - chunk[0] position points to pages[0] at an offset of 0
 * - pages[] will be made physically contiguous by creating a one-off memory
 *   region using the fastreg verb.
 * - byte_count is # of bytes in read-chunk-list
 * - ch_count is # of chunks in read-chunk-list
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 */
static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
                                struct svc_rqst *rqstp,
                                struct svc_rdma_op_ctxt *head,
                                struct rpcrdma_msg *rmsgp,
                                struct svc_rdma_req_map *rpl_map,
                                struct svc_rdma_req_map *chl_map,
                                int ch_count,
                                int byte_count)
{
        int page_no;
        int ch_no;
        u32 offset;
        struct rpcrdma_read_chunk *ch;
        struct svc_rdma_fastreg_mr *frmr;
        int ret = 0;

        frmr = svc_rdma_get_frmr(xprt);
        if (IS_ERR(frmr))
                return -ENOMEM;

        head->frmr = frmr;
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->arg.pages = &head->pages[head->count];
        head->hdr_count = head->count; /* save count of hdr pages */
        head->arg.page_base = 0;
        head->arg.page_len = byte_count;
        head->arg.len = rqstp->rq_arg.len + byte_count;
        head->arg.buflen = rqstp->rq_arg.buflen + byte_count;

        /* Fast register the page list */
        frmr->kva = page_address(rqstp->rq_arg.pages[0]);
        frmr->direction = DMA_FROM_DEVICE;
        frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
        frmr->map_len = byte_count;
        frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
        for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
                frmr->page_list->page_list[page_no] =
                        ib_dma_map_page(xprt->sc_cm_id->device,
                                        rqstp->rq_arg.pages[page_no], 0,
                                        PAGE_SIZE, DMA_FROM_DEVICE);
                if (ib_dma_mapping_error(xprt->sc_cm_id->device,
                                         frmr->page_list->page_list[page_no]))
                        goto fatal_err;
                atomic_inc(&xprt->sc_dma_used);
                head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
        }
        head->count += page_no;

        /* rq_respages points one past arg pages */
        rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];

        /* Create the reply and chunk maps */
        offset = 0;
        ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
        for (ch_no = 0; ch_no < ch_count; ch_no++) {
                int len = ntohl(ch->rc_target.rs_length);
                rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
                rpl_map->sge[ch_no].iov_len = len;
                chl_map->ch[ch_no].count = 1;
                chl_map->ch[ch_no].start = ch_no;
                offset += len;
                ch++;
        }

        ret = svc_rdma_fastreg(xprt, frmr);
        if (ret)
                goto fatal_err;

        return ch_no;

 fatal_err:
        printk("svcrdma: error fast registering xdr for xprt %p", xprt);
        svc_rdma_put_frmr(xprt, frmr);
        return -EIO;
}

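/*
 * Build the receive SGE list for one RDMA_READ WR from the kvec array.
 * Without a fastreg MR, each kvec element is DMA-mapped and described
 * with the local DMA lkey; with a fastreg MR, the already-registered kva
 * and the MR's lkey are used directly. *sgl_offset is advanced by the
 * number of bytes described.
 */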
static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
                             struct svc_rdma_op_ctxt *ctxt,
                             struct svc_rdma_fastreg_mr *frmr,
                             struct kvec *vec,
                             u64 *sgl_offset,
                             int count)
{
        int i;
        unsigned long off;

        ctxt->count = count;
        ctxt->direction = DMA_FROM_DEVICE;
        for (i = 0; i < count; i++) {
                ctxt->sge[i].length = 0; /* in case map fails */
                if (!frmr) {
                        BUG_ON(!virt_to_page(vec[i].iov_base));
                        off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
                        ctxt->sge[i].addr =
                                ib_dma_map_page(xprt->sc_cm_id->device,
                                                virt_to_page(vec[i].iov_base),
                                                off,
                                                vec[i].iov_len,
                                                DMA_FROM_DEVICE);
                        if (ib_dma_mapping_error(xprt->sc_cm_id->device,
                                                 ctxt->sge[i].addr))
                                return -EINVAL;
                        ctxt->sge[i].lkey = xprt->sc_dma_lkey;
                        atomic_inc(&xprt->sc_dma_used);
                } else {
                        ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
                        ctxt->sge[i].lkey = frmr->mr->lkey;
                }
                ctxt->sge[i].length = vec[i].iov_len;
                *sgl_offset = *sgl_offset + vec[i].iov_len;
        }
        return 0;
}

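/*
 * Limit the number of SGEs used for a single RDMA_READ: iWARP transports
 * get one SGE per work request, everything else is capped at the device's
 * sc_max_sge.
 */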
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
        if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
             RDMA_TRANSPORT_IWARP) &&
            sge_count > 1)
                return 1;
        else
                return min_t(int, sge_count, xprt->sc_max_sge);
}

/*
 * Use RDMA_READ to read data from the advertised client buffer into the
 * XDR stream starting at rq_arg.head[0].iov_base. Each chunk in the array
 * contains the following fields:
 * discrim  - '1', this is not used for data placement
 * position - the XDR stream offset (the same for every chunk)
 * handle   - RMR for client memory region
 * length   - data transfer length
 * offset   - 64 bit tagged offset in remote memory region
 *
 * On our side, we need to read into a pagelist. The first page immediately
 * follows the RPC header.
 *
 * This function returns:
 * 0  - No error and no read-list found.
 *
 * 1  - Successful read-list processing. The data is not yet in
 * the pagelist and therefore the RPC request must be deferred. The
 * I/O completion will enqueue the transport again and
 * svc_rdma_recvfrom will complete the request.
 *
 * <0 - Error processing/posting read-list.
 *
 * NOTE: The ctxt must not be touched after the last WR has been posted
 * because the I/O completion processing may occur on another
 * processor and free / modify the context. Do not touch it!
 */
static int rdma_read_xdr(struct svcxprt_rdma *xprt,
                         struct rpcrdma_msg *rmsgp,
                         struct svc_rqst *rqstp,
                         struct svc_rdma_op_ctxt *hdr_ctxt)
{
        struct ib_send_wr read_wr;
        struct ib_send_wr inv_wr;
        int err = 0;
        int ch_no;
        int ch_count;
        int byte_count;
        int sge_count;
        u64 sgl_offset;
        struct rpcrdma_read_chunk *ch;
        struct svc_rdma_op_ctxt *ctxt = NULL;
        struct svc_rdma_req_map *rpl_map;
        struct svc_rdma_req_map *chl_map;

        /* If no read list is present, return 0 */
        ch = svc_rdma_get_read_chunk(rmsgp);
        if (!ch)
                return 0;

        svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
        if (ch_count > RPCSVC_MAXPAGES)
                return -EINVAL;

        /* Allocate temporary reply and chunk maps */
        rpl_map = svc_rdma_get_req_map();
        chl_map = svc_rdma_get_req_map();

        if (!xprt->sc_frmr_pg_list_len)
                sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
                                            rpl_map, chl_map, ch_count,
                                            byte_count);
        else
                sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
                                                 rpl_map, chl_map, ch_count,
                                                 byte_count);
        if (sge_count < 0) {
                err = -EIO;
                goto out;
        }

        sgl_offset = 0;
        ch_no = 0;

        for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
             ch->rc_discrim != 0; ch++, ch_no++) {
                u64 rs_offset;
next_sge:
                ctxt = svc_rdma_get_context(xprt);
                ctxt->direction = DMA_FROM_DEVICE;
                ctxt->frmr = hdr_ctxt->frmr;
                ctxt->read_hdr = NULL;
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
                clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);

                /* Prepare READ WR */
                memset(&read_wr, 0, sizeof read_wr);
                read_wr.wr_id = (unsigned long)ctxt;
                read_wr.opcode = IB_WR_RDMA_READ;
                ctxt->wr_op = read_wr.opcode;
                read_wr.send_flags = IB_SEND_SIGNALED;
                read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
                xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
                                 &rs_offset);
                read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
                read_wr.sg_list = ctxt->sge;
                read_wr.num_sge =
                        rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
                err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
                                        &rpl_map->sge[chl_map->ch[ch_no].start],
                                        &sgl_offset,
                                        read_wr.num_sge);
                if (err) {
                        svc_rdma_unmap_dma(ctxt);
                        svc_rdma_put_context(ctxt, 0);
                        goto out;
                }
                if (((ch+1)->rc_discrim == 0) &&
                    (read_wr.num_sge == chl_map->ch[ch_no].count)) {
                        /*
                         * Mark the last RDMA_READ with a bit to
                         * indicate all RPC data has been fetched from
                         * the client and the RPC needs to be enqueued.
                         */
                        set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
                        if (hdr_ctxt->frmr) {
                                set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
                                /*
                                 * Invalidate the local MR used to map the data
                                 * sink.
                                 */
                                if (xprt->sc_dev_caps &
                                    SVCRDMA_DEVCAP_READ_W_INV) {
                                        read_wr.opcode =
                                                IB_WR_RDMA_READ_WITH_INV;
                                        ctxt->wr_op = read_wr.opcode;
                                        read_wr.ex.invalidate_rkey =
                                                ctxt->frmr->mr->lkey;
                                } else {
                                        /* Prepare INVALIDATE WR */
                                        memset(&inv_wr, 0, sizeof inv_wr);
                                        inv_wr.opcode = IB_WR_LOCAL_INV;
                                        inv_wr.send_flags = IB_SEND_SIGNALED;
                                        inv_wr.ex.invalidate_rkey =
                                                hdr_ctxt->frmr->mr->lkey;
                                        read_wr.next = &inv_wr;
                                }
                        }
                        ctxt->read_hdr = hdr_ctxt;
                }
                /* Post the read */
                err = svc_rdma_send(xprt, &read_wr);
                if (err) {
                        printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
                               err);
                        set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                        svc_rdma_unmap_dma(ctxt);
                        svc_rdma_put_context(ctxt, 0);
                        goto out;
                }
                atomic_inc(&rdma_stat_read);

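                /*
                 * If this chunk could not be described by a single WR,
                 * advance the chunk map past the SGEs just posted and
                 * issue another RDMA_READ for the remainder.
                 */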
                if (read_wr.num_sge < chl_map->ch[ch_no].count) {
                        chl_map->ch[ch_no].count -= read_wr.num_sge;
                        chl_map->ch[ch_no].start += read_wr.num_sge;
                        goto next_sge;
                }
                sgl_offset = 0;
                err = 1;
        }

 out:
        svc_rdma_put_req_map(rpl_map);
        svc_rdma_put_req_map(chl_map);

        /* Detach arg pages. svc_recv will replenish them */
        for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
                rqstp->rq_pages[ch_no] = NULL;

        /*
         * Detach res pages. svc_release must see an rq_resused count of
         * zero or it will attempt to put them.
         */
        while (rqstp->rq_resused)
                rqstp->rq_respages[--rqstp->rq_resused] = NULL;

        return err;
}

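/*
 * Finish a request whose read-list data has arrived: move the pages saved
 * in the head context into the rqstp page array, rebuild rq_arg around
 * them, release the context and return the total argument length.
 */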
static int rdma_read_complete(struct svc_rqst *rqstp,
                              struct svc_rdma_op_ctxt *head)
{
        int page_no;
        int ret;

        BUG_ON(!head);

        /* Copy RPC pages */
        for (page_no = 0; page_no < head->count; page_no++) {
                put_page(rqstp->rq_pages[page_no]);
                rqstp->rq_pages[page_no] = head->pages[page_no];
        }
        /* Point rq_arg.pages past header */
        rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
        rqstp->rq_arg.page_len = head->arg.page_len;
        rqstp->rq_arg.page_base = head->arg.page_base;

        /* rq_respages starts after the last arg page */
        rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
        rqstp->rq_resused = 0;

        /* Rebuild rq_arg head and tail. */
        rqstp->rq_arg.head[0] = head->arg.head[0];
        rqstp->rq_arg.tail[0] = head->arg.tail[0];
        rqstp->rq_arg.len = head->arg.len;
        rqstp->rq_arg.buflen = head->arg.buflen;

        /* Free the context */
        svc_rdma_put_context(head, 0);

        /* XXX: What should this be? */
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);

        ret = rqstp->rq_arg.head[0].iov_len
                + rqstp->rq_arg.page_len
                + rqstp->rq_arg.tail[0].iov_len;
        dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
                ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);

        return ret;
}

/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_op_ctxt *ctxt = NULL;
        struct rpcrdma_msg *rmsgp;
        int ret = 0;
        int len;

        dprintk("svcrdma: rqstp=%p\n", rqstp);

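        /*
         * Requests whose read-list data has completed are handled first;
         * otherwise dequeue the next receive completion, or clear XPT_DATA
         * if nothing is pending.
         */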
        spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
                ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
        }
        if (ctxt) {
                spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
                return rdma_read_complete(rqstp, ctxt);
        }

        if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
                ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
        } else {
                atomic_inc(&rdma_stat_rq_starve);
                clear_bit(XPT_DATA, &xprt->xpt_flags);
                ctxt = NULL;
        }
        spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!ctxt) {
                /* This is the EAGAIN path. The svc_recv routine will
                 * return -EAGAIN, the nfsd thread will call into
                 * svc_recv again and we shouldn't be on the active
                 * transport list
                 */
                if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
                        goto close_out;

                BUG_ON(ret);
                goto out;
        }
        dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
                ctxt, rdma_xprt, rqstp, ctxt->wc_status);
        BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
        atomic_inc(&rdma_stat_recv);

        /* Build up the XDR from the receive buffers. */
        rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

        /* Decode the RDMA header. */
        len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
        rqstp->rq_xprt_hlen = len;

        /* If the request is invalid, reply with an error */
        if (len < 0) {
                if (len == -ENOSYS)
                        svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
                goto close_out;
        }

        /* Read read-list data. */
        ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
        if (ret > 0) {
                /* read-list posted, defer until data received from client. */
                goto defer;
        }
        if (ret < 0) {
                /* Post of read-list failed, free context. */
                svc_rdma_put_context(ctxt, 1);
                return 0;
        }

        ret = rqstp->rq_arg.head[0].iov_len
                + rqstp->rq_arg.page_len
                + rqstp->rq_arg.tail[0].iov_len;
        svc_rdma_put_context(ctxt, 0);
 out:
        dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
                ret, rqstp->rq_arg.len,
                rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, xprt);
        return ret;

 close_out:
        if (ctxt)
                svc_rdma_put_context(ctxt, 1);
        dprintk("svcrdma: transport %p is closing\n", xprt);
        /*
         * Set the close bit and enqueue it. svc_recv will see the
         * close bit and call svc_xprt_delete
         */
        set_bit(XPT_CLOSE, &xprt->xpt_flags);
 defer:
        return 0;
}