Linux Kernel  3.7.1
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
ar-recvmsg.c
Go to the documentation of this file.
1 /* RxRPC recvmsg() implementation
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells ([email protected])
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11 
12 #include <linux/net.h>
13 #include <linux/skbuff.h>
14 #include <linux/export.h>
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18 
19 /*
20  * removal a call's user ID from the socket tree to make the user ID available
21  * again and so that it won't be seen again in association with that call
22  */
23 void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
24 {
25  _debug("RELEASE CALL %d", call->debug_id);
26 
27  if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
29  rb_erase(&call->sock_node, &call->socket->calls);
32  }
33 
34  read_lock_bh(&call->state_lock);
35  if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
37  rxrpc_queue_call(call);
38  read_unlock_bh(&call->state_lock);
39 }
40 
41 /*
42  * receive a message from an RxRPC socket
43  * - we need to be careful about two or more threads calling recvmsg
44  * simultaneously
45  */
46 int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock,
47  struct msghdr *msg, size_t len, int flags)
48 {
49  struct rxrpc_skb_priv *sp;
50  struct rxrpc_call *call = NULL, *continue_call = NULL;
51  struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
52  struct sk_buff *skb;
53  long timeo;
54  int copy, ret, ullen, offset, copied = 0;
55  u32 abort_code;
56 
58 
59  _enter(",,,%zu,%d", len, flags);
60 
61  if (flags & (MSG_OOB | MSG_TRUNC))
62  return -EOPNOTSUPP;
63 
64  ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long);
65 
66  timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
67  msg->msg_flags |= MSG_MORE;
68 
69  lock_sock(&rx->sk);
70 
71  for (;;) {
72  /* return immediately if a client socket has no outstanding
73  * calls */
74  if (RB_EMPTY_ROOT(&rx->calls)) {
75  if (copied)
76  goto out;
77  if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
78  release_sock(&rx->sk);
79  if (continue_call)
80  rxrpc_put_call(continue_call);
81  return -ENODATA;
82  }
83  }
84 
85  /* get the next message on the Rx queue */
86  skb = skb_peek(&rx->sk.sk_receive_queue);
87  if (!skb) {
88  /* nothing remains on the queue */
89  if (copied &&
90  (msg->msg_flags & MSG_PEEK || timeo == 0))
91  goto out;
92 
93  /* wait for a message to turn up */
94  release_sock(&rx->sk);
95  prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
97  ret = sock_error(&rx->sk);
98  if (ret)
99  goto wait_error;
100 
101  if (skb_queue_empty(&rx->sk.sk_receive_queue)) {
102  if (signal_pending(current))
103  goto wait_interrupted;
104  timeo = schedule_timeout(timeo);
105  }
106  finish_wait(sk_sleep(&rx->sk), &wait);
107  lock_sock(&rx->sk);
108  continue;
109  }
110 
111  peek_next_packet:
112  sp = rxrpc_skb(skb);
113  call = sp->call;
114  ASSERT(call != NULL);
115 
116  _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
117 
118  /* make sure we wait for the state to be updated in this call */
119  spin_lock_bh(&call->lock);
120  spin_unlock_bh(&call->lock);
121 
122  if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
123  _debug("packet from released call");
124  if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
125  BUG();
126  rxrpc_free_skb(skb);
127  continue;
128  }
129 
130  /* determine whether to continue last data receive */
131  if (continue_call) {
132  _debug("maybe cont");
133  if (call != continue_call ||
134  skb->mark != RXRPC_SKB_MARK_DATA) {
135  release_sock(&rx->sk);
136  rxrpc_put_call(continue_call);
137  _leave(" = %d [noncont]", copied);
138  return copied;
139  }
140  }
141 
142  rxrpc_get_call(call);
143 
144  /* copy the peer address and timestamp */
145  if (!continue_call) {
146  if (msg->msg_name && msg->msg_namelen > 0)
147  memcpy(msg->msg_name,
148  &call->conn->trans->peer->srx,
149  sizeof(call->conn->trans->peer->srx));
150  sock_recv_ts_and_drops(msg, &rx->sk, skb);
151  }
152 
153  /* receive the message */
154  if (skb->mark != RXRPC_SKB_MARK_DATA)
155  goto receive_non_data_message;
156 
157  _debug("recvmsg DATA #%u { %d, %d }",
158  ntohl(sp->hdr.seq), skb->len, sp->offset);
159 
160  if (!continue_call) {
161  /* only set the control data once per recvmsg() */
163  ullen, &call->user_call_ID);
164  if (ret < 0)
165  goto copy_error;
167  }
168 
169  ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
170  ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
171  call->rx_data_recv = ntohl(sp->hdr.seq);
172 
173  ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
174 
175  offset = sp->offset;
176  copy = skb->len - offset;
177  if (copy > len - copied)
178  copy = len - copied;
179 
180  if (skb->ip_summed == CHECKSUM_UNNECESSARY) {
181  ret = skb_copy_datagram_iovec(skb, offset,
182  msg->msg_iov, copy);
183  } else {
184  ret = skb_copy_and_csum_datagram_iovec(skb, offset,
185  msg->msg_iov);
186  if (ret == -EINVAL)
187  goto csum_copy_error;
188  }
189 
190  if (ret < 0)
191  goto copy_error;
192 
193  /* handle piecemeal consumption of data packets */
194  _debug("copied %d+%d", copy, copied);
195 
196  offset += copy;
197  copied += copy;
198 
199  if (!(flags & MSG_PEEK))
200  sp->offset = offset;
201 
202  if (sp->offset < skb->len) {
203  _debug("buffer full");
204  ASSERTCMP(copied, ==, len);
205  break;
206  }
207 
208  /* we transferred the whole data packet */
209  if (sp->hdr.flags & RXRPC_LAST_PACKET) {
210  _debug("last");
211  if (call->conn->out_clientflag) {
212  /* last byte of reply received */
213  ret = copied;
214  goto terminal_message;
215  }
216 
217  /* last bit of request received */
218  if (!(flags & MSG_PEEK)) {
219  _debug("eat packet");
220  if (skb_dequeue(&rx->sk.sk_receive_queue) !=
221  skb)
222  BUG();
223  rxrpc_free_skb(skb);
224  }
225  msg->msg_flags &= ~MSG_MORE;
226  break;
227  }
228 
229  /* move on to the next data message */
230  _debug("next");
231  if (!continue_call)
232  continue_call = sp->call;
233  else
234  rxrpc_put_call(call);
235  call = NULL;
236 
237  if (flags & MSG_PEEK) {
238  _debug("peek next");
239  skb = skb->next;
240  if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue)
241  break;
242  goto peek_next_packet;
243  }
244 
245  _debug("eat packet");
246  if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
247  BUG();
248  rxrpc_free_skb(skb);
249  }
250 
251  /* end of non-terminal data packet reception for the moment */
252  _debug("end rcv data");
253 out:
254  release_sock(&rx->sk);
255  if (call)
256  rxrpc_put_call(call);
257  if (continue_call)
258  rxrpc_put_call(continue_call);
259  _leave(" = %d [data]", copied);
260  return copied;
261 
262  /* handle non-DATA messages such as aborts, incoming connections and
263  * final ACKs */
264 receive_non_data_message:
265  _debug("non-data");
266 
267  if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
268  _debug("RECV NEW CALL");
269  ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code);
270  if (ret < 0)
271  goto copy_error;
272  if (!(flags & MSG_PEEK)) {
273  if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
274  BUG();
275  rxrpc_free_skb(skb);
276  }
277  goto out;
278  }
279 
281  ullen, &call->user_call_ID);
282  if (ret < 0)
283  goto copy_error;
285 
286  switch (skb->mark) {
287  case RXRPC_SKB_MARK_DATA:
288  BUG();
290  ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code);
291  break;
292  case RXRPC_SKB_MARK_BUSY:
293  ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
294  break;
296  abort_code = call->abort_code;
297  ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
298  break;
300  _debug("RECV NET ERROR %d", sp->error);
301  abort_code = sp->error;
302  ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code);
303  break;
305  _debug("RECV LOCAL ERROR %d", sp->error);
306  abort_code = sp->error;
307  ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
308  &abort_code);
309  break;
310  default:
311  BUG();
312  break;
313  }
314 
315  if (ret < 0)
316  goto copy_error;
317 
318 terminal_message:
319  _debug("terminal");
320  msg->msg_flags &= ~MSG_MORE;
321  msg->msg_flags |= MSG_EOR;
322 
323  if (!(flags & MSG_PEEK)) {
324  _net("free terminal skb %p", skb);
325  if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
326  BUG();
327  rxrpc_free_skb(skb);
328  rxrpc_remove_user_ID(rx, call);
329  }
330 
331  release_sock(&rx->sk);
332  rxrpc_put_call(call);
333  if (continue_call)
334  rxrpc_put_call(continue_call);
335  _leave(" = %d", ret);
336  return ret;
337 
338 copy_error:
339  _debug("copy error");
340  release_sock(&rx->sk);
341  rxrpc_put_call(call);
342  if (continue_call)
343  rxrpc_put_call(continue_call);
344  _leave(" = %d", ret);
345  return ret;
346 
347 csum_copy_error:
348  _debug("csum error");
349  release_sock(&rx->sk);
350  if (continue_call)
351  rxrpc_put_call(continue_call);
352  rxrpc_kill_skb(skb);
353  skb_kill_datagram(&rx->sk, skb, flags);
354  rxrpc_put_call(call);
355  return -EAGAIN;
356 
357 wait_interrupted:
358  ret = sock_intr_errno(timeo);
359 wait_error:
360  finish_wait(sk_sleep(&rx->sk), &wait);
361  if (continue_call)
362  rxrpc_put_call(continue_call);
363  if (copied)
364  copied = ret;
365  _leave(" = %d [waitfail %d]", copied, ret);
366  return copied;
367 
368 }
369 
378 {
379  struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
380  struct rxrpc_call *call = sp->call;
381 
382  ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
383  ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
384  call->rx_data_recv = ntohl(sp->hdr.seq);
385 
386  ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
387  rxrpc_free_skb(skb);
388 }
389 
391 
399 {
400  struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
401 
403 
404  return sp->hdr.flags & RXRPC_LAST_PACKET;
405 }
406 
408 
416 {
417  struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
418 
420 
421  return sp->call->abort_code;
422 }
423 
425 
433 {
434  struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
435 
436  return sp->error;
437 }
438