Linux Kernel  3.7.1
iw_send.c
1 /*
2  * Copyright (c) 2006 Oracle. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses. You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  * Redistribution and use in source and binary forms, with or
11  * without modification, are permitted provided that the following
12  * conditions are met:
13  *
14  * - Redistributions of source code must retain the above
15  * copyright notice, this list of conditions and the following
16  * disclaimer.
17  *
18  * - Redistributions in binary form must reproduce the above
19  * copyright notice, this list of conditions and the following
20  * disclaimer in the documentation and/or other materials
21  * provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  *
32  */
33 #include <linux/kernel.h>
34 #include <linux/in.h>
35 #include <linux/device.h>
36 #include <linux/dmapool.h>
37 #include <linux/ratelimit.h>
38 
39 #include "rds.h"
40 #include "iw.h"
41 
42 static void rds_iw_send_rdma_complete(struct rds_message *rm,
43  int wc_status)
44 {
45  int notify_status;
46 
47  switch (wc_status) {
48  case IB_WC_WR_FLUSH_ERR:
49  return;
50 
51  case IB_WC_SUCCESS:
52  notify_status = RDS_RDMA_SUCCESS;
53  break;
54 
55  case IB_WC_REM_ACCESS_ERR:
56  notify_status = RDS_RDMA_REMOTE_ERROR;
57  break;
58 
59  default:
60  notify_status = RDS_RDMA_OTHER_ERROR;
61  break;
62  }
63  rds_rdma_send_complete(rm, notify_status);
64 }
65 
66 static void rds_iw_send_unmap_rdma(struct rds_iw_connection *ic,
67  struct rm_rdma_op *op)
68 {
69  if (op->op_mapped) {
70  ib_dma_unmap_sg(ic->i_cm_id->device,
71  op->op_sg, op->op_nents,
72  op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
73  op->op_mapped = 0;
74  }
75 }
76 
77 static void rds_iw_send_unmap_rm(struct rds_iw_connection *ic,
78  struct rds_iw_send_work *send,
79  int wc_status)
80 {
81  struct rds_message *rm = send->s_rm;
82 
83  rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
84 
85  ib_dma_unmap_sg(ic->i_cm_id->device,
86  rm->data.op_sg, rm->data.op_nents,
87  DMA_TO_DEVICE);
88 
89  if (rm->rdma.op_active) {
90  rds_iw_send_unmap_rdma(ic, &rm->rdma);
91 
92  /* If the user asked for a completion notification on this
93  * message, we can implement three different semantics:
94  * 1. Notify when we receive the ACK on the RDS message
95  * that was queued with the RDMA. This provides reliable
96  * notification of RDMA status at the expense of a one-way
97  * packet delay.
98  * 2. Notify when the IB stack gives us the completion event for
99  * the RDMA operation.
100  * 3. Notify when the IB stack gives us the completion event for
101  * the accompanying RDS messages.
102  * Here, we implement approach #3. To implement approach #2,
103  * call rds_rdma_send_complete from the cq_handler. To implement #1,
104  * don't call rds_rdma_send_complete at all, and fall back to the notify
105  * handling in the ACK processing code.
106  *
107  * Note: There's no need to explicitly sync any RDMA buffers using
108  * ib_dma_sync_sg_for_cpu - the completion for the RDMA
109  * operation itself unmapped the RDMA buffers, which takes care
110  * of synching.
111  */
112  rds_iw_send_rdma_complete(rm, wc_status);
113 
114  if (rm->rdma.op_write)
115  rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes);
116  else
117  rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes);
118  }
119 
120  /* If anyone waited for this message to get flushed out, wake
121  * them up now */
122  rds_message_unmapped(rm);
123 
124  rds_message_put(rm);
125  send->s_rm = NULL;
126 }
127 
128 void rds_iw_send_init_ring(struct rds_iw_connection *ic)
129 {
130  struct rds_iw_send_work *send;
131  u32 i;
132 
133  for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
134  struct ib_sge *sge;
135 
136  send->s_rm = NULL;
137  send->s_op = NULL;
138  send->s_mapping = NULL;
139 
140  send->s_wr.next = NULL;
141  send->s_wr.wr_id = i;
142  send->s_wr.sg_list = send->s_sge;
143  send->s_wr.num_sge = 1;
144  send->s_wr.opcode = IB_WR_SEND;
145  send->s_wr.send_flags = 0;
146  send->s_wr.ex.imm_data = 0;
147 
148  sge = rds_iw_data_sge(ic, send->s_sge);
149  sge->lkey = 0;
150 
151  sge = rds_iw_header_sge(ic, send->s_sge);
152  sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
153  sge->length = sizeof(struct rds_header);
154  sge->lkey = 0;
155 
157  if (IS_ERR(send->s_mr)) {
158  printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_mr failed\n");
159  break;
160  }
161 
162  send->s_page_list = ib_alloc_fast_reg_page_list(
163  ic->i_cm_id->device, fastreg_message_size);
164  if (IS_ERR(send->s_page_list)) {
165  printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_page_list failed\n");
166  break;
167  }
168  }
169 }
170 
171 void rds_iw_send_clear_ring(struct rds_iw_connection *ic)
172 {
173  struct rds_iw_send_work *send;
174  u32 i;
175 
176  for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
177  BUG_ON(!send->s_mr);
178  ib_dereg_mr(send->s_mr);
179  BUG_ON(!send->s_page_list);
180  ib_free_fast_reg_page_list(send->s_page_list);
181  if (send->s_wr.opcode == 0xdead)
182  continue;
183  if (send->s_rm)
184  rds_iw_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
185  if (send->s_op)
186  rds_iw_send_unmap_rdma(ic, send->s_op);
187  }
188 }
189 
190 /*
191  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
192  * operations performed in the send path. As the sender allocs and potentially
193  * unallocs the next free entry in the ring it doesn't alter which is
194  * the next to be freed, which is what this is concerned with.
195  */
196 void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context)
197 {
198  struct rds_connection *conn = context;
199  struct rds_iw_connection *ic = conn->c_transport_data;
200  struct ib_wc wc;
201  struct rds_iw_send_work *send;
202  u32 completed;
203  u32 oldest;
204  u32 i;
205  int ret;
206 
207  rdsdebug("cq %p conn %p\n", cq, conn);
208  rds_iw_stats_inc(s_iw_tx_cq_call);
209  ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
210  if (ret)
211  rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
212 
213  while (ib_poll_cq(cq, 1, &wc) > 0) {
214  rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
215  (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
216  be32_to_cpu(wc.ex.imm_data));
217  rds_iw_stats_inc(s_iw_tx_cq_event);
218 
219  if (wc.status != IB_WC_SUCCESS) {
220  printk(KERN_ERR "WC Error: status = %d opcode = %d\n", wc.status, wc.opcode);
221  break;
222  }
223 
224  if (wc.opcode == IB_WC_LOCAL_INV && wc.wr_id == RDS_IW_LOCAL_INV_WR_ID) {
225  ic->i_fastreg_posted = 0;
226  continue;
227  }
228 
229  if (wc.opcode == IB_WC_FAST_REG_MR && wc.wr_id == RDS_IW_FAST_REG_WR_ID) {
230  ic->i_fastreg_posted = 1;
231  continue;
232  }
233 
234  if (wc.wr_id == RDS_IW_ACK_WR_ID) {
235  if (ic->i_ack_queued + HZ/2 < jiffies)
236  rds_iw_stats_inc(s_iw_tx_stalled);
237  rds_iw_ack_send_complete(ic);
238  continue;
239  }
240 
241  oldest = rds_iw_ring_oldest(&ic->i_send_ring);
242 
243  completed = rds_iw_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
244 
245  for (i = 0; i < completed; i++) {
246  send = &ic->i_sends[oldest];
247 
248  /* In the error case, wc.opcode sometimes contains garbage */
249  switch (send->s_wr.opcode) {
250  case IB_WR_SEND:
251  if (send->s_rm)
252  rds_iw_send_unmap_rm(ic, send, wc.status);
253  break;
254  case IB_WR_FAST_REG_MR:
255  case IB_WR_RDMA_WRITE:
256  case IB_WR_RDMA_READ:
257  case IB_WR_RDMA_READ_WITH_INV:
258  /* Nothing to be done - the SG list will be unmapped
259  * when the SEND completes. */
260  break;
261  default:
262  printk_ratelimited(KERN_NOTICE
263  "RDS/IW: %s: unexpected opcode 0x%x in WR!\n",
264  __func__, send->s_wr.opcode);
265  break;
266  }
267 
268  send->s_wr.opcode = 0xdead;
269  send->s_wr.num_sge = 1;
270  if (send->s_queued + HZ/2 < jiffies)
271  rds_iw_stats_inc(s_iw_tx_stalled);
272 
273  /* If a RDMA operation produced an error, signal this right
274  * away. If we don't, the subsequent SEND that goes with this
275  * RDMA will be canceled with ERR_WFLUSH, and the application
276  * never learns that the RDMA failed. */
277  if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
278  struct rds_message *rm;
279 
280  rm = rds_send_get_message(conn, send->s_op);
281  if (rm)
282  rds_iw_send_rdma_complete(rm, wc.status);
283  }
284 
285  oldest = (oldest + 1) % ic->i_send_ring.w_nr;
286  }
287 
288  rds_iw_ring_free(&ic->i_send_ring, completed);
289 
290  if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
291  test_bit(0, &conn->c_map_queued))
292  queue_delayed_work(rds_wq, &conn->c_send_w, 0);
293 
294  /* We expect errors as the qp is drained during shutdown */
295  if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
296  rds_iw_conn_error(conn,
297  "send completion on %pI4 "
298  "had status %u, disconnecting and reconnecting\n",
299  &conn->c_faddr, wc.status);
300  }
301  }
302 }
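
The completion handler above leans on ring helpers (rds_iw_ring_oldest, rds_iw_ring_completed, rds_iw_ring_free) defined elsewhere in the driver. As an editorial sketch of the accounting only, assuming the ring indexes work requests by the wr_id stored in s_wr.wr_id on a ring of w_nr slots, the completed count could be derived like this:

/* Sketch, not part of iw_send.c: count entries from 'oldest' up to and
 * including the completed 'wr_id', allowing for wraparound on a ring of
 * w_nr slots (assumed semantics of rds_iw_ring_completed()). */
static unsigned int example_ring_completed(unsigned int w_nr,
					   unsigned int wr_id,
					   unsigned int oldest)
{
	if (wr_id < oldest)
		return (w_nr - oldest) + wr_id + 1;
	return wr_id - oldest + 1;
}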
303 
304 /*
305  * This is the main function for allocating credits when sending
306  * messages.
307  *
308  * Conceptually, we have two counters:
309  * - send credits: this tells us how many WRs we're allowed
310  * to submit without overrunning the receiver's queue. For
311  * each SEND WR we post, we decrement this by one.
312  *
313  * - posted credits: this tells us how many WRs we recently
314  * posted to the receive queue. This value is transferred
315  * to the peer as a "credit update" in a RDS header field.
316  * Every time we transmit credits to the peer, we subtract
317  * the amount of transferred credits from this counter.
318  *
319  * It is essential that we avoid situations where both sides have
320  * exhausted their send credits, and are unable to send new credits
321  * to the peer. We achieve this by requiring that we send at least
322  * one credit update to the peer before exhausting our credits.
323  * When new credits arrive, we subtract one credit that is withheld
324  * until we've posted new buffers and are ready to transmit these
325  * credits (see rds_iw_send_add_credits below).
326  *
327  * The RDS send code is essentially single-threaded; rds_send_xmit
328  * grabs c_send_lock to ensure exclusive access to the send ring.
329  * However, the ACK sending code is independent and can race with
330  * message SENDs.
331  *
332  * In the send path, we need to update the counters for send credits
333  * and the counter of posted buffers atomically - when we use the
334  * last available credit, we cannot allow another thread to race us
335  * and grab the posted credits counter. Hence, we have to use a
336  * spinlock to protect the credit counter, or use atomics.
337  *
338  * Spinlocks shared between the send and the receive path are bad,
339  * because they create unnecessary delays. An early implementation
340  * using a spinlock showed a 5% degradation in throughput at some
341  * loads.
342  *
343  * This implementation avoids spinlocks completely, putting both
344  * counters into a single atomic, and updating that atomic using
345  * atomic_add (in the receive path, when receiving fresh credits),
346  * and using atomic_cmpxchg when updating the two counters.
347  */
348 u32 rds_iw_send_grab_credits(struct rds_iw_connection *ic,
349  u32 wanted, u32 *adv_credits, int need_posted, int max_posted)
350 {
351  unsigned int avail, posted, got = 0, advertise;
352  long oldval, newval;
353 
354  *adv_credits = 0;
355  if (!ic->i_flowctl)
356  return wanted;
357 
358 try_again:
359  advertise = 0;
360  oldval = newval = atomic_read(&ic->i_credits);
361  posted = IB_GET_POST_CREDITS(oldval);
362  avail = IB_GET_SEND_CREDITS(oldval);
363 
364  rdsdebug("rds_iw_send_grab_credits(%u): credits=%u posted=%u\n",
365  wanted, avail, posted);
366 
367  /* The last credit must be used to send a credit update. */
368  if (avail && !posted)
369  avail--;
370 
371  if (avail < wanted) {
372  struct rds_connection *conn = ic->i_cm_id->context;
373 
374  /* Oops, there aren't that many credits left! */
375  set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
376  got = avail;
377  } else {
378  /* Sometimes you get what you want, lalala. */
379  got = wanted;
380  }
381  newval -= IB_SET_SEND_CREDITS(got);
382 
383  /*
384  * If need_posted is non-zero, then the caller wants
385  * the posted credits advertised regardless of whether any send credits are
386  * available.
387  */
388  if (posted && (got || need_posted)) {
389  advertise = min_t(unsigned int, posted, max_posted);
390  newval -= IB_SET_POST_CREDITS(advertise);
391  }
392 
393  /* Finally bill everything */
394  if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
395  goto try_again;
396 
397  *adv_credits = advertise;
398  return got;
399 }
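
The lockless credit accounting described in the block comment above can be exercised on its own. The following user-space sketch is an illustration only: it assumes the 16-bit packing implied by the IB_GET/IB_SET credit macros (low half send credits, high half posted credits) and reimplements just the send-credit grab loop with C11 atomics.

/* Illustration only; not the kernel implementation. */
#include <stdio.h>
#include <stdatomic.h>

#define SET_SEND_CREDITS(v)	((v) & 0xffff)
#define SET_POST_CREDITS(v)	((v) << 16)
#define GET_SEND_CREDITS(v)	((v) & 0xffff)
#define GET_POST_CREDITS(v)	((v) >> 16)

static atomic_uint credits;

/* Grab up to "wanted" send credits, holding the last one back for a
 * credit update unless posted credits are available to advertise. */
static unsigned int grab_send_credits(unsigned int wanted)
{
	unsigned int oldval, newval, avail, got;

	do {
		oldval = newval = atomic_load(&credits);
		avail = GET_SEND_CREDITS(oldval);
		if (avail && !GET_POST_CREDITS(oldval))
			avail--;
		got = (avail < wanted) ? avail : wanted;
		newval -= SET_SEND_CREDITS(got);
	} while (!atomic_compare_exchange_weak(&credits, &oldval, newval));

	return got;
}

int main(void)
{
	atomic_store(&credits, SET_SEND_CREDITS(4) | SET_POST_CREDITS(1));
	printf("wanted 10, got %u send credits\n", grab_send_credits(10));
	return 0;
}

With four send credits and one posted credit in the word, a request for ten yields four, mirroring the partial grab handled by the real function.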
400 
401 void rds_iw_send_add_credits(struct rds_connection *conn, unsigned int credits)
402 {
403  struct rds_iw_connection *ic = conn->c_transport_data;
404 
405  if (credits == 0)
406  return;
407 
408  rdsdebug("rds_iw_send_add_credits(%u): current=%u%s\n",
409  credits,
410  IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
411  test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");
412 
413  atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
414  if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
415  queue_delayed_work(rds_wq, &conn->c_send_w, 0);
416 
417  WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
418 
419  rds_iw_stats_inc(s_iw_rx_credit_updates);
420 }
421 
422 void rds_iw_advertise_credits(struct rds_connection *conn, unsigned int posted)
423 {
424  struct rds_iw_connection *ic = conn->c_transport_data;
425 
426  if (posted == 0)
427  return;
428 
429  atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
430 
431  /* Decide whether to send an update to the peer now.
432  * If we would send a credit update for every single buffer we
433  * post, we would end up with an ACK storm (ACK arrives,
434  * consumes buffer, we refill the ring, send ACK to remote
435  * advertising the newly posted buffer... ad inf)
436  *
437  * Performance pretty much depends on how often we send
438  * credit updates - too frequent updates mean lots of ACKs.
439  * Too infrequent updates, and the peer will run out of
440  * credits and has to throttle.
441  * For the time being, 16 seems to be a good compromise.
442  */
443  if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
444  set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
445 }
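
To make the packed counter concrete, here is a worked example of the transitions performed by the two functions above, assuming the same encoding (low 16 bits send credits, high 16 bits posted credits):

/* Illustration only:
 *   credits == 0x00000010                          -> 16 send, 0 posted
 *   rds_iw_advertise_credits(conn, 4) adds 4 << 16 -> 0x00040010 (16 send, 4 posted)
 *   rds_iw_send_add_credits(conn, 8)  adds 8       -> 0x00040018 (24 send, 4 posted)
 * Once the posted half reaches 16, an ACK is requested so the peer
 * receives the credit update promptly.
 */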
446 
447 static inline void
448 rds_iw_xmit_populate_wr(struct rds_iw_connection *ic,
449  struct rds_iw_send_work *send, unsigned int pos,
450  unsigned long buffer, unsigned int length,
451  int send_flags)
452 {
453  struct ib_sge *sge;
454 
455  WARN_ON(pos != send - ic->i_sends);
456 
457  send->s_wr.send_flags = send_flags;
458  send->s_wr.opcode = IB_WR_SEND;
459  send->s_wr.num_sge = 2;
460  send->s_wr.next = NULL;
461  send->s_queued = jiffies;
462  send->s_op = NULL;
463 
464  if (length != 0) {
465  sge = rds_iw_data_sge(ic, send->s_sge);
466  sge->addr = buffer;
467  sge->length = length;
468  sge->lkey = rds_iw_local_dma_lkey(ic);
469 
470  sge = rds_iw_header_sge(ic, send->s_sge);
471  } else {
472  /* We're sending a packet with no payload. There is only
473  * one SGE */
474  send->s_wr.num_sge = 1;
475  sge = &send->s_sge[0];
476  }
477 
478  sge->addr = ic->i_send_hdrs_dma + (pos * sizeof(struct rds_header));
479  sge->length = sizeof(struct rds_header);
480  sge->lkey = rds_iw_local_dma_lkey(ic);
481 }
482 
483 /*
484  * This can be called multiple times for a given message. The first time
485  * we see a message we map its scatterlist into the IB device so that
486  * we can provide that mapped address to the IB scatter gather entries
487  * in the IB work requests. We translate the scatterlist into a series
488  * of work requests that fragment the message. These work requests complete
489  * in order so we pass ownership of the message to the completion handler
490  * once we send the final fragment.
491  *
492  * The RDS core uses the c_send_lock to only enter this function once
493  * per connection. This makes sure that the tx ring alloc/unalloc pairs
494  * don't get out of sync and confuse the ring.
495  */
496 int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
497  unsigned int hdr_off, unsigned int sg, unsigned int off)
498 {
499  struct rds_iw_connection *ic = conn->c_transport_data;
500  struct ib_device *dev = ic->i_cm_id->device;
501  struct rds_iw_send_work *send = NULL;
502  struct rds_iw_send_work *first;
503  struct rds_iw_send_work *prev;
504  struct ib_send_wr *failed_wr;
505  struct scatterlist *scat;
506  u32 pos;
507  u32 i;
508  u32 work_alloc;
509  u32 credit_alloc;
510  u32 posted;
511  u32 adv_credits = 0;
512  int send_flags = 0;
513  int sent;
514  int ret;
515  int flow_controlled = 0;
516 
517  BUG_ON(off % RDS_FRAG_SIZE);
518  BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
519 
520  /* Fastreg support */
521  if (rds_rdma_cookie_key(rm->m_rdma_cookie) && !ic->i_fastreg_posted) {
522  ret = -EAGAIN;
523  goto out;
524  }
525 
526  /* FIXME we may overallocate here */
527  if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
528  i = 1;
529  else
530  i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
531 
532  work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos);
533  if (work_alloc == 0) {
534  set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
535  rds_iw_stats_inc(s_iw_tx_ring_full);
536  ret = -ENOMEM;
537  goto out;
538  }
539 
540  credit_alloc = work_alloc;
541  if (ic->i_flowctl) {
542  credit_alloc = rds_iw_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT);
543  adv_credits += posted;
544  if (credit_alloc < work_alloc) {
545  rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
546  work_alloc = credit_alloc;
547  flow_controlled++;
548  }
549  if (work_alloc == 0) {
550  set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
551  rds_iw_stats_inc(s_iw_tx_throttle);
552  ret = -ENOMEM;
553  goto out;
554  }
555  }
556 
557  /* map the message the first time we see it */
558  if (!ic->i_rm) {
559  /*
560  printk(KERN_NOTICE "rds_iw_xmit prep msg dport=%u flags=0x%x len=%d\n",
561  be16_to_cpu(rm->m_inc.i_hdr.h_dport),
562  rm->m_inc.i_hdr.h_flags,
563  be32_to_cpu(rm->m_inc.i_hdr.h_len));
564  */
565  if (rm->data.op_nents) {
566  rm->data.op_count = ib_dma_map_sg(dev,
567  rm->data.op_sg,
568  rm->data.op_nents,
569  DMA_TO_DEVICE);
570  rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count);
571  if (rm->data.op_count == 0) {
572  rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
573  rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
574  ret = -ENOMEM; /* XXX ? */
575  goto out;
576  }
577  } else {
578  rm->data.op_count = 0;
579  }
580 
581  ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
582  ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes;
583  rds_message_addref(rm);
584  ic->i_rm = rm;
585 
586  /* Finalize the header */
587  if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
588  rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
589  if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
590  rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;
591 
592  /* If it has a RDMA op, tell the peer we did it. This is
593  * used by the peer to release use-once RDMA MRs. */
594  if (rm->rdma.op_active) {
595  struct rds_ext_header_rdma ext_hdr;
596 
597  ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey);
598  rds_message_add_extension(&rm->m_inc.i_hdr,
599  RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
600  }
601  if (rm->m_rdma_cookie) {
602  rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
603  rds_rdma_cookie_key(rm->m_rdma_cookie),
604  rds_rdma_cookie_offset(rm->m_rdma_cookie));
605  }
606 
607  /* Note - rds_iw_piggyb_ack clears the ACK_REQUIRED bit, so
608  * we should not do this unless we have a chance of at least
609  * sticking the header into the send ring. Which is why we
610  * should call rds_iw_ring_alloc first. */
611  rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_iw_piggyb_ack(ic));
612  rds_message_make_checksum(&rm->m_inc.i_hdr);
613 
614  /*
615  * Update adv_credits since we reset the ACK_REQUIRED bit.
616  */
617  rds_iw_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits);
618  adv_credits += posted;
619  BUG_ON(adv_credits > 255);
620  }
621 
622  send = &ic->i_sends[pos];
623  first = send;
624  prev = NULL;
625  scat = &rm->data.op_sg[sg];
626  sent = 0;
627  i = 0;
628 
629  /* Sometimes you want to put a fence between an RDMA
630  * READ and the following SEND.
631  * We could either do this all the time
632  * or when requested by the user. Right now, we let
633  * the application choose.
634  */
635  if (rm->rdma.op_active && rm->rdma.op_fence)
636  send_flags = IB_SEND_FENCE;
637 
638  /*
639  * We could be copying the header into the unused tail of the page.
640  * That would need to be changed in the future when those pages might
641  * be mapped userspace pages or page cache pages. So instead we always
642  * use a second sge and our long-lived ring of mapped headers. We send
643  * the header after the data so that the data payload can be aligned on
644  * the receiver.
645  */
646 
647  /* handle a 0-len message */
648  if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) {
649  rds_iw_xmit_populate_wr(ic, send, pos, 0, 0, send_flags);
650  goto add_header;
651  }
652 
653  /* if there's data reference it with a chain of work reqs */
654  for (; i < work_alloc && scat != &rm->data.op_sg[rm->data.op_count]; i++) {
655  unsigned int len;
656 
657  send = &ic->i_sends[pos];
658 
659  len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off);
660  rds_iw_xmit_populate_wr(ic, send, pos,
661  ib_sg_dma_address(dev, scat) + off, len,
662  send_flags);
663 
664  /*
665  * We want to delay signaling completions just enough to get
666  * the batching benefits but not so much that we create dead time
667  * on the wire.
668  */
669  if (ic->i_unsignaled_wrs-- == 0) {
670  ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
671  send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
672  }
673 
674  ic->i_unsignaled_bytes -= len;
675  if (ic->i_unsignaled_bytes <= 0) {
676  ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes;
677  send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
678  }
679 
680  /*
681  * Always signal the last one if we're stopping due to flow control.
682  */
683  if (flow_controlled && i == (work_alloc-1))
684  send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
685 
686  rdsdebug("send %p wr %p num_sge %u next %p\n", send,
687  &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
688 
689  sent += len;
690  off += len;
691  if (off == ib_sg_dma_len(dev, scat)) {
692  scat++;
693  off = 0;
694  }
695 
696 add_header:
697  /* Tack on the header after the data. The header SGE should already
698  * have been set up to point to the right header buffer. */
699  memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
700 
701  if (0) {
702  struct rds_header *hdr = &ic->i_send_hdrs[pos];
703 
704  printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n",
705  be16_to_cpu(hdr->h_dport),
706  hdr->h_flags,
707  be32_to_cpu(hdr->h_len));
708  }
709  if (adv_credits) {
710  struct rds_header *hdr = &ic->i_send_hdrs[pos];
711 
712  /* add credit and redo the header checksum */
713  hdr->h_credit = adv_credits;
714  rds_message_make_checksum(hdr);
715  adv_credits = 0;
716  rds_iw_stats_inc(s_iw_tx_credit_updates);
717  }
718 
719  if (prev)
720  prev->s_wr.next = &send->s_wr;
721  prev = send;
722 
723  pos = (pos + 1) % ic->i_send_ring.w_nr;
724  }
725 
726  /* Account the RDS header in the number of bytes we sent, but just once.
727  * The caller has no concept of fragmentation. */
728  if (hdr_off == 0)
729  sent += sizeof(struct rds_header);
730 
731  /* if we finished the message then send completion owns it */
732  if (scat == &rm->data.op_sg[rm->data.op_count]) {
733  prev->s_rm = ic->i_rm;
734  prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
735  ic->i_rm = NULL;
736  }
737 
738  if (i < work_alloc) {
739  rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i);
740  work_alloc = i;
741  }
742  if (ic->i_flowctl && i < credit_alloc)
743  rds_iw_send_add_credits(conn, credit_alloc - i);
744 
745  /* XXX need to worry about failed_wr and partial sends. */
746  failed_wr = &first->s_wr;
747  ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
748  rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
749  first, &first->s_wr, ret, failed_wr);
750  BUG_ON(failed_wr != &first->s_wr);
751  if (ret) {
752  printk(KERN_WARNING "RDS/IW: ib_post_send to %pI4 "
753  "returned %d\n", &conn->c_faddr, ret);
754  rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
755  if (prev->s_rm) {
756  ic->i_rm = prev->s_rm;
757  prev->s_rm = NULL;
758  }
759  goto out;
760  }
761 
762  ret = sent;
763 out:
764  BUG_ON(adv_credits);
765  return ret;
766 }
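
For reference, the per-message work-request count computed near the top of rds_iw_xmit() is a round-up division; a worked example, assuming ceil() is the usual round-up helper from rds.h and a 4 KB RDS_FRAG_SIZE:

/* Illustration only (RDS_FRAG_SIZE assumed to be 4096):
 *   h_len = 0      ->  i = 1                        (header-only message)
 *   h_len = 4096   ->  i = ceil(4096, 4096)  = 1
 *   h_len = 10000  ->  i = ceil(10000, 4096) = 3    ((10000 + 4095) / 4096)
 */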
767 
768 static void rds_iw_build_send_fastreg(struct rds_iw_device *rds_iwdev, struct rds_iw_connection *ic, struct rds_iw_send_work *send, int nent, int len, u64 sg_addr)
769 {
770  BUG_ON(nent > send->s_page_list->max_page_list_len);
771  /*
772  * Perform a WR for the fast_reg_mr. Each individual page
773  * in the sg list is added to the fast reg page list and placed
774  * inside the fast_reg_mr WR.
775  */
776  send->s_wr.opcode = IB_WR_FAST_REG_MR;
777  send->s_wr.wr.fast_reg.length = len;
778  send->s_wr.wr.fast_reg.rkey = send->s_mr->rkey;
779  send->s_wr.wr.fast_reg.page_list = send->s_page_list;
780  send->s_wr.wr.fast_reg.page_list_len = nent;
781  send->s_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
782  send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_REMOTE_WRITE;
783  send->s_wr.wr.fast_reg.iova_start = sg_addr;
784 
785  ib_update_fast_reg_key(send->s_mr, send->s_remap_count++);
786 }
787 
788 int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
789 {
790  struct rds_iw_connection *ic = conn->c_transport_data;
791  struct rds_iw_send_work *send = NULL;
792  struct rds_iw_send_work *first;
793  struct rds_iw_send_work *prev;
794  struct ib_send_wr *failed_wr;
795  struct rds_iw_device *rds_iwdev;
796  struct scatterlist *scat;
797  unsigned long len;
798  u64 remote_addr = op->op_remote_addr;
799  u32 pos, fr_pos;
800  u32 work_alloc;
801  u32 i;
802  u32 j;
803  int sent;
804  int ret;
805  int num_sge;
806 
807  rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
808 
809  /* map the message the first time we see it */
810  if (!op->op_mapped) {
811  op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
812  op->op_sg, op->op_nents, (op->op_write) ?
813  DMA_TO_DEVICE : DMA_FROM_DEVICE);
814  rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count);
815  if (op->op_count == 0) {
816  rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
817  ret = -ENOMEM; /* XXX ? */
818  goto out;
819  }
820 
821  op->op_mapped = 1;
822  }
823 
824  if (!op->op_write) {
825  /* Alloc space on the send queue for the fastreg */
826  work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, 1, &fr_pos);
827  if (work_alloc != 1) {
828  rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
829  rds_iw_stats_inc(s_iw_tx_ring_full);
830  ret = -ENOMEM;
831  goto out;
832  }
833  }
834 
835  /*
836  * Instead of knowing how to return a partial rdma read/write we insist that there
837  * be enough work requests to send the entire message.
838  */
839  i = ceil(op->op_count, rds_iwdev->max_sge);
840 
841  work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos);
842  if (work_alloc != i) {
843  rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
844  rds_iw_stats_inc(s_iw_tx_ring_full);
845  ret = -ENOMEM;
846  goto out;
847  }
848 
849  send = &ic->i_sends[pos];
850  if (!op->op_write) {
851  first = prev = &ic->i_sends[fr_pos];
852  } else {
853  first = send;
854  prev = NULL;
855  }
856  scat = &op->op_sg[0];
857  sent = 0;
858  num_sge = op->op_count;
859 
860  for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {
861  send->s_wr.send_flags = 0;
862  send->s_queued = jiffies;
863 
864  /*
865  * We want to delay signaling completions just enough to get
866  * the batching benefits but not so much that we create dead time on the wire.
867  */
868  if (ic->i_unsignaled_wrs-- == 0) {
869  ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
870  send->s_wr.send_flags = IB_SEND_SIGNALED;
871  }
872 
873  /* To avoid the need to have the plumbing to invalidate the fastreg_mr used
874  * for local access after RDS is finished with it, using
875  * IB_WR_RDMA_READ_WITH_INV will invalidate it after the read has completed.
876  */
877  if (op->op_write)
878  send->s_wr.opcode = IB_WR_RDMA_WRITE;
879  else
880  send->s_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
881 
882  send->s_wr.wr.rdma.remote_addr = remote_addr;
883  send->s_wr.wr.rdma.rkey = op->op_rkey;
884  send->s_op = op;
885 
886  if (num_sge > rds_iwdev->max_sge) {
887  send->s_wr.num_sge = rds_iwdev->max_sge;
888  num_sge -= rds_iwdev->max_sge;
889  } else
890  send->s_wr.num_sge = num_sge;
891 
892  send->s_wr.next = NULL;
893 
894  if (prev)
895  prev->s_wr.next = &send->s_wr;
896 
897  for (j = 0; j < send->s_wr.num_sge && scat != &op->op_sg[op->op_count]; j++) {
898  len = ib_sg_dma_len(ic->i_cm_id->device, scat);
899 
900  if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV)
901  send->s_page_list->page_list[j] = ib_sg_dma_address(ic->i_cm_id->device, scat);
902  else {
903  send->s_sge[j].addr = ib_sg_dma_address(ic->i_cm_id->device, scat);
904  send->s_sge[j].length = len;
905  send->s_sge[j].lkey = rds_iw_local_dma_lkey(ic);
906  }
907 
908  sent += len;
909  rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
910  remote_addr += len;
911 
912  scat++;
913  }
914 
915  if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV) {
916  send->s_wr.num_sge = 1;
917  send->s_sge[0].addr = conn->c_xmit_rm->m_rs->rs_user_addr;
918  send->s_sge[0].length = conn->c_xmit_rm->m_rs->rs_user_bytes;
919  send->s_sge[0].lkey = ic->i_sends[fr_pos].s_mr->lkey;
920  }
921 
922  rdsdebug("send %p wr %p num_sge %u next %p\n", send,
923  &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
924 
925  prev = send;
926  if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
927  send = ic->i_sends;
928  }
929 
930  /* if we finished the message then send completion owns it */
931  if (scat == &op->op_sg[op->op_count])
932  first->s_wr.send_flags = IB_SEND_SIGNALED;
933 
934  if (i < work_alloc) {
935  rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i);
936  work_alloc = i;
937  }
938 
939  /* On iWARP, local memory access by a remote system (ie, RDMA Read) is not
940  * recommended. Putting the lkey on the wire is a security hole, as it can
941  * allow for memory access to all of memory on the remote system. Some
942  * adapters do not allow using the lkey for this at all. To bypass this use a
943  * fastreg_mr (or possibly a dma_mr)
944  */
945  if (!op->op_write) {
946  rds_iw_build_send_fastreg(rds_iwdev, ic, &ic->i_sends[fr_pos],
947  op->op_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
948  work_alloc++;
949  }
950 
951  failed_wr = &first->s_wr;
952  ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
953  rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
954  first, &first->s_wr, ret, failed_wr);
955  BUG_ON(failed_wr != &first->s_wr);
956  if (ret) {
957  printk(KERN_WARNING "RDS/IW: rdma ib_post_send to %pI4 "
958  "returned %d\n", &conn->c_faddr, ret);
959  rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
960  goto out;
961  }
962 
963 out:
964  return ret;
965 }
966 
967 void rds_iw_xmit_complete(struct rds_connection *conn)
968 {
969  struct rds_iw_connection *ic = conn->c_transport_data;
970 
971  /* We may have a pending ACK or window update we were unable
972  * to send previously (due to flow control). Try again. */
973  rds_iw_attempt_ack(ic);
974 }