Linux Kernel 3.7.1
ib_recv.c
1 /*
2  * Copyright (c) 2006 Oracle. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses. You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  * Redistribution and use in source and binary forms, with or
11  * without modification, are permitted provided that the following
12  * conditions are met:
13  *
14  * - Redistributions of source code must retain the above
15  * copyright notice, this list of conditions and the following
16  * disclaimer.
17  *
18  * - Redistributions in binary form must reproduce the above
19  * copyright notice, this list of conditions and the following
20  * disclaimer in the documentation and/or other materials
21  * provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  *
32  */
33 #include <linux/kernel.h>
34 #include <linux/slab.h>
35 #include <linux/pci.h>
36 #include <linux/dma-mapping.h>
37 #include <rdma/rdma_cm.h>
38 
39 #include "rds.h"
40 #include "ib.h"
41 
42 static struct kmem_cache *rds_ib_incoming_slab;
43 static struct kmem_cache *rds_ib_frag_slab;
44 static atomic_t rds_ib_allocation = ATOMIC_INIT(0);
45 
46 void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
47 {
48  struct rds_ib_recv_work *recv;
49  u32 i;
50 
51  for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
52  struct ib_sge *sge;
53 
54  recv->r_ibinc = NULL;
55  recv->r_frag = NULL;
56 
57  recv->r_wr.next = NULL;
58  recv->r_wr.wr_id = i;
59  recv->r_wr.sg_list = recv->r_sge;
60  recv->r_wr.num_sge = RDS_IB_RECV_SGE;
61 
62  sge = &recv->r_sge[0];
63  sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
64  sge->length = sizeof(struct rds_header);
65  sge->lkey = ic->i_mr->lkey;
66 
67  sge = &recv->r_sge[1];
68  sge->addr = 0;
69  sge->length = RDS_FRAG_SIZE;
70  sge->lkey = ic->i_mr->lkey;
71  }
72 }
73 
74 /*
75  * The entire 'from' list, including the from element itself, is put on
76  * to the tail of the 'to' list.
77  */
78 static void list_splice_entire_tail(struct list_head *from,
79  struct list_head *to)
80 {
81  struct list_head *from_last = from->prev;
82 
83  list_splice_tail(from_last, to);
84  list_add_tail(from_last, to);
85 }
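/*
 * Illustrative aside (not from the upstream file): unlike a plain
 * list_splice_tail(), the 'from' pointer here is itself a live element
 * rather than a dummy head. Given a circular chain A <-> B <-> C with
 * 'from' == &A, from->prev is C; list_splice_tail() moves A and B onto the
 * tail of 'to' (treating C as a temporary head), and list_add_tail() then
 * appends C itself, so the whole chain, anchor included, lands on 'to'.
 */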
86 
87 static void rds_ib_cache_xfer_to_ready(struct rds_ib_refill_cache *cache)
88 {
89  struct list_head *tmp;
90 
91  tmp = xchg(&cache->xfer, NULL);
92  if (tmp) {
93  if (cache->ready)
94  list_splice_entire_tail(tmp, cache->ready);
95  else
96  cache->ready = tmp;
97  }
98 }
99 
100 static int rds_ib_recv_alloc_cache(struct rds_ib_refill_cache *cache)
101 {
102  struct rds_ib_cache_head *head;
103  int cpu;
104 
105  cache->percpu = alloc_percpu(struct rds_ib_cache_head);
106  if (!cache->percpu)
107  return -ENOMEM;
108 
109  for_each_possible_cpu(cpu) {
110  head = per_cpu_ptr(cache->percpu, cpu);
111  head->first = NULL;
112  head->count = 0;
113  }
114  cache->xfer = NULL;
115  cache->ready = NULL;
116 
117  return 0;
118 }
119 
120 int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic)
121 {
122  int ret;
123 
124  ret = rds_ib_recv_alloc_cache(&ic->i_cache_incs);
125  if (!ret) {
126  ret = rds_ib_recv_alloc_cache(&ic->i_cache_frags);
127  if (ret)
128  free_percpu(ic->i_cache_incs.percpu);
129  }
130 
131  return ret;
132 }
133 
134 static void rds_ib_cache_splice_all_lists(struct rds_ib_refill_cache *cache,
135  struct list_head *caller_list)
136 {
137  struct rds_ib_cache_head *head;
138  int cpu;
139 
140  for_each_possible_cpu(cpu) {
141  head = per_cpu_ptr(cache->percpu, cpu);
142  if (head->first) {
143  list_splice_entire_tail(head->first, caller_list);
144  head->first = NULL;
145  }
146  }
147 
148  if (cache->ready) {
149  list_splice_entire_tail(cache->ready, caller_list);
150  cache->ready = NULL;
151  }
152 }
153 
154 void rds_ib_recv_free_caches(struct rds_ib_connection *ic)
155 {
156  struct rds_ib_incoming *inc;
157  struct rds_ib_incoming *inc_tmp;
158  struct rds_page_frag *frag;
159  struct rds_page_frag *frag_tmp;
160  LIST_HEAD(list);
161 
162  rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
163  rds_ib_cache_splice_all_lists(&ic->i_cache_incs, &list);
164  free_percpu(ic->i_cache_incs.percpu);
165 
166  list_for_each_entry_safe(inc, inc_tmp, &list, ii_cache_entry) {
167  list_del(&inc->ii_cache_entry);
168  WARN_ON(!list_empty(&inc->ii_frags));
169  kmem_cache_free(rds_ib_incoming_slab, inc);
170  }
171 
172  rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
173  rds_ib_cache_splice_all_lists(&ic->i_cache_frags, &list);
174  free_percpu(ic->i_cache_frags.percpu);
175 
176  list_for_each_entry_safe(frag, frag_tmp, &list, f_cache_entry) {
177  list_del(&frag->f_cache_entry);
178  WARN_ON(!list_empty(&frag->f_item));
179  kmem_cache_free(rds_ib_frag_slab, frag);
180  }
181 }
182 
183 /* fwd decl */
184 static void rds_ib_recv_cache_put(struct list_head *new_item,
185  struct rds_ib_refill_cache *cache);
186 static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache);
187 
188 
189 /* Recycle frag and attached recv buffer f_sg */
190 static void rds_ib_frag_free(struct rds_ib_connection *ic,
191  struct rds_page_frag *frag)
192 {
193  rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
194 
195  rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags);
196 }
197 
198 /* Recycle inc after freeing attached frags */
199 void rds_ib_inc_free(struct rds_incoming *inc)
200 {
201  struct rds_ib_incoming *ibinc;
202  struct rds_page_frag *frag;
203  struct rds_page_frag *pos;
204  struct rds_ib_connection *ic = inc->i_conn->c_transport_data;
205 
206  ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
207 
208  /* Free attached frags */
209  list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
210  list_del_init(&frag->f_item);
211  rds_ib_frag_free(ic, frag);
212  }
213  BUG_ON(!list_empty(&ibinc->ii_frags));
214 
215  rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
216  rds_ib_recv_cache_put(&ibinc->ii_cache_entry, &ic->i_cache_incs);
217 }
218 
219 static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
220  struct rds_ib_recv_work *recv)
221 {
222  if (recv->r_ibinc) {
223  rds_inc_put(&recv->r_ibinc->ii_inc);
224  recv->r_ibinc = NULL;
225  }
226  if (recv->r_frag) {
227  ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
228  rds_ib_frag_free(ic, recv->r_frag);
229  recv->r_frag = NULL;
230  }
231 }
232 
233 void rds_ib_recv_clear_ring(struct rds_ib_connection *ic)
234 {
235  u32 i;
236 
237  for (i = 0; i < ic->i_recv_ring.w_nr; i++)
238  rds_ib_recv_clear_one(ic, &ic->i_recvs[i]);
239 }
240 
241 static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *ic,
242  gfp_t slab_mask)
243 {
244  struct rds_ib_incoming *ibinc;
245  struct list_head *cache_item;
246  int avail_allocs;
247 
248  cache_item = rds_ib_recv_cache_get(&ic->i_cache_incs);
249  if (cache_item) {
250  ibinc = container_of(cache_item, struct rds_ib_incoming, ii_cache_entry);
251  } else {
252  avail_allocs = atomic_add_unless(&rds_ib_allocation,
253  1, rds_ib_sysctl_max_recv_allocation);
254  if (!avail_allocs) {
255  rds_ib_stats_inc(s_ib_rx_alloc_limit);
256  return NULL;
257  }
258  ibinc = kmem_cache_alloc(rds_ib_incoming_slab, slab_mask);
259  if (!ibinc) {
260  atomic_dec(&rds_ib_allocation);
261  return NULL;
262  }
263  }
264  INIT_LIST_HEAD(&ibinc->ii_frags);
265  rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr);
266 
267  return ibinc;
268 }
269 
270 static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic,
271  gfp_t slab_mask, gfp_t page_mask)
272 {
273  struct rds_page_frag *frag;
274  struct list_head *cache_item;
275  int ret;
276 
277  cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags);
278  if (cache_item) {
279  frag = container_of(cache_item, struct rds_page_frag, f_cache_entry);
280  } else {
281  frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask);
282  if (!frag)
283  return NULL;
284 
285  sg_init_table(&frag->f_sg, 1);
286  ret = rds_page_remainder_alloc(&frag->f_sg,
287  RDS_FRAG_SIZE, page_mask);
288  if (ret) {
289  kmem_cache_free(rds_ib_frag_slab, frag);
290  return NULL;
291  }
292  }
293 
294  INIT_LIST_HEAD(&frag->f_item);
295 
296  return frag;
297 }
298 
299 static int rds_ib_recv_refill_one(struct rds_connection *conn,
300  struct rds_ib_recv_work *recv, int prefill)
301 {
302  struct rds_ib_connection *ic = conn->c_transport_data;
303  struct ib_sge *sge;
304  int ret = -ENOMEM;
305  gfp_t slab_mask = GFP_NOWAIT;
306  gfp_t page_mask = GFP_NOWAIT;
307 
308  if (prefill) {
309  slab_mask = GFP_KERNEL;
310  page_mask = GFP_HIGHUSER;
311  }
312 
313  if (!ic->i_cache_incs.ready)
314  rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
315  if (!ic->i_cache_frags.ready)
316  rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
317 
318  /*
319  * ibinc was taken from recv if recv contained the start of a message.
320  * recvs that were continuations will still have this allocated.
321  */
322  if (!recv->r_ibinc) {
323  recv->r_ibinc = rds_ib_refill_one_inc(ic, slab_mask);
324  if (!recv->r_ibinc)
325  goto out;
326  }
327 
328  WARN_ON(recv->r_frag); /* leak! */
329  recv->r_frag = rds_ib_refill_one_frag(ic, slab_mask, page_mask);
330  if (!recv->r_frag)
331  goto out;
332 
333  ret = ib_dma_map_sg(ic->i_cm_id->device, &recv->r_frag->f_sg,
334  1, DMA_FROM_DEVICE);
335  WARN_ON(ret != 1);
336 
337  sge = &recv->r_sge[0];
338  sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
339  sge->length = sizeof(struct rds_header);
340 
341  sge = &recv->r_sge[1];
342  sge->addr = sg_dma_address(&recv->r_frag->f_sg);
343  sge->length = sg_dma_len(&recv->r_frag->f_sg);
344 
345  ret = 0;
346 out:
347  return ret;
348 }
349 
350 /*
351  * This tries to allocate and post unused work requests after making sure that
352  * they have all the allocations they need to queue received fragments into
353  * sockets.
354  *
355  * If an allocation or a post fails, refilling stops and the reserved ring entry is released.
356  */
357 void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
358 {
359  struct rds_ib_connection *ic = conn->c_transport_data;
360  struct rds_ib_recv_work *recv;
361  struct ib_recv_wr *failed_wr;
362  unsigned int posted = 0;
363  int ret = 0;
364  u32 pos;
365 
366  while ((prefill || rds_conn_up(conn)) &&
367  rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
368  if (pos >= ic->i_recv_ring.w_nr) {
369  printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
370  pos);
371  break;
372  }
373 
374  recv = &ic->i_recvs[pos];
375  ret = rds_ib_recv_refill_one(conn, recv, prefill);
376  if (ret) {
377  break;
378  }
379 
380  /* XXX when can this fail? */
381  ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
382  rdsdebug("recv %p ibinc %p page %p addr %lu ret %d\n", recv,
383  recv->r_ibinc, sg_page(&recv->r_frag->f_sg),
384  (long) sg_dma_address(&recv->r_frag->f_sg), ret);
385  if (ret) {
386  rds_ib_conn_error(conn, "recv post on "
387  "%pI4 returned %d, disconnecting and "
388  "reconnecting\n", &conn->c_faddr,
389  ret);
390  break;
391  }
392 
393  posted++;
394  }
395 
396  /* We're doing flow control - update the window. */
397  if (ic->i_flowctl && posted)
398  rds_ib_advertise_credits(conn, posted);
399 
400  if (ret)
401  rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
402 }
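/*
 * Side note on the two fill paths: 'prefill' marks the connection-setup path,
 * which may sleep (hence GFP_KERNEL/GFP_HIGHUSER above) and is allowed to post
 * before the connection is formally up, while the normal path runs from the
 * recv tasklet and must use GFP_NOWAIT. A plausible setup-side call would be
 * rds_ib_recv_refill(conn, 1); the actual call site lives in the RDS IB
 * connection-management code rather than in this file.
 */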
403 
404 /*
405  * We want to recycle several types of recv allocations, like incs and frags.
406  * To use this, the *_free() function passes in the ptr to a list_head within
407  * the recyclee, as well as the cache to put it on.
408  *
409  * First, we put the memory on a percpu list. When this reaches a certain size,
410  * we move it to an intermediate non-percpu list in a lockless manner, with some
411  * xchg/cmpxchg wizardry.
412  *
413  * N.B. Instead of a list_head as the anchor, we use a single pointer, which can
414  * be NULL and xchg'd. The list is actually empty when the pointer is NULL, and
415  * list_empty() will return true even when one element is actually present.
416  */
417 static void rds_ib_recv_cache_put(struct list_head *new_item,
418  struct rds_ib_refill_cache *cache)
419 {
420  unsigned long flags;
421  struct rds_ib_cache_head *chp;
422  struct list_head *old;
423 
424  local_irq_save(flags);
425 
426  chp = per_cpu_ptr(cache->percpu, smp_processor_id());
427  if (!chp->first)
428  INIT_LIST_HEAD(new_item);
429  else /* put on front */
430  list_add_tail(new_item, chp->first);
431  chp->first = new_item;
432  chp->count++;
433 
434  if (chp->count < RDS_IB_RECYCLE_BATCH_COUNT)
435  goto end;
436 
437  /*
438  * Return our per-cpu first list to the cache's xfer by atomically
439  * grabbing the current xfer list, appending it to our per-cpu list,
440  * and then atomically returning that entire list back to the
441  * cache's xfer list as long as it's still empty.
442  */
443  do {
444  old = xchg(&cache->xfer, NULL);
445  if (old)
446  list_splice_entire_tail(old, chp->first);
447  old = cmpxchg(&cache->xfer, NULL, chp->first);
448  } while (old);
449 
450  chp->first = NULL;
451  chp->count = 0;
452 end:
453  local_irq_restore(flags);
454 }
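/*
 * A minimal userspace sketch of the same lockless hand-off, using C11 atomics,
 * to make the xchg/cmpxchg loop above concrete. The names (node, next, xfer,
 * publish_chain) are hypothetical and not part of ib_recv.c; 'xfer' plays the
 * role of cache->xfer and 'mine' the per-cpu chain being published.
 */
#include <stdatomic.h>
#include <stddef.h>

struct node { struct node *next; };

static _Atomic(struct node *) xfer;

static void publish_chain(struct node *mine)
{
	struct node *old, *expected;

	for (;;) {
		/* Grab whatever is currently on the transfer slot. */
		old = atomic_exchange(&xfer, NULL);
		if (old) {
			/* Append the grabbed chain to the tail of ours. */
			struct node *tail = mine;

			while (tail->next)
				tail = tail->next;
			tail->next = old;
		}
		/* Publish the merged chain only if the slot is still empty;
		 * if another CPU raced us, go around and merge again. */
		expected = NULL;
		if (atomic_compare_exchange_strong(&xfer, &expected, mine))
			break;
	}
}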
455 
456 static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache)
457 {
458  struct list_head *head = cache->ready;
459 
460  if (head) {
461  if (!list_empty(head)) {
462  cache->ready = head->next;
463  list_del_init(head);
464  } else
465  cache->ready = NULL;
466  }
467 
468  return head;
469 }
470 
471 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
472  size_t size)
473 {
474  struct rds_ib_incoming *ibinc;
475  struct rds_page_frag *frag;
476  struct iovec *iov = first_iov;
477  unsigned long to_copy;
478  unsigned long frag_off = 0;
479  unsigned long iov_off = 0;
480  int copied = 0;
481  int ret;
482  u32 len;
483 
484  ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
485  frag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item);
486  len = be32_to_cpu(inc->i_hdr.h_len);
487 
488  while (copied < size && copied < len) {
489  if (frag_off == RDS_FRAG_SIZE) {
490  frag = list_entry(frag->f_item.next,
491  struct rds_page_frag, f_item);
492  frag_off = 0;
493  }
494  while (iov_off == iov->iov_len) {
495  iov_off = 0;
496  iov++;
497  }
498 
499  to_copy = min(iov->iov_len - iov_off, RDS_FRAG_SIZE - frag_off);
500  to_copy = min_t(size_t, to_copy, size - copied);
501  to_copy = min_t(unsigned long, to_copy, len - copied);
502 
503  rdsdebug("%lu bytes to user [%p, %zu] + %lu from frag "
504  "[%p, %u] + %lu\n",
505  to_copy, iov->iov_base, iov->iov_len, iov_off,
506  sg_page(&frag->f_sg), frag->f_sg.offset, frag_off);
507 
508  /* XXX needs + offset for multiple recvs per page */
509  ret = rds_page_copy_to_user(sg_page(&frag->f_sg),
510  frag->f_sg.offset + frag_off,
511  iov->iov_base + iov_off,
512  to_copy);
513  if (ret) {
514  copied = ret;
515  break;
516  }
517 
518  iov_off += to_copy;
519  frag_off += to_copy;
520  copied += to_copy;
521  }
522 
523  return copied;
524 }
525 
526 /* ic starts out kzalloc()ed */
527 void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
528 {
529  struct ib_send_wr *wr = &ic->i_ack_wr;
530  struct ib_sge *sge = &ic->i_ack_sge;
531 
532  sge->addr = ic->i_ack_dma;
533  sge->length = sizeof(struct rds_header);
534  sge->lkey = ic->i_mr->lkey;
535 
536  wr->sg_list = sge;
537  wr->num_sge = 1;
538  wr->opcode = IB_WR_SEND;
539  wr->wr_id = RDS_IB_ACK_WR_ID;
540  wr->send_flags = IB_SEND_SIGNALED | IB_SEND_SOLICITED;
541 }
542 
543 /*
544  * You'd think that with reliable IB connections you wouldn't need to ack
545  * messages that have been received. The problem is that IB hardware generates
546  * an ack message before it has DMAed the message into memory. This creates a
547  * potential message loss if the HCA is disabled for any reason between when it
548  * sends the ack and before the message is DMAed and processed. This is only a
549  * potential issue if another HCA is available for fail-over.
550  *
551  * When the remote host receives our ack they'll free the sent message from
552  * their send queue. To decrease the latency of this we always send an ack
553  * immediately after we've received messages.
554  *
555  * For simplicity, we only have one ack in flight at a time. This puts
556  * pressure on senders to have deep enough send queues to absorb the latency of
557  * a single ack frame being in flight. This might not be good enough.
558  *
559  * This is implemented by having a long-lived send_wr and sge which point to a
560  * statically allocated ack frame. This ack wr does not fall under the ring
561  * accounting that the tx and rx wrs do. The QP attribute specifically makes
562  * room for it beyond the ring size. Send completion notices its special
563  * wr_id and avoids working with the ring in that case.
564  */
565 #ifndef KERNEL_HAS_ATOMIC64
566 static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
567  int ack_required)
568 {
569  unsigned long flags;
570 
571  spin_lock_irqsave(&ic->i_ack_lock, flags);
572  ic->i_ack_next = seq;
573  if (ack_required)
574  set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
575  spin_unlock_irqrestore(&ic->i_ack_lock, flags);
576 }
577 
578 static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
579 {
580  unsigned long flags;
581  u64 seq;
582 
583  clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
584 
585  spin_lock_irqsave(&ic->i_ack_lock, flags);
586  seq = ic->i_ack_next;
587  spin_unlock_irqrestore(&ic->i_ack_lock, flags);
588 
589  return seq;
590 }
591 #else
592 static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
593  int ack_required)
594 {
595  atomic64_set(&ic->i_ack_next, seq);
596  if (ack_required) {
597  smp_mb__before_clear_bit();
598  set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
599  }
600 }
601 
602 static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
603 {
604  clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
605  smp_mb__after_clear_bit();
606 
607  return atomic64_read(&ic->i_ack_next);
608 }
609 #endif
610 
611 
612 static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
613 {
614  struct rds_header *hdr = ic->i_ack;
615  struct ib_send_wr *failed_wr;
616  u64 seq;
617  int ret;
618 
619  seq = rds_ib_get_ack(ic);
620 
621  rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
622  rds_message_populate_header(hdr, 0, 0, 0);
623  hdr->h_ack = cpu_to_be64(seq);
624  hdr->h_credit = adv_credits;
625  rds_message_make_checksum(hdr);
626  ic->i_ack_queued = jiffies;
627 
628  ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, &failed_wr);
629  if (unlikely(ret)) {
630  /* Failed to send. Release the WR, and
631  * force another ACK.
632  */
633  clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
634  set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
635 
636  rds_ib_stats_inc(s_ib_ack_send_failure);
637 
638  rds_ib_conn_error(ic->conn, "sending ack failed\n");
639  } else
640  rds_ib_stats_inc(s_ib_ack_sent);
641 }
642 
643 /*
644  * There are 3 ways of getting acknowledgements to the peer:
645  * 1. We call rds_ib_attempt_ack from the recv completion handler
646  * to send an ACK-only frame.
647  * However, there can be only one such frame in the send queue
648  * at any time, so we may have to postpone it.
649  * 2. When another (data) packet is transmitted while there's
650  * an ACK in the queue, we piggyback the ACK sequence number
651  * on the data packet.
652  * 3. If the ACK WR is done sending, we get called from the
653  * send queue completion handler, and check whether there's
654  * another ACK pending (postponed because the WR was on the
655  * queue). If so, we transmit it.
656  *
657  * We maintain 2 variables:
658  * - i_ack_flags, which keeps track of whether the ACK WR
659  * is currently in the send queue or not (IB_ACK_IN_FLIGHT)
660  * - i_ack_next, which is the last sequence number we received
661  *
662  * Potentially, send queue and receive queue handlers can run concurrently.
663  * It would be nice to not have to use a spinlock to synchronize things,
664  * but the one problem that rules this out is that 64bit updates are
665  * not atomic on all platforms. Things would be a lot simpler if
666  * we had atomic64 or maybe cmpxchg64 everywhere.
667  *
668  * Reconnecting complicates this picture just slightly. When we
669  * reconnect, we may be seeing duplicate packets. The peer
670  * is retransmitting them, because it hasn't seen an ACK for
671  * them. It is important that we ACK these.
672  *
673  * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
674  * this flag set *MUST* be acknowledged immediately.
675  */
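/*
 * In terms of the helpers below: rds_ib_set_ack() sets IB_ACK_REQUESTED,
 * rds_ib_get_ack() clears it, rds_ib_attempt_ack() claims IB_ACK_IN_FLIGHT
 * with test_and_set_bit() before posting, rds_ib_ack_send_complete() clears
 * IB_ACK_IN_FLIGHT when the WR completes, and the error path in
 * rds_ib_send_ack() clears IB_ACK_IN_FLIGHT and re-sets IB_ACK_REQUESTED so
 * the ACK is retried.
 */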
676 
677 /*
678  * When we get here, we're called from the recv queue handler.
679  * Check whether we ought to transmit an ACK.
680  */
681 void rds_ib_attempt_ack(struct rds_ib_connection *ic)
682 {
683  unsigned int adv_credits;
684 
685  if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
686  return;
687 
688  if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
689  rds_ib_stats_inc(s_ib_ack_send_delayed);
690  return;
691  }
692 
693  /* Can we get a send credit? */
694  if (!rds_ib_send_grab_credits(ic, 1, &adv_credits, 0, RDS_MAX_ADV_CREDIT)) {
695  rds_ib_stats_inc(s_ib_tx_throttle);
696  clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
697  return;
698  }
699 
700  clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
701  rds_ib_send_ack(ic, adv_credits);
702 }
703 
704 /*
705  * We get here from the send completion handler, when the
706  * adapter tells us the ACK frame was sent.
707  */
708 void rds_ib_ack_send_complete(struct rds_ib_connection *ic)
709 {
710  clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
711  rds_ib_attempt_ack(ic);
712 }
713 
714 /*
715  * This is called by the regular xmit code when it wants to piggyback
716  * an ACK on an outgoing frame.
717  */
718 u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic)
719 {
720  clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
721  rds_ib_stats_inc(s_ib_ack_send_piggybacked);
722  return rds_ib_get_ack(ic);
723 }
724 
725 /*
726  * It's kind of lame that we're copying from the posted receive pages into
727  * long-lived bitmaps. We could have posted the bitmaps and rdma written into
728  * them. But receiving new congestion bitmaps should be a *rare* event, so
729  * hopefully we won't need to invest that complexity in making it more
730  * efficient. By copying we can share a simpler core with TCP which has to
731  * copy.
732  */
733 static void rds_ib_cong_recv(struct rds_connection *conn,
734  struct rds_ib_incoming *ibinc)
735 {
736  struct rds_cong_map *map;
737  unsigned int map_off;
738  unsigned int map_page;
739  struct rds_page_frag *frag;
740  unsigned long frag_off;
741  unsigned long to_copy;
742  unsigned long copied;
743  uint64_t uncongested = 0;
744  void *addr;
745 
746  /* catch completely corrupt packets */
747  if (be32_to_cpu(ibinc->ii_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
748  return;
749 
750  map = conn->c_fcong;
751  map_page = 0;
752  map_off = 0;
753 
754  frag = list_entry(ibinc->ii_frags.next, struct rds_page_frag, f_item);
755  frag_off = 0;
756 
757  copied = 0;
758 
759  while (copied < RDS_CONG_MAP_BYTES) {
760  uint64_t *src, *dst;
761  unsigned int k;
762 
763  to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
764  BUG_ON(to_copy & 7); /* Must be 64bit aligned. */
765 
766  addr = kmap_atomic(sg_page(&frag->f_sg));
767 
768  src = addr + frag_off;
769  dst = (void *)map->m_page_addrs[map_page] + map_off;
770  for (k = 0; k < to_copy; k += 8) {
771  /* Record ports that became uncongested, ie
772  * bits that changed from 0 to 1. */
773  uncongested |= ~(*src) & *dst;
774  *dst++ = *src++;
775  }
776  kunmap_atomic(addr);
777 
778  copied += to_copy;
779 
780  map_off += to_copy;
781  if (map_off == PAGE_SIZE) {
782  map_off = 0;
783  map_page++;
784  }
785 
786  frag_off += to_copy;
787  if (frag_off == RDS_FRAG_SIZE) {
788  frag = list_entry(frag->f_item.next,
789  struct rds_page_frag, f_item);
790  frag_off = 0;
791  }
792  }
793 
794  /* the congestion map is in little endian order */
795  uncongested = le64_to_cpu(uncongested);
796 
797  rds_cong_map_updated(map, uncongested);
798 }
799 
800 /*
801  * Rings are posted with all the allocations they'll need to queue the
802  * incoming message to the receiving socket so this can't fail.
803  * All fragments start with a header, so we can make sure we're not receiving
804  * garbage, and we can tell a small 8 byte fragment from an ACK frame.
805  */
806 struct rds_ib_ack_state {
807  u64 ack_next;
808  u64 ack_recv;
809  unsigned int ack_required:1;
810  unsigned int ack_next_valid:1;
811  unsigned int ack_recv_valid:1;
812 };
813 
814 static void rds_ib_process_recv(struct rds_connection *conn,
815  struct rds_ib_recv_work *recv, u32 data_len,
816  struct rds_ib_ack_state *state)
817 {
818  struct rds_ib_connection *ic = conn->c_transport_data;
819  struct rds_ib_incoming *ibinc = ic->i_ibinc;
820  struct rds_header *ihdr, *hdr;
821 
822  /* XXX shut down the connection if port 0,0 are seen? */
823 
824  rdsdebug("ic %p ibinc %p recv %p byte len %u\n", ic, ibinc, recv,
825  data_len);
826 
827  if (data_len < sizeof(struct rds_header)) {
828  rds_ib_conn_error(conn, "incoming message "
829  "from %pI4 didn't include a "
830  "header, disconnecting and "
831  "reconnecting\n",
832  &conn->c_faddr);
833  return;
834  }
835  data_len -= sizeof(struct rds_header);
836 
837  ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];
838 
839  /* Validate the checksum. */
840  if (!rds_message_verify_checksum(ihdr)) {
841  rds_ib_conn_error(conn, "incoming message "
842  "from %pI4 has corrupted header - "
843  "forcing a reconnect\n",
844  &conn->c_faddr);
845  rds_stats_inc(s_recv_drop_bad_checksum);
846  return;
847  }
848 
849  /* Process the ACK sequence which comes with every packet */
850  state->ack_recv = be64_to_cpu(ihdr->h_ack);
851  state->ack_recv_valid = 1;
852 
853  /* Process the credits update if there was one */
854  if (ihdr->h_credit)
855  rds_ib_send_add_credits(conn, ihdr->h_credit);
856 
857  if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) {
858  /* This is an ACK-only packet. The reason it gets
859  * special treatment here is that historically, ACKs
860  * were rather special beasts.
861  */
862  rds_ib_stats_inc(s_ib_ack_received);
863 
864  /*
865  * Usually the frags make their way on to incs and are then freed as
866  * the inc is freed. We don't go that route, so we have to drop the
867  * page ref ourselves. We can't just leave the page on the recv
868  * because that confuses the dma mapping of pages and each recv's use
869  * of a partial page.
870  *
871  * FIXME: Fold this into the code path below.
872  */
873  rds_ib_frag_free(ic, recv->r_frag);
874  recv->r_frag = NULL;
875  return;
876  }
877 
878  /*
879  * If we don't already have an inc on the connection then this
880  * fragment has a header and starts a message. Copy its header
881  * into the inc and save the inc so we can hang upcoming fragments
882  * off its list.
883  */
884  if (!ibinc) {
885  ibinc = recv->r_ibinc;
886  recv->r_ibinc = NULL;
887  ic->i_ibinc = ibinc;
888 
889  hdr = &ibinc->ii_inc.i_hdr;
890  memcpy(hdr, ihdr, sizeof(*hdr));
891  ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
892 
893  rdsdebug("ic %p ibinc %p rem %u flag 0x%x\n", ic, ibinc,
894  ic->i_recv_data_rem, hdr->h_flags);
895  } else {
896  hdr = &ibinc->ii_inc.i_hdr;
897  /* We can't just use memcmp here; fragments of a
898  * single message may carry different ACKs */
899  if (hdr->h_sequence != ihdr->h_sequence ||
900  hdr->h_len != ihdr->h_len ||
901  hdr->h_sport != ihdr->h_sport ||
902  hdr->h_dport != ihdr->h_dport) {
903  rds_ib_conn_error(conn,
904  "fragment header mismatch; forcing reconnect\n");
905  return;
906  }
907  }
908 
909  list_add_tail(&recv->r_frag->f_item, &ibinc->ii_frags);
910  recv->r_frag = NULL;
911 
912  if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
913  ic->i_recv_data_rem -= RDS_FRAG_SIZE;
914  else {
915  ic->i_recv_data_rem = 0;
916  ic->i_ibinc = NULL;
917 
918  if (ibinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
919  rds_ib_cong_recv(conn, ibinc);
920  else {
921  rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
922  &ibinc->ii_inc, GFP_ATOMIC);
923  state->ack_next = be64_to_cpu(hdr->h_sequence);
924  state->ack_next_valid = 1;
925  }
926 
927  /* Evaluate the ACK_REQUIRED flag *after* we received
928  * the complete frame, and after bumping the next_rx
929  * sequence. */
930  if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
931  rds_stats_inc(s_recv_ack_required);
932  state->ack_required = 1;
933  }
934 
935  rds_inc_put(&ibinc->ii_inc);
936  }
937 }
938 
939 /*
940  * Plucking the oldest entry from the ring can be done concurrently with
941  * the thread refilling the ring. Each ring operation is protected by
942  * spinlocks and the transient state of refilling doesn't change the
943  * recording of which entry is oldest.
944  *
945  * This relies on IB only calling one cq comp_handler for each cq so that
946  * there will only be one caller of rds_recv_incoming() per RDS connection.
947  */
948 void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
949 {
950  struct rds_connection *conn = context;
951  struct rds_ib_connection *ic = conn->c_transport_data;
952 
953  rdsdebug("conn %p cq %p\n", conn, cq);
954 
955  rds_ib_stats_inc(s_ib_rx_cq_call);
956 
957  tasklet_schedule(&ic->i_recv_tasklet);
958 }
959 
960 static inline void rds_poll_cq(struct rds_ib_connection *ic,
961  struct rds_ib_ack_state *state)
962 {
963  struct rds_connection *conn = ic->conn;
964  struct ib_wc wc;
965  struct rds_ib_recv_work *recv;
966 
967  while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
968  rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
969  (unsigned long long)wc.wr_id, wc.status,
970  rds_ib_wc_status_str(wc.status), wc.byte_len,
971  be32_to_cpu(wc.ex.imm_data));
972  rds_ib_stats_inc(s_ib_rx_cq_event);
973 
974  recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
975 
976  ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
977 
978  /*
979  * Also process recvs in connecting state because it is possible
980  * to get a recv completion _before_ the rdmacm ESTABLISHED
981  * event is processed.
982  */
983  if (wc.status == IB_WC_SUCCESS) {
984  rds_ib_process_recv(conn, recv, wc.byte_len, state);
985  } else {
986  /* We expect errors as the qp is drained during shutdown */
987  if (rds_conn_up(conn) || rds_conn_connecting(conn))
988  rds_ib_conn_error(conn, "recv completion on %pI4 had "
989  "status %u (%s), disconnecting and "
990  "reconnecting\n", &conn->c_faddr,
991  wc.status,
992  rds_ib_wc_status_str(wc.status));
993  }
994 
995  /*
996  * It's very important that we only free this ring entry if we've truly
997  * freed the resources allocated to the entry. The refilling path can
998  * leak if we don't.
999  */
1000  rds_ib_ring_free(&ic->i_recv_ring, 1);
1001  }
1002 }
1003 
1004 void rds_ib_recv_tasklet_fn(unsigned long data)
1005 {
1006  struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
1007  struct rds_connection *conn = ic->conn;
1008  struct rds_ib_ack_state state = { 0, };
1009 
1010  rds_poll_cq(ic, &state);
1011  ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
1012  rds_poll_cq(ic, &state);
1013 
1014  if (state.ack_next_valid)
1015  rds_ib_set_ack(ic, state.ack_next, state.ack_required);
1016  if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
1017  rds_send_drop_acked(conn, state.ack_recv, NULL);
1018  ic->i_ack_recv = state.ack_recv;
1019  }
1020  if (rds_conn_up(conn))
1021  rds_ib_attempt_ack(ic);
1022 
1023  /* If we ever end up with a really empty receive ring, we're
1024  * in deep trouble, as the sender will definitely see RNR
1025  * timeouts. */
1026  if (rds_ib_ring_empty(&ic->i_recv_ring))
1027  rds_ib_stats_inc(s_ib_rx_ring_empty);
1028 
1029  if (rds_ib_ring_low(&ic->i_recv_ring))
1030  rds_ib_recv_refill(conn, 0);
1031 }
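/*
 * Note on the structure above: the poll / ib_req_notify_cq() / poll-again
 * sequence is the usual way to close the race where a completion arrives
 * after the first drain but before the CQ has been re-armed; the second
 * rds_poll_cq() pass picks up anything that slipped into that window.
 */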
1032 
1033 int rds_ib_recv(struct rds_connection *conn)
1034 {
1035  struct rds_ib_connection *ic = conn->c_transport_data;
1036  int ret = 0;
1037 
1038  rdsdebug("conn %p\n", conn);
1039  if (rds_conn_up(conn))
1040  rds_ib_attempt_ack(ic);
1041 
1042  return ret;
1043 }
1044 
1045 int rds_ib_recv_init(void)
1046 {
1047  struct sysinfo si;
1048  int ret = -ENOMEM;
1049 
1050  /* Default to 30% of all available RAM for recv memory */
1051  si_meminfo(&si);
1052  rds_ib_sysctl_max_recv_allocation = si.totalram / 3 * PAGE_SIZE / RDS_FRAG_SIZE;
1053 
1054  rds_ib_incoming_slab = kmem_cache_create("rds_ib_incoming",
1055  sizeof(struct rds_ib_incoming),
1056  0, SLAB_HWCACHE_ALIGN, NULL);
1057  if (!rds_ib_incoming_slab)
1058  goto out;
1059 
1060  rds_ib_frag_slab = kmem_cache_create("rds_ib_frag",
1061  sizeof(struct rds_page_frag),
1062  0, SLAB_HWCACHE_ALIGN, NULL);
1063  if (!rds_ib_frag_slab)
1064  kmem_cache_destroy(rds_ib_incoming_slab);
1065  else
1066  ret = 0;
1067 out:
1068  return ret;
1069 }
1070 
1071 void rds_ib_recv_exit(void)
1072 {
1073  kmem_cache_destroy(rds_ib_incoming_slab);
1074  kmem_cache_destroy(rds_ib_frag_slab);
1075 }