Linux Kernel 3.7.1
net/rds/recv.c
/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/export.h>

#include "rds.h"

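/*
 * An rds_incoming describes a single message arriving on a connection.
 * It is refcounted: the count starts at 1 here, rds_inc_addref() takes
 * extra references while the message sits on a socket's receive queue,
 * and the transport's ->inc_free() reclaims it once rds_inc_put() drops
 * the final reference.
 */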
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
                  __be32 saddr)
{
        atomic_set(&inc->i_refcount, 1);
        INIT_LIST_HEAD(&inc->i_item);
        inc->i_conn = conn;
        inc->i_saddr = saddr;
        inc->i_rdma_cookie = 0;
}
EXPORT_SYMBOL_GPL(rds_inc_init);

static void rds_inc_addref(struct rds_incoming *inc)
{
        rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
        atomic_inc(&inc->i_refcount);
}

void rds_inc_put(struct rds_incoming *inc)
{
        rdsdebug("put inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
        if (atomic_dec_and_test(&inc->i_refcount)) {
                BUG_ON(!list_empty(&inc->i_item));

                inc->i_conn->c_trans->inc_free(inc);
        }
}
EXPORT_SYMBOL_GPL(rds_inc_put);

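/*
 * Track how many bytes of queued messages a socket is holding and flip
 * the port's bit in the congestion map accordingly.  Note the hysteresis:
 * congestion is reported as soon as rs_rcv_bytes exceeds the socket's
 * receive buffer, but only cleared once it falls below half of it.  For
 * example, with a 64KB rcvbuf the congested bit is set above 64KB of
 * queued data and cleared only below 32KB.  Each transition also queues
 * the updated map for transmission to peers via rds_cong_queue_updates().
 */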
static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
                                  struct rds_cong_map *map,
                                  int delta, __be16 port)
{
        int now_congested;

        if (delta == 0)
                return;

        rs->rs_rcv_bytes += delta;
        now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);

        rdsdebug("rs %p (%pI4:%u) recv bytes %d buf %d "
                 "now_cong %d delta %d\n",
                 rs, &rs->rs_bound_addr,
                 ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
                 rds_sk_rcvbuf(rs), now_congested, delta);

        /* wasn't -> am congested */
        if (!rs->rs_congested && now_congested) {
                rs->rs_congested = 1;
                rds_cong_set_bit(map, port);
                rds_cong_queue_updates(map);
        }
        /* was -> aren't congested */
        /* Require more free space before reporting uncongested to prevent
           bouncing cong/uncong state too often */
        else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
                rs->rs_congested = 0;
                rds_cong_clear_bit(map, port);
                rds_cong_queue_updates(map);
        }

        /* do nothing if no change in cong state */
}

/*
 * Process all extension headers that come with this message.
 */
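/*
 * Extension headers follow the fixed rds_header as a sequence of typed
 * records; rds_message_next_extension() copies the next one into
 * "buffer", returns its type, and advances "pos" until it reports
 * RDS_EXTHDR_NONE.
 */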
static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
{
        struct rds_header *hdr = &inc->i_hdr;
        unsigned int pos = 0, type, len;
        union {
                struct rds_ext_header_version version;
                struct rds_ext_header_rdma rdma;
                struct rds_ext_header_rdma_dest rdma_dest;
        } buffer;

        while (1) {
                len = sizeof(buffer);
                type = rds_message_next_extension(hdr, &pos, &buffer, &len);
                if (type == RDS_EXTHDR_NONE)
                        break;
                /* Process extension header here */
                switch (type) {
                case RDS_EXTHDR_RDMA:
                        rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
                        break;

                case RDS_EXTHDR_RDMA_DEST:
                        /* We ignore the size for now. We could stash it
                         * somewhere and use it for error checking. */
                        inc->i_rdma_cookie = rds_rdma_make_cookie(
                                        be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
                                        be32_to_cpu(buffer.rdma_dest.h_rdma_offset));

                        break;
                }
        }
}

/*
 * The transport must make sure that this is serialized against other
 * rx and conn reset on this specific conn.
 *
 * We currently assert that only one fragmented message will be sent
 * down a connection at a time.  This lets us reassemble in the conn
 * instead of per-flow, which means that we don't have to go digging through
 * flows to tear down partial reassembly progress on conn failure and
 * we save flow lookup and locking for each frag arrival.  It does mean
 * that small messages will wait behind large ones.  Fragmenting at all
 * is only to reduce the memory consumption of pre-posted buffers.
 *
 * The caller passes in saddr and daddr instead of us getting it from the
 * conn.  This lets loopback, which only has one conn for both directions,
 * tell us which roles the addrs in the conn are playing for this message.
 */
void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
                       struct rds_incoming *inc, gfp_t gfp)
{
        struct rds_sock *rs = NULL;
        struct sock *sk;
        unsigned long flags;

        inc->i_conn = conn;
        inc->i_rx_jiffies = jiffies;

        rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
                 "flags 0x%x rx_jiffies %lu\n", conn,
                 (unsigned long long)conn->c_next_rx_seq,
                 inc,
                 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
                 be32_to_cpu(inc->i_hdr.h_len),
                 be16_to_cpu(inc->i_hdr.h_sport),
                 be16_to_cpu(inc->i_hdr.h_dport),
                 inc->i_hdr.h_flags,
                 inc->i_rx_jiffies);

        /*
         * Sequence numbers should only increase.  Messages get their
         * sequence number as they're queued in a sending conn.  They
         * can be dropped, though, if the sending socket is closed before
         * they hit the wire.  So sequence numbers can skip forward
         * under normal operation.  They can also drop back in the conn
         * failover case as previously sent messages are resent down the
         * new instance of a conn.  We drop those, otherwise we have
         * to assume that the next valid seq does not come after a
         * hole in the fragment stream.
         *
         * The headers don't give us a way to realize if fragments of
         * a message have been dropped.  We assume that frags that arrive
         * to a flow are part of the current message on the flow that is
         * being reassembled.  This means that senders can't drop messages
         * from the sending conn until all their frags are sent.
         *
         * XXX we could spend more on the wire to get more robust failure
         * detection, arguably worth it to avoid data corruption.
         */
        if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
            (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
                rds_stats_inc(s_recv_drop_old_seq);
                goto out;
        }
        conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;

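        /*
         * A zero destination port marks a ping probe: answer it with a
         * pong back to the sender's source port and consume the message
         * without queueing it to any socket.
         */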
        if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
                rds_stats_inc(s_recv_ping);
                rds_send_pong(conn, inc->i_hdr.h_sport);
                goto out;
        }

        rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
        if (!rs) {
                rds_stats_inc(s_recv_drop_no_sock);
                goto out;
        }

        /* Process extension headers */
        rds_recv_incoming_exthdrs(inc, rs);

        /* We can be racing with rds_release() which marks the socket dead. */
        sk = rds_rs_to_sk(rs);

        /* serialize with rds_release -> sock_orphan */
        write_lock_irqsave(&rs->rs_recv_lock, flags);
        if (!sock_flag(sk, SOCK_DEAD)) {
                rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
                rds_stats_inc(s_recv_queued);
                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                                      be32_to_cpu(inc->i_hdr.h_len),
                                      inc->i_hdr.h_dport);
                rds_inc_addref(inc);
                list_add_tail(&inc->i_item, &rs->rs_recv_queue);
                __rds_wake_sk_sleep(sk);
        } else {
                rds_stats_inc(s_recv_drop_dead_sock);
        }
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);

out:
        if (rs)
                rds_sock_put(rs);
}
EXPORT_SYMBOL_GPL(rds_recv_incoming);

/*
 * Be very careful here.  This is called as the condition in
 * wait_event_*(), so it needs to cope with being called many times.
 */
static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
{
        unsigned long flags;

        if (!*inc) {
                read_lock_irqsave(&rs->rs_recv_lock, flags);
                if (!list_empty(&rs->rs_recv_queue)) {
                        *inc = list_entry(rs->rs_recv_queue.next,
                                          struct rds_incoming,
                                          i_item);
                        rds_inc_addref(*inc);
                }
                read_unlock_irqrestore(&rs->rs_recv_lock, flags);
        }

        return *inc != NULL;
}

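/*
 * Check whether "inc" is still on the socket's receive queue; a
 * concurrent recvmsg() may have raced us and returned it already.
 * If "drop" is set and the message is still queued, dequeue it and
 * give back its rcvbuf accounting, i.e. consume it (the non-MSG_PEEK
 * case).
 */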
static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
                            int drop)
{
        struct sock *sk = rds_rs_to_sk(rs);
        int ret = 0;
        unsigned long flags;

        write_lock_irqsave(&rs->rs_recv_lock, flags);
        if (!list_empty(&inc->i_item)) {
                ret = 1;
                if (drop) {
                        /* XXX make sure this i_conn is reliable */
                        rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                                              -be32_to_cpu(inc->i_hdr.h_len),
                                              inc->i_hdr.h_dport);
                        list_del_init(&inc->i_item);
                        rds_inc_put(inc);
                }
        }
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);

        rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
        return ret;
}

/*
 * Pull errors off the error queue.
 * If msghdr is NULL, we will just purge the error queue.
 */
int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
{
        struct rds_notifier *notifier;
        struct rds_rdma_notify cmsg = { 0 }; /* fill holes with zero */
        unsigned int count = 0, max_messages = ~0U;
        unsigned long flags;
        LIST_HEAD(copy);
        int err = 0;

        /* put_cmsg copies to user space and thus may sleep. We can't do this
         * with rs_lock held, so first grab as many notifications as we can stuff
         * in the user provided cmsg buffer. We don't try to copy more, to avoid
         * losing notifications - except when the buffer is so small that it wouldn't
         * even hold a single notification. Then we give the caller as much of this
         * single msg as we can squeeze in, and set MSG_CTRUNC.
         */
        if (msghdr) {
                max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
                if (!max_messages)
                        max_messages = 1;
        }

        spin_lock_irqsave(&rs->rs_lock, flags);
        while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
                notifier = list_entry(rs->rs_notify_queue.next,
                                      struct rds_notifier, n_list);
                list_move(&notifier->n_list, &copy);
                count++;
        }
        spin_unlock_irqrestore(&rs->rs_lock, flags);

        if (!count)
                return 0;

        while (!list_empty(&copy)) {
                notifier = list_entry(copy.next, struct rds_notifier, n_list);

                if (msghdr) {
                        cmsg.user_token = notifier->n_user_token;
                        cmsg.status = notifier->n_status;

                        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
                                       sizeof(cmsg), &cmsg);
                        if (err)
                                break;
                }

                list_del_init(&notifier->n_list);
                kfree(notifier);
        }

        /* If we bailed out because of an error in put_cmsg,
         * we may be left with one or more notifications that we
         * didn't process. Return them to the head of the list. */
        if (!list_empty(&copy)) {
                spin_lock_irqsave(&rs->rs_lock, flags);
                list_splice(&copy, &rs->rs_notify_queue);
                spin_unlock_irqrestore(&rs->rs_lock, flags);
        }

        return err;
}

/*
 * Deliver a pending congestion notification via cmsg and clear it.
 */
static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
{
        uint64_t notify = rs->rs_cong_notify;
        unsigned long flags;
        int err;

        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
                       sizeof(notify), &notify);
        if (err)
                return err;

        spin_lock_irqsave(&rs->rs_lock, flags);
        rs->rs_cong_notify &= ~notify;
        spin_unlock_irqrestore(&rs->rs_lock, flags);

        return 0;
}

/*
 * Receive any control messages.
 */
static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg)
{
        int ret = 0;

        if (inc->i_rdma_cookie) {
                ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
                               sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie);
                if (ret)
                        return ret;
        }

        return 0;
}

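/*
 * The receive loop below drains, in priority order: queued RDMA
 * notifications, pending congestion updates, and finally message data,
 * blocking (subject to SO_RCVTIMEO) when none of the three is ready.
 *
 * Illustrative userspace sketch (not part of this file): a caller might
 * receive a message and its RDS_CMSG_RDMA_DEST cookie roughly like so,
 * assuming an AF_RDS socket fd already bound with bind(), and the
 * rds_rdma_cookie_t type from linux/rds.h:
 *
 *      char data[1024];
 *      char ctl[CMSG_SPACE(sizeof(rds_rdma_cookie_t))];
 *      rds_rdma_cookie_t cookie = 0;
 *      struct iovec iov = { .iov_base = data, .iov_len = sizeof(data) };
 *      struct msghdr msg = {
 *              .msg_iov = &iov, .msg_iovlen = 1,
 *              .msg_control = ctl, .msg_controllen = sizeof(ctl),
 *      };
 *      ssize_t n = recvmsg(fd, &msg, 0);
 *      struct cmsghdr *c;
 *      for (c = CMSG_FIRSTHDR(&msg); c; c = CMSG_NXTHDR(&msg, c))
 *              if (c->cmsg_level == SOL_RDS &&
 *                  c->cmsg_type == RDS_CMSG_RDMA_DEST)
 *                      memcpy(&cookie, CMSG_DATA(c), sizeof(cookie));
 */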
int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
                size_t size, int msg_flags)
{
        struct sock *sk = sock->sk;
        struct rds_sock *rs = rds_sk_to_rs(sk);
        long timeo;
        int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
        struct sockaddr_in *sin;
        struct rds_incoming *inc = NULL;

        /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
        timeo = sock_rcvtimeo(sk, nonblock);

        rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);

        msg->msg_namelen = 0;

        if (msg_flags & MSG_OOB)
                goto out;

        while (1) {
                /* If there are pending notifications, do those - and nothing else */
                if (!list_empty(&rs->rs_notify_queue)) {
                        ret = rds_notify_queue_get(rs, msg);
                        break;
                }

                if (rs->rs_cong_notify) {
                        ret = rds_notify_cong(rs, msg);
                        break;
                }

                if (!rds_next_incoming(rs, &inc)) {
                        if (nonblock) {
                                ret = -EAGAIN;
                                break;
                        }

                        timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
                                        (!list_empty(&rs->rs_notify_queue) ||
                                         rs->rs_cong_notify ||
                                         rds_next_incoming(rs, &inc)), timeo);
                        rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
                                 timeo);
                        if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
                                continue;

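                        /*
                         * timeo now holds what is left of the wait:
                         * negative means a signal interrupted us (return
                         * it as the error), zero means the wait timed out.
                         */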
                        ret = timeo;
                        if (ret == 0)
                                ret = -ETIMEDOUT;
                        break;
                }

                rdsdebug("copying inc %p from %pI4:%u to user\n", inc,
                         &inc->i_conn->c_faddr,
                         ntohs(inc->i_hdr.h_sport));
                ret = inc->i_conn->c_trans->inc_copy_to_user(inc, msg->msg_iov,
                                                             size);
                if (ret < 0)
                        break;

                /*
                 * if the message we just copied isn't at the head of the
                 * recv queue then someone else raced us to return it, try
                 * to get the next message.
                 */
                if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
                        rds_inc_put(inc);
                        inc = NULL;
                        rds_stats_inc(s_recv_deliver_raced);
                        continue;
                }

                if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
                        if (msg_flags & MSG_TRUNC)
                                ret = be32_to_cpu(inc->i_hdr.h_len);
                        msg->msg_flags |= MSG_TRUNC;
                }

                if (rds_cmsg_recv(inc, msg)) {
                        ret = -EFAULT;
                        goto out;
                }

                rds_stats_inc(s_recv_delivered);

                sin = (struct sockaddr_in *)msg->msg_name;
                if (sin) {
                        sin->sin_family = AF_INET;
                        sin->sin_port = inc->i_hdr.h_sport;
                        sin->sin_addr.s_addr = inc->i_saddr;
                        memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
                        msg->msg_namelen = sizeof(*sin);
                }
                break;
        }

        if (inc)
                rds_inc_put(inc);

out:
        return ret;
}

/*
 * The socket is being shut down and we're asked to drop messages that were
 * queued for recvmsg.  The caller has unbound the socket so the receive path
 * won't queue any more incoming fragments or messages on the socket.
 */
void rds_clear_recv_queue(struct rds_sock *rs)
{
        struct sock *sk = rds_rs_to_sk(rs);
        struct rds_incoming *inc, *tmp;
        unsigned long flags;

        write_lock_irqsave(&rs->rs_recv_lock, flags);
        list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                                      -be32_to_cpu(inc->i_hdr.h_len),
                                      inc->i_hdr.h_dport);
                list_del_init(&inc->i_item);
                rds_inc_put(inc);
        }
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
}

/*
 * inc->i_saddr isn't used here because it is only set in the receive
 * path.
 */
void rds_inc_info_copy(struct rds_incoming *inc,
                       struct rds_info_iterator *iter,
                       __be32 saddr, __be32 daddr, int flip)
{
        struct rds_info_message minfo;

        minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
        minfo.len = be32_to_cpu(inc->i_hdr.h_len);

        if (flip) {
                minfo.laddr = daddr;
                minfo.faddr = saddr;
                minfo.lport = inc->i_hdr.h_dport;
                minfo.fport = inc->i_hdr.h_sport;
        } else {
                minfo.laddr = saddr;
                minfo.faddr = daddr;
                minfo.lport = inc->i_hdr.h_sport;
                minfo.fport = inc->i_hdr.h_dport;
        }

        rds_info_copy(iter, &minfo, sizeof(minfo));
}