Linux Kernel  3.7.1
svc_rdma_sendto.c
1 /*
2  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses. You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  * Redistributions of source code must retain the above copyright
15  * notice, this list of conditions and the following disclaimer.
16  *
17  * Redistributions in binary form must reproduce the above
18  * copyright notice, this list of conditions and the following
19  * disclaimer in the documentation and/or other materials provided
20  * with the distribution.
21  *
22  * Neither the name of the Network Appliance, Inc. nor the names of
23  * its contributors may be used to endorse or promote products
24  * derived from this software without specific prior written
25  * permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  *
39  * Author: Tom Tucker <[email protected]>
40  */
41 
42 #include <linux/sunrpc/debug.h>
43 #include <linux/sunrpc/rpc_rdma.h>
44 #include <linux/spinlock.h>
45 #include <asm/unaligned.h>
46 #include <rdma/ib_verbs.h>
47 #include <rdma/rdma_cm.h>
48 #include <linux/sunrpc/svc_rdma.h>
49 
50 #define RPCDBG_FACILITY RPCDBG_SVCXPRT
51 
52 /* Encode an XDR as an array of IB SGE
53  *
54  * Assumptions:
55  * - head[0] is physically contiguous.
56  * - tail[0] is physically contiguous.
57  * - pages[] is not physically or virtually contiguous and consists of
58  * PAGE_SIZE elements.
59  *
60  * Output:
61  * SGE[0] reserved for RPCRDMA header
62  * SGE[1] data from xdr->head[]
63  * SGE[2..sge_count-2] data from xdr->pages[]
64  * SGE[sge_count-1] data from xdr->tail.
65  *
66  * The max SGE we need is the length of the XDR / pagesize + one for
67  * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES
68  * reserves a page for both the request and the reply header, and this
69  * array is only concerned with the reply, we are assured that we have
70  * one extra page for the RPCRDMA header.
71  */
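/* A rough worked example, assuming PAGE_SIZE = 4096: a 16 KB reply XDR needs
 * 16384 / 4096 = 4 SGEs for pages[], plus one each for the head, the tail,
 * and the RPCRDMA header, i.e. 7 SGEs in total.
 */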
72 static int fast_reg_xdr(struct svcxprt_rdma *xprt,
73  struct xdr_buf *xdr,
74  struct svc_rdma_req_map *vec)
75 {
76  int sge_no;
77  u32 sge_bytes;
78  u32 page_bytes;
79  u32 page_off;
80  int page_no = 0;
81  u8 *frva;
82  struct svc_rdma_fastreg_mr *frmr;
83 
84  frmr = svc_rdma_get_frmr(xprt);
85  if (IS_ERR(frmr))
86  return -ENOMEM;
87  vec->frmr = frmr;
88 
89  /* Skip the RPCRDMA header */
90  sge_no = 1;
91 
92  /* Map the head. */
93  frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
94  vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
95  vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
96  vec->count = 2;
97  sge_no++;
98 
99  /* Map the XDR head */
100  frmr->kva = frva;
101  frmr->direction = DMA_TO_DEVICE;
102  frmr->access_flags = 0;
103  frmr->map_len = PAGE_SIZE;
104  frmr->page_list_len = 1;
105  page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
106  frmr->page_list->page_list[page_no] =
107  ib_dma_map_page(xprt->sc_cm_id->device,
108  virt_to_page(xdr->head[0].iov_base),
109  page_off,
110  PAGE_SIZE - page_off,
111  DMA_TO_DEVICE);
112  if (ib_dma_mapping_error(xprt->sc_cm_id->device,
113  frmr->page_list->page_list[page_no]))
114  goto fatal_err;
115  atomic_inc(&xprt->sc_dma_used);
116 
117  /* Map the XDR page list */
118  page_off = xdr->page_base;
119  page_bytes = xdr->page_len + page_off;
120  if (!page_bytes)
121  goto encode_tail;
122 
123  /* Map the pages */
124  vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
125  vec->sge[sge_no].iov_len = page_bytes;
126  sge_no++;
127  while (page_bytes) {
128  struct page *page;
129 
130  page = xdr->pages[page_no++];
131  sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
132  page_bytes -= sge_bytes;
133 
134  frmr->page_list->page_list[page_no] =
135  ib_dma_map_page(xprt->sc_cm_id->device,
136  page, page_off,
137  sge_bytes, DMA_TO_DEVICE);
138  if (ib_dma_mapping_error(xprt->sc_cm_id->device,
139  frmr->page_list->page_list[page_no]))
140  goto fatal_err;
141 
142  atomic_inc(&xprt->sc_dma_used);
143  page_off = 0; /* reset for next time through loop */
144  frmr->map_len += PAGE_SIZE;
145  frmr->page_list_len++;
146  }
147  vec->count++;
148 
149  encode_tail:
150  /* Map tail */
151  if (0 == xdr->tail[0].iov_len)
152  goto done;
153 
154  vec->count++;
155  vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
156 
157  if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
158  ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
159  /*
160  * If head and tail use the same page, we don't need
161  * to map it again.
162  */
163  vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
164  } else {
165  void *va;
166 
167  /* Map another page for the tail */
168  page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
169  va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
170  vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
171 
172  frmr->page_list->page_list[page_no] =
173  ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va),
174  page_off,
175  PAGE_SIZE,
176  DMA_TO_DEVICE);
177  if (ib_dma_mapping_error(xprt->sc_cm_id->device,
178  frmr->page_list->page_list[page_no]))
179  goto fatal_err;
180  atomic_inc(&xprt->sc_dma_used);
181  frmr->map_len += PAGE_SIZE;
182  frmr->page_list_len++;
183  }
184 
185  done:
186  if (svc_rdma_fastreg(xprt, frmr))
187  goto fatal_err;
188 
189  return 0;
190 
191  fatal_err:
192  printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
193  vec->frmr = NULL;
194  svc_rdma_put_frmr(xprt, frmr);
195  return -EIO;
196 }
197 
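/* Build a vector of iovecs (vec->sge[]) describing the reply XDR. sge[0] is
 * left free for the RPCRDMA header; the head, pages[], and tail follow. If
 * the transport supports fast memory registration, the work is handed off to
 * fast_reg_xdr() above instead.
 */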
198 static int map_xdr(struct svcxprt_rdma *xprt,
199  struct xdr_buf *xdr,
200  struct svc_rdma_req_map *vec)
201 {
202  int sge_no;
203  u32 sge_bytes;
204  u32 page_bytes;
205  u32 page_off;
206  int page_no;
207 
208  BUG_ON(xdr->len !=
209  (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
210 
211  if (xprt->sc_frmr_pg_list_len)
212  return fast_reg_xdr(xprt, xdr, vec);
213 
214  /* Skip the first sge, this is for the RPCRDMA header */
215  sge_no = 1;
216 
217  /* Head SGE */
218  vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
219  vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
220  sge_no++;
221 
222  /* pages SGE */
223  page_no = 0;
224  page_bytes = xdr->page_len;
225  page_off = xdr->page_base;
226  while (page_bytes) {
227  vec->sge[sge_no].iov_base =
228  page_address(xdr->pages[page_no]) + page_off;
229  sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
230  page_bytes -= sge_bytes;
231  vec->sge[sge_no].iov_len = sge_bytes;
232 
233  sge_no++;
234  page_no++;
235  page_off = 0; /* reset for next time through loop */
236  }
237 
238  /* Tail SGE */
239  if (xdr->tail[0].iov_len) {
240  vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
241  vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
242  sge_no++;
243  }
244 
245  dprintk("svcrdma: map_xdr: sge_no %d page_no %d "
246  "page_base %u page_len %u head_len %zu tail_len %zu\n",
247  sge_no, page_no, xdr->page_base, xdr->page_len,
248  xdr->head[0].iov_len, xdr->tail[0].iov_len);
249 
250  vec->count = sge_no;
251  return 0;
252 }
253 
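/* DMA-map at most one page of the reply starting at byte offset xdr_off into
 * the xdr_buf, resolving the offset to the head, the page list, or the tail.
 * Callers are responsible for checking ib_dma_mapping_error() on the result.
 */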
254 static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
255  struct xdr_buf *xdr,
256  u32 xdr_off, size_t len, int dir)
257 {
258  struct page *page;
259  dma_addr_t dma_addr;
260  if (xdr_off < xdr->head[0].iov_len) {
261  /* This offset is in the head */
262  xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
263  page = virt_to_page(xdr->head[0].iov_base);
264  } else {
265  xdr_off -= xdr->head[0].iov_len;
266  if (xdr_off < xdr->page_len) {
267  /* This offset is in the page list */
268  page = xdr->pages[xdr_off >> PAGE_SHIFT];
269  xdr_off &= ~PAGE_MASK;
270  } else {
271  /* This offset is in the tail */
272  xdr_off -= xdr->page_len;
273  xdr_off += (unsigned long)
274  xdr->tail[0].iov_base & ~PAGE_MASK;
275  page = virt_to_page(xdr->tail[0].iov_base);
276  }
277  }
278  dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
279  min_t(size_t, PAGE_SIZE, len), dir);
280  return dma_addr;
281 }
282 
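/* Post a single RDMA_WRITE of up to write_len bytes of the reply XDR,
 * starting at xdr_off, to the client memory described by <rmr, to>. The SGE
 * list is built from the iovec map prepared by map_xdr().
 */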
283 /* Assumptions:
284  * - We are using FRMR
285  * - or -
286  * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
287  */
288 static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
289  u32 rmr, u64 to,
290  u32 xdr_off, int write_len,
291  struct svc_rdma_req_map *vec)
292 {
293  struct ib_send_wr write_wr;
294  struct ib_sge *sge;
295  int xdr_sge_no;
296  int sge_no;
297  int sge_bytes;
298  int sge_off;
299  int bc;
300  struct svc_rdma_op_ctxt *ctxt;
301 
302  BUG_ON(vec->count > RPCSVC_MAXPAGES);
303  dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
304  "write_len=%d, vec->sge=%p, vec->count=%lu\n",
305  rmr, (unsigned long long)to, xdr_off,
306  write_len, vec->sge, vec->count);
307 
308  ctxt = svc_rdma_get_context(xprt);
309  ctxt->direction = DMA_TO_DEVICE;
310  sge = ctxt->sge;
311 
312  /* Find the SGE associated with xdr_off */
313  for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
314  xdr_sge_no++) {
315  if (vec->sge[xdr_sge_no].iov_len > bc)
316  break;
317  bc -= vec->sge[xdr_sge_no].iov_len;
318  }
319 
320  sge_off = bc;
321  bc = write_len;
322  sge_no = 0;
323 
324  /* Copy the remaining SGE */
325  while (bc != 0) {
326  sge_bytes = min_t(size_t,
327  bc, vec->sge[xdr_sge_no].iov_len-sge_off);
328  sge[sge_no].length = sge_bytes;
329  if (!vec->frmr) {
330  sge[sge_no].addr =
331  dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
332  sge_bytes, DMA_TO_DEVICE);
333  xdr_off += sge_bytes;
334  if (ib_dma_mapping_error(xprt->sc_cm_id->device,
335  sge[sge_no].addr))
336  goto err;
337  atomic_inc(&xprt->sc_dma_used);
338  sge[sge_no].lkey = xprt->sc_dma_lkey;
339  } else {
340  sge[sge_no].addr = (unsigned long)
341  vec->sge[xdr_sge_no].iov_base + sge_off;
342  sge[sge_no].lkey = vec->frmr->mr->lkey;
343  }
344  ctxt->count++;
345  ctxt->frmr = vec->frmr;
346  sge_off = 0;
347  sge_no++;
348  xdr_sge_no++;
349  BUG_ON(xdr_sge_no > vec->count);
350  bc -= sge_bytes;
351  }
352 
353  /* Prepare WRITE WR */
354  memset(&write_wr, 0, sizeof write_wr);
355  ctxt->wr_op = IB_WR_RDMA_WRITE;
356  write_wr.wr_id = (unsigned long)ctxt;
357  write_wr.sg_list = &sge[0];
358  write_wr.num_sge = sge_no;
359  write_wr.opcode = IB_WR_RDMA_WRITE;
360  write_wr.send_flags = IB_SEND_SIGNALED;
361  write_wr.wr.rdma.rkey = rmr;
362  write_wr.wr.rdma.remote_addr = to;
363 
364  /* Post It */
365  atomic_inc(&rdma_stat_write);
366  if (svc_rdma_send(xprt, &write_wr))
367  goto err;
368  return 0;
369  err:
370  svc_rdma_unmap_dma(ctxt);
371  svc_rdma_put_frmr(xprt, vec->frmr);
372  svc_rdma_put_context(ctxt, 0);
373  /* Fatal error, close transport */
374  return -EIO;
375 }
376 
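/* Walk the write list from the request header, RDMA_WRITE the pagelist and
 * tail of the reply into the client-supplied chunks, and encode the lengths
 * actually written into the response's write list. Returns the pagelist plus
 * tail byte count, which the caller subtracts from the bytes left to send
 * inline.
 */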
377 static int send_write_chunks(struct svcxprt_rdma *xprt,
378  struct rpcrdma_msg *rdma_argp,
379  struct rpcrdma_msg *rdma_resp,
380  struct svc_rqst *rqstp,
381  struct svc_rdma_req_map *vec)
382 {
383  u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
384  int write_len;
385  int max_write;
386  u32 xdr_off;
387  int chunk_off;
388  int chunk_no;
389  struct rpcrdma_write_array *arg_ary;
390  struct rpcrdma_write_array *res_ary;
391  int ret;
392 
393  arg_ary = svc_rdma_get_write_array(rdma_argp);
394  if (!arg_ary)
395  return 0;
396  res_ary = (struct rpcrdma_write_array *)
397  &rdma_resp->rm_body.rm_chunks[1];
398 
399  if (vec->frmr)
400  max_write = vec->frmr->map_len;
401  else
402  max_write = xprt->sc_max_sge * PAGE_SIZE;
403 
404  /* Write chunks start at the pagelist */
405  for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
406  xfer_len && chunk_no < arg_ary->wc_nchunks;
407  chunk_no++) {
408  struct rpcrdma_segment *arg_ch;
409  u64 rs_offset;
410 
411  arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
412  write_len = min(xfer_len, ntohl(arg_ch->rs_length));
413 
414  /* Prepare the response chunk given the length actually
415  * written */
416  xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
417  svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
418  arg_ch->rs_handle,
419  arg_ch->rs_offset,
420  write_len);
421  chunk_off = 0;
422  while (write_len) {
423  int this_write;
424  this_write = min(write_len, max_write);
425  ret = send_write(xprt, rqstp,
426  ntohl(arg_ch->rs_handle),
427  rs_offset + chunk_off,
428  xdr_off,
429  this_write,
430  vec);
431  if (ret) {
432  dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
433  ret);
434  return -EIO;
435  }
436  chunk_off += this_write;
437  xdr_off += this_write;
438  xfer_len -= this_write;
439  write_len -= this_write;
440  }
441  }
442  /* Update the req with the number of chunks actually used */
443  svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
444 
445  return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
446 }
447 
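/* Like send_write_chunks(), but driven by the reply list: the entire reply
 * (rq_res.len bytes, starting at XDR offset 0) is pushed via RDMA_WRITE into
 * the client-supplied reply chunks, and the reply array in the response
 * header is encoded with the chunks actually used. Returns rq_res.len.
 */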
448 static int send_reply_chunks(struct svcxprt_rdma *xprt,
449  struct rpcrdma_msg *rdma_argp,
450  struct rpcrdma_msg *rdma_resp,
451  struct svc_rqst *rqstp,
452  struct svc_rdma_req_map *vec)
453 {
454  u32 xfer_len = rqstp->rq_res.len;
455  int write_len;
456  int max_write;
457  u32 xdr_off;
458  int chunk_no;
459  int chunk_off;
460  int nchunks;
461  struct rpcrdma_segment *ch;
462  struct rpcrdma_write_array *arg_ary;
463  struct rpcrdma_write_array *res_ary;
464  int ret;
465 
466  arg_ary = svc_rdma_get_reply_array(rdma_argp);
467  if (!arg_ary)
468  return 0;
469  /* XXX: need to fix when reply lists occur with read-list and/or
470  * write-list */
471  res_ary = (struct rpcrdma_write_array *)
472  &rdma_resp->rm_body.rm_chunks[2];
473 
474  if (vec->frmr)
475  max_write = vec->frmr->map_len;
476  else
477  max_write = xprt->sc_max_sge * PAGE_SIZE;
478 
479  /* xdr offset starts at RPC message */
480  nchunks = ntohl(arg_ary->wc_nchunks);
481  for (xdr_off = 0, chunk_no = 0;
482  xfer_len && chunk_no < nchunks;
483  chunk_no++) {
484  u64 rs_offset;
485  ch = &arg_ary->wc_array[chunk_no].wc_target;
486  write_len = min(xfer_len, ntohl(ch->rs_length));
487 
488  /* Prepare the reply chunk given the length actually
489  * written */
490  xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
491  svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
492  ch->rs_handle, ch->rs_offset,
493  write_len);
494  chunk_off = 0;
495  while (write_len) {
496  int this_write;
497 
498  this_write = min(write_len, max_write);
499  ret = send_write(xprt, rqstp,
500  ntohl(ch->rs_handle),
501  rs_offset + chunk_off,
502  xdr_off,
503  this_write,
504  vec);
505  if (ret) {
506  dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
507  ret);
508  return -EIO;
509  }
510  chunk_off += this_write;
511  xdr_off += this_write;
512  xfer_len -= this_write;
513  write_len -= this_write;
514  }
515  }
516  /* Update the req with the number of chunks actually used */
517  svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
518 
519  return rqstp->rq_res.len;
520 }
521 
522 /* This function prepares the portion of the RPCRDMA message to be
523  * sent in the RDMA_SEND. This function is called after data sent via
524  * RDMA has already been transmitted. There are three cases:
525  * - The RPCRDMA header, RPC header, and payload are all sent in a
526  * single RDMA_SEND. This is the "inline" case.
527  * - The RPCRDMA header and some portion of the RPC header and data
528  * are sent via this RDMA_SEND and another portion of the data is
529  * sent via RDMA.
530  * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
531  * header and data are all transmitted via RDMA.
532  * In all three cases, this function prepares the RPCRDMA header in
533  * sge[0], the 'type' parameter indicates the type to place in the
534  * RPCRDMA header, and the 'byte_count' field indicates how much of
535  * the XDR to include in this RDMA_SEND. NB: The offset of the payload
536  * to send is zero in the XDR.
537  */
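/* In svc_rdma_sendto() below, the first two cases are sent as RDMA_MSG (the
 * client supplied no reply list), while the third is sent as RDMA_NOMSG when
 * a reply list is present.
 */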
538 static int send_reply(struct svcxprt_rdma *rdma,
539  struct svc_rqst *rqstp,
540  struct page *page,
541  struct rpcrdma_msg *rdma_resp,
542  struct svc_rdma_op_ctxt *ctxt,
543  struct svc_rdma_req_map *vec,
544  int byte_count)
545 {
546  struct ib_send_wr send_wr;
547  struct ib_send_wr inv_wr;
548  int sge_no;
549  int sge_bytes;
550  int page_no;
551  int ret;
552 
553  /* Post a recv buffer to handle another request. */
554  ret = svc_rdma_post_recv(rdma);
555  if (ret) {
556  printk(KERN_INFO
557  "svcrdma: could not post a receive buffer, err=%d."
558  "Closing transport %p.\n", ret, rdma);
559  set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
560  svc_rdma_put_frmr(rdma, vec->frmr);
561  svc_rdma_put_context(ctxt, 0);
562  return -ENOTCONN;
563  }
564 
565  /* Prepare the context */
566  ctxt->pages[0] = page;
567  ctxt->count = 1;
568  ctxt->frmr = vec->frmr;
569  if (vec->frmr)
570  set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
571  else
572  clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
573 
574  /* Prepare the SGE for the RPCRDMA Header */
575  ctxt->sge[0].lkey = rdma->sc_dma_lkey;
576  ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
577  ctxt->sge[0].addr =
578  ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
579  ctxt->sge[0].length, DMA_TO_DEVICE);
580  if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
581  goto err;
582  atomic_inc(&rdma->sc_dma_used);
583 
584  ctxt->direction = DMA_TO_DEVICE;
585 
586  /* Map the payload indicated by 'byte_count' */
587  for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
588  int xdr_off = 0;
589  sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
590  byte_count -= sge_bytes;
591  if (!vec->frmr) {
592  ctxt->sge[sge_no].addr =
593  dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
594  sge_bytes, DMA_TO_DEVICE);
595  xdr_off += sge_bytes;
596  if (ib_dma_mapping_error(rdma->sc_cm_id->device,
597  ctxt->sge[sge_no].addr))
598  goto err;
599  atomic_inc(&rdma->sc_dma_used);
600  ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
601  } else {
602  ctxt->sge[sge_no].addr = (unsigned long)
603  vec->sge[sge_no].iov_base;
604  ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
605  }
606  ctxt->sge[sge_no].length = sge_bytes;
607  }
608  BUG_ON(byte_count != 0);
609 
610  /* Save all respages in the ctxt and remove them from the
611  * respages array. They are our pages until the I/O
612  * completes.
613  */
614  for (page_no = 0; page_no < rqstp->rq_resused; page_no++) {
615  ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
616  ctxt->count++;
617  rqstp->rq_respages[page_no] = NULL;
618  /*
619  * If there are more pages than SGE, terminate SGE
620  * list so that svc_rdma_unmap_dma doesn't attempt to
621  * unmap garbage.
622  */
623  if (page_no+1 >= sge_no)
624  ctxt->sge[page_no+1].length = 0;
625  }
626  BUG_ON(sge_no > rdma->sc_max_sge);
627  memset(&send_wr, 0, sizeof send_wr);
628  ctxt->wr_op = IB_WR_SEND;
629  send_wr.wr_id = (unsigned long)ctxt;
630  send_wr.sg_list = ctxt->sge;
631  send_wr.num_sge = sge_no;
632  send_wr.opcode = IB_WR_SEND;
633  send_wr.send_flags = IB_SEND_SIGNALED;
634  if (vec->frmr) {
635  /* Prepare INVALIDATE WR */
636  memset(&inv_wr, 0, sizeof inv_wr);
637  inv_wr.opcode = IB_WR_LOCAL_INV;
638  inv_wr.send_flags = IB_SEND_SIGNALED;
639  inv_wr.ex.invalidate_rkey =
640  vec->frmr->mr->lkey;
641  send_wr.next = &inv_wr;
642  }
643 
644  ret = svc_rdma_send(rdma, &send_wr);
645  if (ret)
646  goto err;
647 
648  return 0;
649 
650  err:
651  svc_rdma_unmap_dma(ctxt);
652  svc_rdma_put_frmr(rdma, vec->frmr);
653  svc_rdma_put_context(ctxt, 1);
654  return -EIO;
655 }
656 
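/* The svc_xprt "prepare reply header" callback is a no-op for RDMA: the
 * RPCRDMA reply header is built later, in svc_rdma_sendto().
 */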
657 void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
658 {
659 }
660 
661 /*
662  * Return the start of an xdr buffer.
663  */
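/* The receive path advances head[0].iov_base past the RPCRDMA header while
 * xdr->len still counts those bytes, so backing up by
 * len - head - pages - tail recovers the RPCRDMA header address used as
 * rdma_argp in svc_rdma_sendto() below.
 */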
664 static void *xdr_start(struct xdr_buf *xdr)
665 {
666  return xdr->head[0].iov_base -
667  (xdr->len -
668  xdr->page_len -
669  xdr->tail[0].iov_len -
670  xdr->head[0].iov_len);
671 }
672 
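/* Transport sendto entry point: map the reply XDR, build the RPCRDMA reply
 * header, push write-chunk and reply-chunk data to the client with
 * RDMA_WRITEs, then transmit the remaining inline bytes with a single
 * RDMA_SEND via send_reply().
 */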
673 int svc_rdma_sendto(struct svc_rqst *rqstp)
674 {
675  struct svc_xprt *xprt = rqstp->rq_xprt;
676  struct svcxprt_rdma *rdma =
677  container_of(xprt, struct svcxprt_rdma, sc_xprt);
678  struct rpcrdma_msg *rdma_argp;
679  struct rpcrdma_msg *rdma_resp;
680  struct rpcrdma_write_array *reply_ary;
681  enum rpcrdma_proc reply_type;
682  int ret;
683  int inline_bytes;
684  struct page *res_page;
685  struct svc_rdma_op_ctxt *ctxt;
686  struct svc_rdma_req_map *vec;
687 
688  dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
689 
690  /* Get the RDMA request header. */
691  rdma_argp = xdr_start(&rqstp->rq_arg);
692 
693  /* Build a req vec for the XDR */
694  ctxt = svc_rdma_get_context(rdma);
695  ctxt->direction = DMA_TO_DEVICE;
696  vec = svc_rdma_get_req_map();
697  ret = map_xdr(rdma, &rqstp->rq_res, vec);
698  if (ret)
699  goto err0;
700  inline_bytes = rqstp->rq_res.len;
701 
702  /* Create the RDMA response header */
703  res_page = svc_rdma_get_page();
704  rdma_resp = page_address(res_page);
705  reply_ary = svc_rdma_get_reply_array(rdma_argp);
706  if (reply_ary)
707  reply_type = RDMA_NOMSG;
708  else
709  reply_type = RDMA_MSG;
710  svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
711  rdma_resp, reply_type);
712 
713  /* Send any write-chunk data and build resp write-list */
714  ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
715  rqstp, vec);
716  if (ret < 0) {
717  printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
718  ret);
719  goto err1;
720  }
721  inline_bytes -= ret;
722 
723  /* Send any reply-list data and update resp reply-list */
724  ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
725  rqstp, vec);
726  if (ret < 0) {
727  printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
728  ret);
729  goto err1;
730  }
731  inline_bytes -= ret;
732 
733  ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
734  inline_bytes);
735  svc_rdma_put_req_map(vec);
736  dprintk("svcrdma: send_reply returns %d\n", ret);
737  return ret;
738 
739  err1:
740  put_page(res_page);
741  err0:
742  svc_rdma_put_req_map(vec);
743  svc_rdma_put_context(ctxt, 0);
744  return ret;
745 }