Linux Kernel 3.7.1
ar-ack.c
/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/module.h>
#include <linux/circ_buf.h>
#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/slab.h>
#include <linux/udp.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

static unsigned int rxrpc_ack_defer = 1;

static const char *const rxrpc_acks[] = {
        "---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
        "-?-"
};

static const s8 rxrpc_ack_priority[] = {
        [0]                             = 0,
        [RXRPC_ACK_DELAY]               = 1,
        [RXRPC_ACK_REQUESTED]           = 2,
        [RXRPC_ACK_IDLE]                = 3,
        [RXRPC_ACK_PING_RESPONSE]       = 4,
        [RXRPC_ACK_DUPLICATE]           = 5,
        [RXRPC_ACK_OUT_OF_SEQUENCE]     = 6,
        [RXRPC_ACK_EXCEEDS_WINDOW]      = 7,
        [RXRPC_ACK_NOSPACE]             = 8,
};

/*
 * propose an ACK be sent
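 * - a proposed ACK reason only displaces the one already recorded in
 *   call->ackr_reason if it has equal or higher priority in
 *   rxrpc_ack_priority[] above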
 */
void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
                         __be32 serial, bool immediate)
{
        unsigned long expiry;
        s8 prior = rxrpc_ack_priority[ack_reason];

        ASSERTCMP(prior, >, 0);

        _enter("{%d},%s,%%%x,%u",
               call->debug_id, rxrpc_acks[ack_reason], ntohl(serial),
               immediate);

        if (prior < rxrpc_ack_priority[call->ackr_reason]) {
                if (immediate)
                        goto cancel_timer;
                return;
        }

        /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
         * numbers */
        if (prior == rxrpc_ack_priority[call->ackr_reason]) {
                if (prior <= 4)
                        call->ackr_serial = serial;
                if (immediate)
                        goto cancel_timer;
                return;
        }

        call->ackr_reason = ack_reason;
        call->ackr_serial = serial;

        switch (ack_reason) {
        case RXRPC_ACK_DELAY:
                _debug("run delay timer");
                call->ack_timer.expires = jiffies + rxrpc_ack_timeout * HZ;
                add_timer(&call->ack_timer);
                return;

        case RXRPC_ACK_IDLE:
                if (!immediate) {
                        _debug("run defer timer");
                        expiry = 1;
                        goto run_timer;
                }
                goto cancel_timer;

        case RXRPC_ACK_REQUESTED:
                if (!rxrpc_ack_defer)
                        goto cancel_timer;
                if (!immediate || serial == cpu_to_be32(1)) {
                        _debug("run defer timer");
                        expiry = rxrpc_ack_defer;
                        goto run_timer;
                }

        default:
                _debug("immediate ACK");
                goto cancel_timer;
        }

run_timer:
        expiry += jiffies;
        if (!timer_pending(&call->ack_timer) ||
            time_after(call->ack_timer.expires, expiry))
                mod_timer(&call->ack_timer, expiry);
        return;

cancel_timer:
        _debug("cancel timer %%%u", ntohl(serial));
        try_to_del_timer_sync(&call->ack_timer);
        read_lock_bh(&call->state_lock);
        if (call->state <= RXRPC_CALL_COMPLETE &&
            !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
                rxrpc_queue_call(call);
        read_unlock_bh(&call->state_lock);
}

/*
 * propose an ACK be sent, locking the call structure
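 * - the unlocked priority test below merely avoids taking the lock for
 *   proposals that cannot win; the check is repeated under the lock by
 *   __rxrpc_propose_ACK()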
 */
void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
                       __be32 serial, bool immediate)
{
        s8 prior = rxrpc_ack_priority[ack_reason];

        if (prior > rxrpc_ack_priority[call->ackr_reason]) {
                spin_lock_bh(&call->lock);
                __rxrpc_propose_ACK(call, ack_reason, serial, immediate);
                spin_unlock_bh(&call->lock);
        }
}

/*
 * set the resend timer
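 * - bit 0 of the resend argument requests an immediate resend pass by
 *   raising the RXRPC_CALL_RESEND event
 * - bit 1 requests that the resend timer be set to resend_at; if neither
 *   bit is set, the timer is killed instead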
 */
static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
                             unsigned long resend_at)
{
        read_lock_bh(&call->state_lock);
        if (call->state >= RXRPC_CALL_COMPLETE)
                resend = 0;

        if (resend & 1) {
                _debug("SET RESEND");
                set_bit(RXRPC_CALL_RESEND, &call->events);
        }

        if (resend & 2) {
                _debug("MODIFY RESEND TIMER");
                set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
                mod_timer(&call->resend_timer, resend_at);
        } else {
                _debug("KILL RESEND TIMER");
                del_timer_sync(&call->resend_timer);
                clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
        }
        read_unlock_bh(&call->state_lock);
}

/*
 * resend packets
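 * - the Tx window is scanned from the hard-ACK point; bit 0 of a window
 *   slot is set once that packet has been soft-ACK'd, and such slots are
 *   skipped here
 * - each retransmission is given a fresh serial number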
 */
static void rxrpc_resend(struct rxrpc_call *call)
{
        struct rxrpc_skb_priv *sp;
        struct rxrpc_header *hdr;
        struct sk_buff *txb;
        unsigned long *p_txb, resend_at;
        int loop, stop;
        u8 resend;

        _enter("{%d,%d,%d,%d},",
               call->acks_hard, call->acks_unacked,
               atomic_read(&call->sequence),
               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));

        stop = 0;
        resend = 0;
        resend_at = 0;

        for (loop = call->acks_tail;
             loop != call->acks_head && !stop;
             loop = (loop + 1) & (call->acks_winsz - 1)
             ) {
                p_txb = call->acks_window + loop;
                smp_read_barrier_depends();
                if (*p_txb & 1)
                        continue;

                txb = (struct sk_buff *) *p_txb;
                sp = rxrpc_skb(txb);

                if (sp->need_resend) {
                        sp->need_resend = false;

                        /* each Tx packet has a new serial number */
                        sp->hdr.serial =
                                htonl(atomic_inc_return(&call->conn->serial));

                        hdr = (struct rxrpc_header *) txb->head;
                        hdr->serial = sp->hdr.serial;

                        _proto("Tx DATA %%%u { #%d }",
                               ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
                        if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
                                stop = 1;
                                sp->resend_at = jiffies + 3;
                        } else {
                                sp->resend_at =
                                        jiffies + rxrpc_resend_timeout * HZ;
                        }
                }

                if (time_after_eq(jiffies + 1, sp->resend_at)) {
                        sp->need_resend = true;
                        resend |= 1;
                } else if (resend & 2) {
                        if (time_before(sp->resend_at, resend_at))
                                resend_at = sp->resend_at;
                } else {
                        resend_at = sp->resend_at;
                        resend |= 2;
                }
        }

        rxrpc_set_resend(call, resend, resend_at);
        _leave("");
}

/*
 * handle resend timer expiry
 */
static void rxrpc_resend_timer(struct rxrpc_call *call)
{
        struct rxrpc_skb_priv *sp;
        struct sk_buff *txb;
        unsigned long *p_txb, resend_at;
        int loop;
        u8 resend;

        _enter("%d,%d,%d",
               call->acks_tail, call->acks_unacked, call->acks_head);

        if (call->state >= RXRPC_CALL_COMPLETE)
                return;

        resend = 0;
        resend_at = 0;

        for (loop = call->acks_unacked;
             loop != call->acks_head;
             loop = (loop + 1) & (call->acks_winsz - 1)
             ) {
                p_txb = call->acks_window + loop;
                smp_read_barrier_depends();
                txb = (struct sk_buff *) (*p_txb & ~1);
                sp = rxrpc_skb(txb);

                ASSERT(!(*p_txb & 1));

                if (sp->need_resend) {
                        ;
                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
                        sp->need_resend = true;
                        resend |= 1;
                } else if (resend & 2) {
                        if (time_before(sp->resend_at, resend_at))
                                resend_at = sp->resend_at;
                } else {
                        resend_at = sp->resend_at;
                        resend |= 2;
                }
        }

        rxrpc_set_resend(call, resend, resend_at);
        _leave("");
}

/*
 * process soft ACKs of our transmitted packets
 * - these indicate packets the peer has or has not received, but hasn't yet
 *   given to the consumer, and so can still be discarded and re-requested
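 * - an ACK for a packet sets bit 0 of its Tx window slot; a NACK clears it
 *   and marks the packet for retransmission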
 */
static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
                                   struct rxrpc_ackpacket *ack,
                                   struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp;
        struct sk_buff *txb;
        unsigned long *p_txb, resend_at;
        int loop;
        u8 sacks[RXRPC_MAXACKS], resend;

        _enter("{%d,%d},{%d},",
               call->acks_hard,
               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
               ack->nAcks);

        if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
                goto protocol_error;

        resend = 0;
        resend_at = 0;
        for (loop = 0; loop < ack->nAcks; loop++) {
                p_txb = call->acks_window;
                p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
                smp_read_barrier_depends();
                txb = (struct sk_buff *) (*p_txb & ~1);
                sp = rxrpc_skb(txb);

                switch (sacks[loop]) {
                case RXRPC_ACK_TYPE_ACK:
                        sp->need_resend = false;
                        *p_txb |= 1;
                        break;
                case RXRPC_ACK_TYPE_NACK:
                        sp->need_resend = true;
                        *p_txb &= ~1;
                        resend = 1;
                        break;
                default:
                        _debug("Unsupported ACK type %d", sacks[loop]);
                        goto protocol_error;
                }
        }

        smp_mb();
        call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);

        /* anything not explicitly ACK'd is implicitly NACK'd, but may just not
         * have been received or processed yet by the far end */
        for (loop = call->acks_unacked;
             loop != call->acks_head;
             loop = (loop + 1) & (call->acks_winsz - 1)
             ) {
                p_txb = call->acks_window + loop;
                smp_read_barrier_depends();
                txb = (struct sk_buff *) (*p_txb & ~1);
                sp = rxrpc_skb(txb);

                if (*p_txb & 1) {
                        /* packet must have been discarded */
                        sp->need_resend = true;
                        *p_txb &= ~1;
                        resend |= 1;
                } else if (sp->need_resend) {
                        ;
                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
                        sp->need_resend = true;
                        resend |= 1;
                } else if (resend & 2) {
                        if (time_before(sp->resend_at, resend_at))
                                resend_at = sp->resend_at;
                } else {
                        resend_at = sp->resend_at;
                        resend |= 2;
                }
        }

        rxrpc_set_resend(call, resend, resend_at);
        _leave(" = 0");
        return 0;

protocol_error:
        _leave(" = -EPROTO");
        return -EPROTO;
}

/*
 * discard hard-ACK'd packets from the Tx window
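 * - 'hard' is the highest sequence number that has now been hard-ACK'd
 * - freeing window slots may unblock senders sleeping on tx_waitq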
 */
static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
{
        unsigned long _skb;
        int tail = call->acks_tail, old_tail;
        int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);

        _enter("{%u,%u},%u", call->acks_hard, win, hard);

        ASSERTCMP(hard - call->acks_hard, <=, win);

        while (call->acks_hard < hard) {
                smp_read_barrier_depends();
                _skb = call->acks_window[tail] & ~1;
                rxrpc_free_skb((struct sk_buff *) _skb);
                old_tail = tail;
                tail = (tail + 1) & (call->acks_winsz - 1);
                call->acks_tail = tail;
                if (call->acks_unacked == old_tail)
                        call->acks_unacked = tail;
                call->acks_hard++;
        }

        wake_up(&call->tx_waitq);
}

/*
 * clear the Tx window in the event of a failure
 */
static void rxrpc_clear_tx_window(struct rxrpc_call *call)
{
        rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
}

/*
 * drain the out of sequence received packet queue into the packet Rx queue
 */
static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
{
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
        bool terminal;
        int ret;

        _enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);

        spin_lock_bh(&call->lock);

        ret = -ECONNRESET;
        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
                goto socket_unavailable;

        skb = skb_dequeue(&call->rx_oos_queue);
        if (skb) {
                sp = rxrpc_skb(skb);

                _debug("drain OOS packet %d [%d]",
                       ntohl(sp->hdr.seq), call->rx_first_oos);

                if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
                        skb_queue_head(&call->rx_oos_queue, skb);
                        call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
                        _debug("requeue %p {%u}", skb, call->rx_first_oos);
                } else {
                        skb->mark = RXRPC_SKB_MARK_DATA;
                        terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
                                    !(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
                        ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
                        BUG_ON(ret < 0);
                        _debug("drain #%u", call->rx_data_post);
                        call->rx_data_post++;

                        /* find out what the next packet is */
                        skb = skb_peek(&call->rx_oos_queue);
                        if (skb)
                                call->rx_first_oos =
                                        ntohl(rxrpc_skb(skb)->hdr.seq);
                        else
                                call->rx_first_oos = 0;
                        _debug("peek %p {%u}", skb, call->rx_first_oos);
                }
        }

        ret = 0;
socket_unavailable:
        spin_unlock_bh(&call->lock);
        _leave(" = %d", ret);
        return ret;
}

/*
 * insert an out of sequence packet into the buffer
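 * - the queue is kept in ascending sequence order so that
 *   rxrpc_drain_rx_oos_queue() can feed packets to the socket in order;
 *   rx_first_oos tracks the sequence number at the head of the queue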
 */
static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
                                    struct sk_buff *skb)
{
        struct rxrpc_skb_priv *sp, *psp;
        struct sk_buff *p;
        u32 seq;

        sp = rxrpc_skb(skb);
        seq = ntohl(sp->hdr.seq);
        _enter(",,{%u}", seq);

        skb->destructor = rxrpc_packet_destructor;
        ASSERTCMP(sp->call, ==, NULL);
        sp->call = call;
        rxrpc_get_call(call);

        /* insert into the buffer in sequence order */
        spin_lock_bh(&call->lock);

        skb_queue_walk(&call->rx_oos_queue, p) {
                psp = rxrpc_skb(p);
                if (ntohl(psp->hdr.seq) > seq) {
                        _debug("insert oos #%u before #%u",
                               seq, ntohl(psp->hdr.seq));
                        skb_insert(p, skb, &call->rx_oos_queue);
                        goto inserted;
                }
        }

        _debug("append oos #%u", seq);
        skb_queue_tail(&call->rx_oos_queue, skb);
inserted:

        /* we might now have a new front to the queue */
        if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
                call->rx_first_oos = seq;

        read_lock(&call->state_lock);
        if (call->state < RXRPC_CALL_COMPLETE &&
            call->rx_data_post == call->rx_first_oos) {
                _debug("drain rx oos now");
                set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
        }
        read_unlock(&call->state_lock);

        spin_unlock_bh(&call->lock);
        _leave(" [stored #%u]", call->rx_first_oos);
}

/*
 * clear the Tx window on final ACK reception
 */
static void rxrpc_zap_tx_window(struct rxrpc_call *call)
{
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
        unsigned long _skb, *acks_window;
        u8 winsz = call->acks_winsz;
        int tail;

        acks_window = call->acks_window;
        call->acks_window = NULL;

        while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
                tail = call->acks_tail;
                smp_read_barrier_depends();
                _skb = acks_window[tail] & ~1;
                smp_mb();
                call->acks_tail = (call->acks_tail + 1) & (winsz - 1);

                skb = (struct sk_buff *) _skb;
                sp = rxrpc_skb(skb);
                _debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
                rxrpc_free_skb(skb);
        }

        kfree(acks_window);
}

/*
 * process the extra information that may be appended to an ACK packet
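 * - the ackinfo trailer follows the nAcks soft-ACK bytes plus three bytes
 *   of padding (hence the "nAcks + 3" offset below)
 * - a smaller advertised MTU here shrinks the peer's recorded maxdata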
 */
static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
                                  unsigned int latest, int nAcks)
{
        struct rxrpc_ackinfo ackinfo;
        struct rxrpc_peer *peer;
        unsigned int mtu;

        if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
                _leave(" [no ackinfo]");
                return;
        }

        _proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
               latest,
               ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
               ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));

        mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));

        peer = call->conn->trans->peer;
        if (mtu < peer->maxdata) {
                spin_lock_bh(&peer->lock);
                peer->maxdata = mtu;
                peer->mtu = mtu + peer->hdrsize;
                spin_unlock_bh(&peer->lock);
                _net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
        }
}

/*
 * process packets in the reception queue
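 * - returns -EAGAIN once the queue has been exhausted, or -EPROTO on a
 *   protocol error, in which case the caller aborts the call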
 */
static int rxrpc_process_rx_queue(struct rxrpc_call *call,
                                  u32 *_abort_code)
{
        struct rxrpc_ackpacket ack;
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
        bool post_ACK;
        int latest;
        u32 hard, tx;

        _enter("");

process_further:
        skb = skb_dequeue(&call->rx_queue);
        if (!skb)
                return -EAGAIN;

        _net("deferred skb %p", skb);

        sp = rxrpc_skb(skb);

        _debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);

        post_ACK = false;

        switch (sp->hdr.type) {
                /* data packets that wind up here have been received out of
                 * order, need security processing or are jumbo packets */
        case RXRPC_PACKET_TYPE_DATA:
                _proto("OOSQ DATA %%%u { #%u }",
                       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));

                /* secured packets must be verified and possibly decrypted */
                if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
                        goto protocol_error;

                rxrpc_insert_oos_packet(call, skb);
                goto process_further;

                /* partial ACK to process */
        case RXRPC_PACKET_TYPE_ACK:
                if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
                        _debug("extraction failure");
                        goto protocol_error;
                }
                if (!skb_pull(skb, sizeof(ack)))
                        BUG();

                latest = ntohl(sp->hdr.serial);
                hard = ntohl(ack.firstPacket);
                tx = atomic_read(&call->sequence);

                _proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
                       latest,
                       ntohs(ack.maxSkew),
                       hard,
                       ntohl(ack.previousPacket),
                       ntohl(ack.serial),
                       rxrpc_acks[ack.reason],
                       ack.nAcks);

                rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);

                if (ack.reason == RXRPC_ACK_PING) {
                        _proto("Rx ACK %%%u PING Request", latest);
                        rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
                                          sp->hdr.serial, true);
                }

                /* discard any out-of-order or duplicate ACKs */
                if (latest - call->acks_latest <= 0) {
                        _debug("discard ACK %d <= %d",
                               latest, call->acks_latest);
                        goto discard;
                }
                call->acks_latest = latest;

                if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
                    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
                    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
                    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
                        goto discard;

                _debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);

                if (hard > 0) {
                        if (hard - 1 > tx) {
                                _debug("hard-ACK'd packet %d not transmitted"
                                       " (%d top)",
                                       hard - 1, tx);
                                goto protocol_error;
                        }

                        if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
                             call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
                            hard > tx)
                                goto all_acked;

                        smp_rmb();
                        rxrpc_rotate_tx_window(call, hard - 1);
                }

                if (ack.nAcks > 0) {
                        if (hard - 1 + ack.nAcks > tx) {
                                _debug("soft-ACK'd packet %d+%d not"
                                       " transmitted (%d top)",
                                       hard - 1, ack.nAcks, tx);
                                goto protocol_error;
                        }

                        if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
                                goto protocol_error;
                }
                goto discard;

                /* complete ACK to process */
        case RXRPC_PACKET_TYPE_ACKALL:
                goto all_acked;

                /* abort and busy are handled elsewhere */
        case RXRPC_PACKET_TYPE_BUSY:
        case RXRPC_PACKET_TYPE_ABORT:
                BUG();

                /* connection level events - also handled elsewhere */
        case RXRPC_PACKET_TYPE_CHALLENGE:
        case RXRPC_PACKET_TYPE_RESPONSE:
        case RXRPC_PACKET_TYPE_DEBUG:
                BUG();
        }

        /* if we've had a hard ACK that covers all the packets we've sent, then
         * that ends that phase of the operation */
all_acked:
        write_lock_bh(&call->state_lock);
        _debug("ack all %d", call->state);

        switch (call->state) {
        case RXRPC_CALL_CLIENT_AWAIT_REPLY:
                call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
                break;
        case RXRPC_CALL_SERVER_AWAIT_ACK:
                _debug("srv complete");
                call->state = RXRPC_CALL_COMPLETE;
                post_ACK = true;
                break;
        case RXRPC_CALL_CLIENT_SEND_REQUEST:
        case RXRPC_CALL_SERVER_RECV_REQUEST:
                goto protocol_error_unlock; /* can't occur yet */
        default:
                write_unlock_bh(&call->state_lock);
                goto discard; /* assume packet left over from earlier phase */
        }

        write_unlock_bh(&call->state_lock);

        /* if all the packets we sent are hard-ACK'd, then we can discard
         * whatever we've got left */
        _debug("clear Tx %d",
               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));

        del_timer_sync(&call->resend_timer);
        clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
        clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);

        if (call->acks_window)
                rxrpc_zap_tx_window(call);

        if (post_ACK) {
                /* post the final ACK message for userspace to pick up */
                _debug("post ACK");
                skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
                sp->call = call;
                rxrpc_get_call(call);
                spin_lock_bh(&call->lock);
                if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
                        BUG();
                spin_unlock_bh(&call->lock);
                goto process_further;
        }

discard:
        rxrpc_free_skb(skb);
        goto process_further;

protocol_error_unlock:
        write_unlock_bh(&call->state_lock);
protocol_error:
        rxrpc_free_skb(skb);
        _leave(" = -EPROTO");
        return -EPROTO;
}

/*
 * post a message to the socket Rx queue for recvmsg() to pick up
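 * - the message is carried by a zero-length sk_buff whose rxrpc_skb_priv
 *   holds the mark and error code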
 */
static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
                              bool fatal)
{
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
        int ret;

        _enter("{%d,%lx},%u,%u,%d",
               call->debug_id, call->flags, mark, error, fatal);

        /* remove timers and things for fatal messages */
        if (fatal) {
                del_timer_sync(&call->resend_timer);
                del_timer_sync(&call->ack_timer);
                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
        }

        if (mark != RXRPC_SKB_MARK_NEW_CALL &&
            !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
                _leave("[no userid]");
                return 0;
        }

        if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
                skb = alloc_skb(0, GFP_NOFS);
                if (!skb)
                        return -ENOMEM;

                rxrpc_new_skb(skb);

                skb->mark = mark;

                sp = rxrpc_skb(skb);
                memset(sp, 0, sizeof(*sp));
                sp->error = error;
                sp->call = call;
                rxrpc_get_call(call);

                spin_lock_bh(&call->lock);
                ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
                spin_unlock_bh(&call->lock);
                BUG_ON(ret < 0);
        }

        return 0;
}

/*
 * handle background processing of incoming call packets and ACK / abort
 * generation
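 * - only one instance may run per call at a time, enforced by the
 *   RXRPC_CALL_PROC_BUSY flag
 * - 'genbit' names the event bit to clear once the corresponding message
 *   has been transmitted successfully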
 */
void rxrpc_process_call(struct work_struct *work)
{
        struct rxrpc_call *call =
                container_of(work, struct rxrpc_call, processor);
        struct rxrpc_ackpacket ack;
        struct rxrpc_ackinfo ackinfo;
        struct rxrpc_header hdr;
        struct msghdr msg;
        struct kvec iov[5];
        unsigned long bits;
        __be32 data, pad;
        size_t len;
        int genbit, loop, nbit, ioc, ret, mtu;
        u32 abort_code = RX_PROTOCOL_ERROR;
        u8 *acks = NULL;

        //printk("\n--------------------\n");
        _enter("{%d,%s,%lx} [%lu]",
               call->debug_id, rxrpc_call_states[call->state], call->events,
               (jiffies - call->creation_jif) / (HZ / 10));

        if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
                _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
                return;
        }

        /* there's a good chance we're going to have to send a message, so set
         * one up in advance */
        msg.msg_name = &call->conn->trans->peer->srx.transport.sin;
        msg.msg_namelen = sizeof(call->conn->trans->peer->srx.transport.sin);
        msg.msg_control = NULL;
        msg.msg_controllen = 0;
        msg.msg_flags = 0;

        hdr.epoch = call->conn->epoch;
        hdr.cid = call->cid;
        hdr.callNumber = call->call_id;
        hdr.seq = 0;
        hdr.type = RXRPC_PACKET_TYPE_ACK;
        hdr.flags = call->conn->out_clientflag;
        hdr.userStatus = 0;
        hdr.securityIndex = call->conn->security_ix;
        hdr._rsvd = 0;
        hdr.serviceId = call->conn->service_id;

        memset(iov, 0, sizeof(iov));
        iov[0].iov_base = &hdr;
        iov[0].iov_len = sizeof(hdr);

        /* deal with events of a final nature */
        if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
                rxrpc_release_call(call);
                clear_bit(RXRPC_CALL_RELEASE, &call->events);
        }

        if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
                int error;

                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
                clear_bit(RXRPC_CALL_ABORT, &call->events);

                error = call->conn->trans->peer->net_error;
                _debug("post net error %d", error);

                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
                                       error, true) < 0)
                        goto no_mem;
                clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
                goto kill_ACKs;
        }

        if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);

                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
                clear_bit(RXRPC_CALL_ABORT, &call->events);

                _debug("post conn abort");

                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
                                       call->conn->error, true) < 0)
                        goto no_mem;
                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
                goto kill_ACKs;
        }

        if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
                hdr.type = RXRPC_PACKET_TYPE_BUSY;
                genbit = RXRPC_CALL_REJECT_BUSY;
                goto send_message;
        }

        if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);

                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
                                       ECONNABORTED, true) < 0)
                        goto no_mem;
                hdr.type = RXRPC_PACKET_TYPE_ABORT;
                data = htonl(call->abort_code);
                iov[1].iov_base = &data;
                iov[1].iov_len = sizeof(data);
                genbit = RXRPC_CALL_ABORT;
                goto send_message;
        }

        if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
                genbit = RXRPC_CALL_ACK_FINAL;

                ack.bufferSpace = htons(8);
                ack.maxSkew = 0;
                ack.serial = 0;
                ack.reason = RXRPC_ACK_IDLE;
                ack.nAcks = 0;
                call->ackr_reason = 0;

                spin_lock_bh(&call->lock);
                ack.serial = call->ackr_serial;
                ack.previousPacket = call->ackr_prev_seq;
                ack.firstPacket = htonl(call->rx_data_eaten + 1);
                spin_unlock_bh(&call->lock);

                pad = 0;

                iov[1].iov_base = &ack;
                iov[1].iov_len = sizeof(ack);
                iov[2].iov_base = &pad;
                iov[2].iov_len = 3;
                iov[3].iov_base = &ackinfo;
                iov[3].iov_len = sizeof(ackinfo);
                goto send_ACK;
        }

        if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
                            (1 << RXRPC_CALL_RCVD_ABORT))
            ) {
                u32 mark;

                if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
                        mark = RXRPC_SKB_MARK_REMOTE_ABORT;
                else
                        mark = RXRPC_SKB_MARK_BUSY;

                _debug("post abort/busy");
                rxrpc_clear_tx_window(call);
                if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
                        goto no_mem;

                clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
                goto kill_ACKs;
        }

        if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
                _debug("do implicit ackall");
                rxrpc_clear_tx_window(call);
        }

        if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
                write_lock_bh(&call->state_lock);
                if (call->state <= RXRPC_CALL_COMPLETE) {
                        call->state = RXRPC_CALL_LOCALLY_ABORTED;
                        call->abort_code = RX_CALL_TIMEOUT;
                        set_bit(RXRPC_CALL_ABORT, &call->events);
                }
                write_unlock_bh(&call->state_lock);

                _debug("post timeout");
                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
                                       ETIME, true) < 0)
                        goto no_mem;

                clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
                goto kill_ACKs;
        }

        /* deal with assorted inbound messages */
        if (!skb_queue_empty(&call->rx_queue)) {
                switch (rxrpc_process_rx_queue(call, &abort_code)) {
                case 0:
                case -EAGAIN:
                        break;
                case -ENOMEM:
                        goto no_mem;
                case -EKEYEXPIRED:
                case -EKEYREJECTED:
                case -EPROTO:
                        rxrpc_abort_call(call, abort_code);
                        goto kill_ACKs;
                }
        }

        /* handle resending */
        if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
                rxrpc_resend_timer(call);
        if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
                rxrpc_resend(call);

        /* consider sending an ordinary ACK */
        if (test_bit(RXRPC_CALL_ACK, &call->events)) {
                _debug("send ACK: window: %d - %d { %lx }",
                       call->rx_data_eaten, call->ackr_win_top,
                       call->ackr_window[0]);

                if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
                    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
                        /* ACK by sending reply DATA packet in this state */
                        clear_bit(RXRPC_CALL_ACK, &call->events);
                        goto maybe_reschedule;
                }

                genbit = RXRPC_CALL_ACK;

                acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
                               GFP_NOFS);
                if (!acks)
                        goto no_mem;

                //hdr.flags = RXRPC_SLOW_START_OK;
                ack.bufferSpace = htons(8);
                ack.maxSkew = 0;
                ack.serial = 0;
                ack.reason = 0;

                spin_lock_bh(&call->lock);
                ack.reason = call->ackr_reason;
                ack.serial = call->ackr_serial;
                ack.previousPacket = call->ackr_prev_seq;
                ack.firstPacket = htonl(call->rx_data_eaten + 1);

                ack.nAcks = 0;
                for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
                        nbit = loop * BITS_PER_LONG;
                        for (bits = call->ackr_window[loop]; bits; bits >>= 1
                             ) {
                                _debug("- l=%d n=%d b=%lx", loop, nbit, bits);
                                if (bits & 1) {
                                        acks[nbit] = RXRPC_ACK_TYPE_ACK;
                                        ack.nAcks = nbit + 1;
                                }
                                nbit++;
                        }
                }
                call->ackr_reason = 0;
                spin_unlock_bh(&call->lock);

                pad = 0;

                iov[1].iov_base = &ack;
                iov[1].iov_len = sizeof(ack);
                iov[2].iov_base = acks;
                iov[2].iov_len = ack.nAcks;
                iov[3].iov_base = &pad;
                iov[3].iov_len = 3;
                iov[4].iov_base = &ackinfo;
                iov[4].iov_len = sizeof(ackinfo);

                switch (ack.reason) {
                case RXRPC_ACK_REQUESTED:
                case RXRPC_ACK_DUPLICATE:
                case RXRPC_ACK_OUT_OF_SEQUENCE:
                case RXRPC_ACK_EXCEEDS_WINDOW:
                case RXRPC_ACK_NOSPACE:
                case RXRPC_ACK_PING:
                case RXRPC_ACK_PING_RESPONSE:
                        goto send_ACK_with_skew;
                case RXRPC_ACK_DELAY:
                case RXRPC_ACK_IDLE:
                        goto send_ACK;
                }
        }

        /* handle completion of security negotiations on an incoming
         * connection */
        if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
                _debug("secured");
                spin_lock_bh(&call->lock);

                if (call->state == RXRPC_CALL_SERVER_SECURING) {
                        _debug("securing");
                        write_lock(&call->conn->lock);
                        if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
                            !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
                                _debug("not released");
                                call->state = RXRPC_CALL_SERVER_ACCEPTING;
                                list_move_tail(&call->accept_link,
                                               &call->socket->acceptq);
                        }
                        write_unlock(&call->conn->lock);
                        read_lock(&call->state_lock);
                        if (call->state < RXRPC_CALL_COMPLETE)
                                set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
                        read_unlock(&call->state_lock);
                }

                spin_unlock_bh(&call->lock);
                if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
                        goto maybe_reschedule;
        }

        /* post a notification of an acceptable connection to the app */
        if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
                _debug("post accept");
                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
                                       0, false) < 0)
                        goto no_mem;
                clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
                goto maybe_reschedule;
        }

        /* handle incoming call acceptance */
        if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
                _debug("accepted");
                ASSERTCMP(call->rx_data_post, ==, 0);
                call->rx_data_post = 1;
                read_lock_bh(&call->state_lock);
                if (call->state < RXRPC_CALL_COMPLETE)
                        set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
                read_unlock_bh(&call->state_lock);
        }

        /* drain the out of sequence received packet queue into the packet Rx
         * queue */
        if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
                while (call->rx_data_post == call->rx_first_oos)
                        if (rxrpc_drain_rx_oos_queue(call) < 0)
                                break;
                goto maybe_reschedule;
        }

        /* other events may have been raised since we started checking */
        goto maybe_reschedule;

send_ACK_with_skew:
        ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
                            ntohl(ack.serial));
send_ACK:
        mtu = call->conn->trans->peer->if_mtu;
        mtu -= call->conn->trans->peer->hdrsize;
        ackinfo.maxMTU = htonl(mtu);
        ackinfo.rwind = htonl(32);

        /* permit the peer to send us jumbo packets if it wants to */
        ackinfo.rxMTU = htonl(5692);
        ackinfo.jumbo_max = htonl(4);

        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
        _proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
               ntohl(hdr.serial),
               ntohs(ack.maxSkew),
               ntohl(ack.firstPacket),
               ntohl(ack.previousPacket),
               ntohl(ack.serial),
               rxrpc_acks[ack.reason],
               ack.nAcks);

        del_timer_sync(&call->ack_timer);
        if (ack.nAcks > 0)
                set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
        goto send_message_2;

send_message:
        _debug("send message");

        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
        _proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
send_message_2:

        len = iov[0].iov_len;
        ioc = 1;
        if (iov[4].iov_len) {
                ioc = 5;
                len += iov[4].iov_len;
                len += iov[3].iov_len;
                len += iov[2].iov_len;
                len += iov[1].iov_len;
        } else if (iov[3].iov_len) {
                ioc = 4;
                len += iov[3].iov_len;
                len += iov[2].iov_len;
                len += iov[1].iov_len;
        } else if (iov[2].iov_len) {
                ioc = 3;
                len += iov[2].iov_len;
                len += iov[1].iov_len;
        } else if (iov[1].iov_len) {
                ioc = 2;
                len += iov[1].iov_len;
        }

        ret = kernel_sendmsg(call->conn->trans->local->socket,
                             &msg, iov, ioc, len);
        if (ret < 0) {
                _debug("sendmsg failed: %d", ret);
                read_lock_bh(&call->state_lock);
                if (call->state < RXRPC_CALL_DEAD)
                        rxrpc_queue_call(call);
                read_unlock_bh(&call->state_lock);
                goto error;
        }

        switch (genbit) {
        case RXRPC_CALL_ABORT:
                clear_bit(genbit, &call->events);
                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
                goto kill_ACKs;

        case RXRPC_CALL_ACK_FINAL:
                write_lock_bh(&call->state_lock);
                if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
                        call->state = RXRPC_CALL_COMPLETE;
                write_unlock_bh(&call->state_lock);
                goto kill_ACKs;

        default:
                clear_bit(genbit, &call->events);
                switch (call->state) {
                case RXRPC_CALL_CLIENT_AWAIT_REPLY:
                case RXRPC_CALL_CLIENT_RECV_REPLY:
                case RXRPC_CALL_SERVER_RECV_REQUEST:
                case RXRPC_CALL_SERVER_ACK_REQUEST:
                        _debug("start ACK timer");
                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
                                          call->ackr_serial, false);
                default:
                        break;
                }
                goto maybe_reschedule;
        }

kill_ACKs:
        del_timer_sync(&call->ack_timer);
        if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
                rxrpc_put_call(call);
        clear_bit(RXRPC_CALL_ACK, &call->events);

maybe_reschedule:
        if (call->events || !skb_queue_empty(&call->rx_queue)) {
                read_lock_bh(&call->state_lock);
                if (call->state < RXRPC_CALL_DEAD)
                        rxrpc_queue_call(call);
                read_unlock_bh(&call->state_lock);
        }

        /* don't leave aborted connections on the accept queue */
        if (call->state >= RXRPC_CALL_COMPLETE &&
            !list_empty(&call->accept_link)) {
                _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
                       call, call->events, call->flags,
                       ntohl(call->conn->cid));

                read_lock_bh(&call->state_lock);
                if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
                    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
                        rxrpc_queue_call(call);
                read_unlock_bh(&call->state_lock);
        }

error:
        clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
        kfree(acks);

        /* because we don't want two CPUs both processing the work item for one
         * call at the same time, we use a flag to note when it's busy; however
         * this means there's a race between clearing the flag and setting the
         * work pending bit and the work item being processed again */
        if (call->events && !work_pending(&call->processor)) {
                _debug("jumpstart %x", ntohl(call->conn->cid));
                rxrpc_queue_call(call);
        }

        _leave("");
        return;

no_mem:
        _debug("out of memory");
        goto maybe_reschedule;
}