Linux Kernel  3.7.1
drbd_receiver.c
1 /*
2  drbd_receiver.c
3 
4  This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6  Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7  Copyright (C) 1999-2008, Philipp Reisner <[email protected]>.
8  Copyright (C) 2002-2008, Lars Ellenberg <[email protected]>.
9 
10  drbd is free software; you can redistribute it and/or modify
11  it under the terms of the GNU General Public License as published by
12  the Free Software Foundation; either version 2, or (at your option)
13  any later version.
14 
15  drbd is distributed in the hope that it will be useful,
16  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  GNU General Public License for more details.
19 
20  You should have received a copy of the GNU General Public License
21  along with drbd; see the file COPYING. If not, write to
22  the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48 
49 #include "drbd_vli.h"
50 
51 enum finish_epoch {
52  FE_STILL_LIVE,
53  FE_RECYCLED,
54  FE_DESTROYED,
55 };
56 
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59 
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62 
63 
64 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65 
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
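/*
 * Illustrative sketch, not part of drbd_receiver.c: the helpers below rely on
 * page_chain_next(), page_chain_for_each() and page_chain_for_each_safe()
 * from drbd_int.h.  Assuming that header defines them roughly as follows, a
 * "chain" is simply a singly linked list threaded through page->private:
 *
 *   static inline struct page *page_chain_next(struct page *page)
 *   {
 *           return (struct page *)page_private(page);
 *   }
 *   #define page_chain_for_each(page) \
 *           for (; page; page = page_chain_next(page))
 *   #define page_chain_for_each_safe(page, n) \
 *           for (; page && ({ n = page_chain_next(page); 1; }); page = n)
 */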
70 
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77  struct page *page;
78  struct page *tmp;
79 
80  BUG_ON(!n);
81  BUG_ON(!head);
82 
83  page = *head;
84 
85  if (!page)
86  return NULL;
87 
88  while (page) {
89  tmp = page_chain_next(page);
90  if (--n == 0)
91  break; /* found sufficient pages */
92  if (tmp == NULL)
93  /* insufficient pages, don't use any of them. */
94  return NULL;
95  page = tmp;
96  }
97 
98  /* add end of list marker for the returned list */
99  set_page_private(page, 0);
100  /* actual return value, and adjustment of head */
101  page = *head;
102  *head = tmp;
103  return page;
104 }
105 
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111  struct page *tmp;
112  int i = 1;
113  while ((tmp = page_chain_next(page)))
114  ++i, page = tmp;
115  if (len)
116  *len = i;
117  return page;
118 }
119 
120 static int page_chain_free(struct page *page)
121 {
122  struct page *tmp;
123  int i = 0;
124  page_chain_for_each_safe(page, tmp) {
125  put_page(page);
126  ++i;
127  }
128  return i;
129 }
130 
131 static void page_chain_add(struct page **head,
132  struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135  struct page *tmp;
136  tmp = page_chain_tail(chain_first, NULL);
137  BUG_ON(tmp != chain_last);
138 #endif
139 
140  /* add chain to head */
141  set_page_private(chain_last, (unsigned long)*head);
142  *head = chain_first;
143 }
144 
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147  struct page *page = NULL;
148  struct page *tmp = NULL;
149  int i = 0;
150 
151  /* Yes, testing drbd_pp_vacant outside the lock is racy.
152  * So what. It saves a spin_lock. */
153  if (drbd_pp_vacant >= number) {
154  spin_lock(&drbd_pp_lock);
155  page = page_chain_del(&drbd_pp_pool, number);
156  if (page)
157  drbd_pp_vacant -= number;
158  spin_unlock(&drbd_pp_lock);
159  if (page)
160  return page;
161  }
162 
163  /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164  * "criss-cross" setup, that might cause write-out on some other DRBD,
165  * which in turn might block on the other node at this very place. */
166  for (i = 0; i < number; i++) {
167  tmp = alloc_page(GFP_TRY);
168  if (!tmp)
169  break;
170  set_page_private(tmp, (unsigned long)page);
171  page = tmp;
172  }
173 
174  if (i == number)
175  return page;
176 
177  /* Not enough pages immediately available this time.
178  * No need to jump around here, drbd_pp_alloc will retry this
179  * function "soon". */
180  if (page) {
181  tmp = page_chain_tail(page, NULL);
182  spin_lock(&drbd_pp_lock);
183  page_chain_add(&drbd_pp_pool, page, tmp);
184  drbd_pp_vacant += i;
185  spin_unlock(&drbd_pp_lock);
186  }
187  return NULL;
188 }
189 
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192  struct drbd_epoch_entry *e;
193  struct list_head *le, *tle;
194 
195  /* The EEs are always appended to the end of the list. Since
196  they are sent in order over the wire, they have to finish
 197  in order. As soon as we see the first one that has not finished,
 198  we can stop examining the list... */
199 
200  list_for_each_safe(le, tle, &mdev->net_ee) {
201  e = list_entry(le, struct drbd_epoch_entry, w.list);
202  if (drbd_ee_has_active_page(e))
203  break;
204  list_move(le, to_be_freed);
205  }
206 }
207 
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210  LIST_HEAD(reclaimed);
211  struct drbd_epoch_entry *e, *t;
212 
213  spin_lock_irq(&mdev->req_lock);
214  reclaim_net_ee(mdev, &reclaimed);
215  spin_unlock_irq(&mdev->req_lock);
216 
217  list_for_each_entry_safe(e, t, &reclaimed, w.list)
218  drbd_free_net_ee(mdev, e);
219 }
220 
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235  struct page *page = NULL;
236  DEFINE_WAIT(wait);
237 
238  /* Yes, we may run up to @number over max_buffers. If we
239  * follow it strictly, the admin will get it wrong anyways. */
240  if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241  page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242 
243  while (page == NULL) {
 244  prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 245 
246  drbd_kick_lo_and_reclaim_net(mdev);
247 
248  if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249  page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250  if (page)
251  break;
252  }
253 
254  if (!retry)
255  break;
256 
257  if (signal_pending(current)) {
258  dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259  break;
260  }
261 
262  schedule();
263  }
 264  finish_wait(&drbd_pp_wait, &wait);
 265 
266  if (page)
267  atomic_add(number, &mdev->pp_in_use);
268  return page;
269 }
270 
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 272  * It is also used from inside another spin_lock_irq(&mdev->req_lock) section.
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277  atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278  int i;
279 
280  if (page == NULL)
281  return;
282 
 283  if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
 284  i = page_chain_free(page);
285  else {
286  struct page *tmp;
287  tmp = page_chain_tail(page, &i);
288  spin_lock(&drbd_pp_lock);
289  page_chain_add(&drbd_pp_pool, page, tmp);
290  drbd_pp_vacant += i;
291  spin_unlock(&drbd_pp_lock);
292  }
293  i = atomic_sub_return(i, a);
294  if (i < 0)
295  dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
296  is_net ? "pp_in_use_by_net" : "pp_in_use", i);
 297  wake_up(&drbd_pp_wait);
 298 }
299 
300 /*
301 You need to hold the req_lock:
302  _drbd_wait_ee_list_empty()
303 
304 You must not have the req_lock:
305  drbd_free_ee()
306  drbd_alloc_ee()
307  drbd_init_ee()
308  drbd_release_ee()
309  drbd_ee_fix_bhs()
310  drbd_process_done_ee()
311  drbd_clear_done_ee()
312  drbd_wait_ee_list_empty()
313 */
314 
 315 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
 316  u64 id,
 317  sector_t sector,
 318  unsigned int data_size,
 319  gfp_t gfp_mask) __must_hold(local)
320 {
321  struct drbd_epoch_entry *e;
322  struct page *page = NULL;
323  unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
324 
325  if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
326  return NULL;
327 
 328  e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
 329  if (!e) {
330  if (!(gfp_mask & __GFP_NOWARN))
331  dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
332  return NULL;
333  }
334 
335  if (data_size) {
336  page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
337  if (!page)
338  goto fail;
339  }
340 
341  INIT_HLIST_NODE(&e->collision);
342  e->epoch = NULL;
343  e->mdev = mdev;
344  e->pages = page;
345  atomic_set(&e->pending_bios, 0);
346  e->size = data_size;
347  e->flags = 0;
348  e->sector = sector;
349  e->block_id = id;
350 
351  return e;
352 
353  fail:
 354  mempool_free(e, drbd_ee_mempool);
 355  return NULL;
356 }
357 
358 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
359 {
360  if (e->flags & EE_HAS_DIGEST)
361  kfree(e->digest);
362  drbd_pp_free(mdev, e->pages, is_net);
363  D_ASSERT(atomic_read(&e->pending_bios) == 0);
364  D_ASSERT(hlist_unhashed(&e->collision));
 365  mempool_free(e, drbd_ee_mempool);
 366 }
367 
368 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
369 {
370  LIST_HEAD(work_list);
371  struct drbd_epoch_entry *e, *t;
372  int count = 0;
373  int is_net = list == &mdev->net_ee;
374 
375  spin_lock_irq(&mdev->req_lock);
376  list_splice_init(list, &work_list);
377  spin_unlock_irq(&mdev->req_lock);
378 
379  list_for_each_entry_safe(e, t, &work_list, w.list) {
380  drbd_free_some_ee(mdev, e, is_net);
381  count++;
382  }
383  return count;
384 }
385 
386 
387 /*
388  * This function is called from _asender only_
389  * but see also comments in _req_mod(,barrier_acked)
390  * and receive_Barrier.
391  *
392  * Move entries from net_ee to done_ee, if ready.
393  * Grab done_ee, call all callbacks, free the entries.
394  * The callbacks typically send out ACKs.
395  */
396 static int drbd_process_done_ee(struct drbd_conf *mdev)
397 {
398  LIST_HEAD(work_list);
399  LIST_HEAD(reclaimed);
400  struct drbd_epoch_entry *e, *t;
401  int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
402 
403  spin_lock_irq(&mdev->req_lock);
404  reclaim_net_ee(mdev, &reclaimed);
405  list_splice_init(&mdev->done_ee, &work_list);
406  spin_unlock_irq(&mdev->req_lock);
407 
408  list_for_each_entry_safe(e, t, &reclaimed, w.list)
409  drbd_free_net_ee(mdev, e);
410 
411  /* possible callbacks here:
412  * e_end_block, and e_end_resync_block, e_send_discard_ack.
413  * all ignore the last argument.
414  */
415  list_for_each_entry_safe(e, t, &work_list, w.list) {
416  /* list_del not necessary, next/prev members not touched */
417  ok = e->w.cb(mdev, &e->w, !ok) && ok;
418  drbd_free_ee(mdev, e);
419  }
420  wake_up(&mdev->ee_wait);
421 
422  return ok;
423 }
424 
425 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
426 {
427  DEFINE_WAIT(wait);
428 
429  /* avoids spin_lock/unlock
430  * and calling prepare_to_wait in the fast path */
431  while (!list_empty(head)) {
 432  prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
 433  spin_unlock_irq(&mdev->req_lock);
434  io_schedule();
435  finish_wait(&mdev->ee_wait, &wait);
436  spin_lock_irq(&mdev->req_lock);
437  }
438 }
439 
440 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441 {
442  spin_lock_irq(&mdev->req_lock);
443  _drbd_wait_ee_list_empty(mdev, head);
444  spin_unlock_irq(&mdev->req_lock);
445 }
446 
 447 /* see also kernel_accept(), which is only present since 2.6.18.
 448  * We also want to log which part of it failed, exactly. */
449 static int drbd_accept(struct drbd_conf *mdev, const char **what,
450  struct socket *sock, struct socket **newsock)
451 {
452  struct sock *sk = sock->sk;
453  int err = 0;
454 
455  *what = "listen";
456  err = sock->ops->listen(sock, 5);
457  if (err < 0)
458  goto out;
459 
460  *what = "sock_create_lite";
461  err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
462  newsock);
463  if (err < 0)
464  goto out;
465 
466  *what = "accept";
467  err = sock->ops->accept(sock, *newsock, 0);
468  if (err < 0) {
469  sock_release(*newsock);
470  *newsock = NULL;
471  goto out;
472  }
473  (*newsock)->ops = sock->ops;
474  __module_get((*newsock)->ops->owner);
475 
476 out:
477  return err;
478 }
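/*
 * Illustrative sketch, not part of drbd_receiver.c: with kernel_accept()
 * available (it exists since 2.6.18, as the comment above notes), the body of
 * drbd_accept() would collapse to roughly the following; DRBD keeps the
 * open-coded variant so that *what can report exactly which step failed.
 *
 *   err = sock->ops->listen(sock, 5);
 *   if (!err)
 *           err = kernel_accept(sock, newsock, 0);
 */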
479 
480 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
481  void *buf, size_t size, int flags)
482 {
483  mm_segment_t oldfs;
484  struct kvec iov = {
485  .iov_base = buf,
486  .iov_len = size,
487  };
488  struct msghdr msg = {
489  .msg_iovlen = 1,
490  .msg_iov = (struct iovec *)&iov,
491  .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
492  };
493  int rv;
494 
495  oldfs = get_fs();
496  set_fs(KERNEL_DS);
497  rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
498  set_fs(oldfs);
499 
500  return rv;
501 }
502 
503 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
504 {
505  mm_segment_t oldfs;
506  struct kvec iov = {
507  .iov_base = buf,
508  .iov_len = size,
509  };
510  struct msghdr msg = {
511  .msg_iovlen = 1,
512  .msg_iov = (struct iovec *)&iov,
513  .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
514  };
515  int rv;
516 
517  oldfs = get_fs();
518  set_fs(KERNEL_DS);
519 
520  for (;;) {
521  rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
522  if (rv == size)
523  break;
524 
525  /* Note:
526  * ECONNRESET other side closed the connection
527  * ERESTARTSYS (on sock) we got a signal
528  */
529 
530  if (rv < 0) {
531  if (rv == -ECONNRESET)
532  dev_info(DEV, "sock was reset by peer\n");
533  else if (rv != -ERESTARTSYS)
534  dev_err(DEV, "sock_recvmsg returned %d\n", rv);
535  break;
536  } else if (rv == 0) {
537  dev_info(DEV, "sock was shut down by peer\n");
538  break;
539  } else {
540  /* signal came in, or peer/link went down,
541  * after we read a partial message
542  */
543  /* D_ASSERT(signal_pending(current)); */
544  break;
545  }
546  };
547 
548  set_fs(oldfs);
549 
550  if (rv != size)
551  drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
552 
553  return rv;
554 }
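/*
 * Illustrative note, not part of drbd_receiver.c: kernel_recvmsg() wraps the
 * same set_fs(KERNEL_DS)/sock_recvmsg()/set_fs() dance, so each receive in
 * the loop above is assumed to be equivalent to:
 *
 *   rv = kernel_recvmsg(mdev->data.socket, &msg, &iov, 1, size,
 *                       MSG_WAITALL | MSG_NOSIGNAL);
 */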
555 
556 /* quoting tcp(7):
557  * On individual connections, the socket buffer size must be set prior to the
558  * listen(2) or connect(2) calls in order to have it take effect.
559  * This is our wrapper to do so.
560  */
561 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
562  unsigned int rcv)
563 {
564  /* open coded SO_SNDBUF, SO_RCVBUF */
565  if (snd) {
566  sock->sk->sk_sndbuf = snd;
567  sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
568  }
569  if (rcv) {
570  sock->sk->sk_rcvbuf = rcv;
571  sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
572  }
573 }
574 
575 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
576 {
577  const char *what;
578  struct socket *sock;
579  struct sockaddr_in6 src_in6;
580  int err;
581  int disconnect_on_error = 1;
582 
583  if (!get_net_conf(mdev))
584  return NULL;
585 
586  what = "sock_create_kern";
587  err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
588  SOCK_STREAM, IPPROTO_TCP, &sock);
589  if (err < 0) {
590  sock = NULL;
591  goto out;
592  }
593 
594  sock->sk->sk_rcvtimeo =
595  sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
596  drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
597  mdev->net_conf->rcvbuf_size);
598 
599  /* explicitly bind to the configured IP as source IP
600  * for the outgoing connections.
601  * This is needed for multihomed hosts and to be
602  * able to use lo: interfaces for drbd.
603  * Make sure to use 0 as port number, so linux selects
604  * a free one dynamically.
605  */
606  memcpy(&src_in6, mdev->net_conf->my_addr,
607  min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
608  if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
609  src_in6.sin6_port = 0;
610  else
611  ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
612 
613  what = "bind before connect";
614  err = sock->ops->bind(sock,
615  (struct sockaddr *) &src_in6,
616  mdev->net_conf->my_addr_len);
617  if (err < 0)
618  goto out;
619 
620  /* connect may fail, peer not yet available.
621  * stay C_WF_CONNECTION, don't go Disconnecting! */
622  disconnect_on_error = 0;
623  what = "connect";
624  err = sock->ops->connect(sock,
625  (struct sockaddr *)mdev->net_conf->peer_addr,
626  mdev->net_conf->peer_addr_len, 0);
627 
628 out:
629  if (err < 0) {
630  if (sock) {
631  sock_release(sock);
632  sock = NULL;
633  }
634  switch (-err) {
635  /* timeout, busy, signal pending */
636  case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
637  case EINTR: case ERESTARTSYS:
638  /* peer not (yet) available, network problem */
639  case ECONNREFUSED: case ENETUNREACH:
640  case EHOSTDOWN: case EHOSTUNREACH:
641  disconnect_on_error = 0;
642  break;
643  default:
644  dev_err(DEV, "%s failed, err = %d\n", what, err);
645  }
646  if (disconnect_on_error)
647  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
648  }
649  put_net_conf(mdev);
650  return sock;
651 }
652 
653 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
654 {
655  int timeo, err;
656  struct socket *s_estab = NULL, *s_listen;
657  const char *what;
658 
659  if (!get_net_conf(mdev))
660  return NULL;
661 
662  what = "sock_create_kern";
663  err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
664  SOCK_STREAM, IPPROTO_TCP, &s_listen);
665  if (err) {
666  s_listen = NULL;
667  goto out;
668  }
669 
670  timeo = mdev->net_conf->try_connect_int * HZ;
671  timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
672 
673  s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
674  s_listen->sk->sk_rcvtimeo = timeo;
675  s_listen->sk->sk_sndtimeo = timeo;
676  drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
677  mdev->net_conf->rcvbuf_size);
678 
679  what = "bind before listen";
680  err = s_listen->ops->bind(s_listen,
681  (struct sockaddr *) mdev->net_conf->my_addr,
682  mdev->net_conf->my_addr_len);
683  if (err < 0)
684  goto out;
685 
686  err = drbd_accept(mdev, &what, s_listen, &s_estab);
687 
688 out:
689  if (s_listen)
690  sock_release(s_listen);
691  if (err < 0) {
692  if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
693  dev_err(DEV, "%s failed, err = %d\n", what, err);
694  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
695  }
696  }
697  put_net_conf(mdev);
698 
699  return s_estab;
700 }
701 
702 static int drbd_send_fp(struct drbd_conf *mdev,
703  struct socket *sock, enum drbd_packets cmd)
704 {
705  struct p_header80 *h = &mdev->data.sbuf.header.h80;
706 
707  return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
708 }
709 
710 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
711 {
712  struct p_header80 *h = &mdev->data.rbuf.header.h80;
713  int rr;
714 
715  rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
716 
717  if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
718  return be16_to_cpu(h->command);
719 
720  return 0xffff;
721 }
722 
728 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
729 {
730  int rr;
731  char tb[4];
732 
733  if (!*sock)
734  return false;
735 
736  rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
737 
738  if (rr > 0 || rr == -EAGAIN) {
739  return true;
740  } else {
741  sock_release(*sock);
742  *sock = NULL;
743  return false;
744  }
745 }
746 
747 /*
748  * return values:
749  * 1 yes, we have a valid connection
750  * 0 oops, did not work out, please try again
751  * -1 peer talks different language,
752  * no point in trying again, please go standalone.
753  * -2 We do not have a network config...
754  */
755 static int drbd_connect(struct drbd_conf *mdev)
756 {
757  struct socket *s, *sock, *msock;
758  int try, h, ok;
759  enum drbd_state_rv rv;
760 
761  D_ASSERT(!mdev->data.socket);
762 
763  if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
764  return -2;
765 
 766  clear_bit(DISCARD_CONCURRENT, &mdev->flags);
 767 
768  sock = NULL;
769  msock = NULL;
770 
771  do {
772  for (try = 0;;) {
773  /* 3 tries, this should take less than a second! */
774  s = drbd_try_connect(mdev);
775  if (s || ++try >= 3)
776  break;
777  /* give the other side time to call bind() & listen() */
 778  schedule_timeout_interruptible(HZ / 10);
 779  }
780 
781  if (s) {
782  if (!sock) {
783  drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
784  sock = s;
785  s = NULL;
786  } else if (!msock) {
787  drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
788  msock = s;
789  s = NULL;
790  } else {
791  dev_err(DEV, "Logic error in drbd_connect()\n");
792  goto out_release_sockets;
793  }
794  }
795 
796  if (sock && msock) {
797  schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
798  ok = drbd_socket_okay(mdev, &sock);
799  ok = drbd_socket_okay(mdev, &msock) && ok;
800  if (ok)
801  break;
802  }
803 
804 retry:
805  s = drbd_wait_for_connect(mdev);
806  if (s) {
807  try = drbd_recv_fp(mdev, s);
808  drbd_socket_okay(mdev, &sock);
809  drbd_socket_okay(mdev, &msock);
810  switch (try) {
811  case P_HAND_SHAKE_S:
812  if (sock) {
813  dev_warn(DEV, "initial packet S crossed\n");
814  sock_release(sock);
815  }
816  sock = s;
817  break;
818  case P_HAND_SHAKE_M:
819  if (msock) {
820  dev_warn(DEV, "initial packet M crossed\n");
821  sock_release(msock);
822  }
823  msock = s;
 824  set_bit(DISCARD_CONCURRENT, &mdev->flags);
 825  break;
826  default:
827  dev_warn(DEV, "Error receiving initial packet\n");
828  sock_release(s);
829  if (random32() & 1)
830  goto retry;
831  }
832  }
833 
834  if (mdev->state.conn <= C_DISCONNECTING)
835  goto out_release_sockets;
836  if (signal_pending(current)) {
 837  flush_signals(current);
 838  smp_rmb();
839  if (get_t_state(&mdev->receiver) == Exiting)
840  goto out_release_sockets;
841  }
842 
843  if (sock && msock) {
844  ok = drbd_socket_okay(mdev, &sock);
845  ok = drbd_socket_okay(mdev, &msock) && ok;
846  if (ok)
847  break;
848  }
849  } while (1);
850 
851  msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
852  sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
853 
854  sock->sk->sk_allocation = GFP_NOIO;
855  msock->sk->sk_allocation = GFP_NOIO;
856 
857  sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
858  msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
859 
860  /* NOT YET ...
861  * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862  * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
863  * first set it to the P_HAND_SHAKE timeout,
864  * which we set to 4x the configured ping_timeout. */
865  sock->sk->sk_sndtimeo =
866  sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
867 
868  msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
869  msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
870 
871  /* we don't want delays.
872  * we use TCP_CORK where appropriate, though */
873  drbd_tcp_nodelay(sock);
874  drbd_tcp_nodelay(msock);
875 
876  mdev->data.socket = sock;
877  mdev->meta.socket = msock;
878  mdev->last_received = jiffies;
879 
880  D_ASSERT(mdev->asender.task == NULL);
881 
882  h = drbd_do_handshake(mdev);
883  if (h <= 0)
884  return h;
885 
886  if (mdev->cram_hmac_tfm) {
887  /* drbd_request_state(mdev, NS(conn, WFAuth)); */
888  switch (drbd_do_auth(mdev)) {
889  case -1:
890  dev_err(DEV, "Authentication of peer failed\n");
891  return -1;
892  case 0:
893  dev_err(DEV, "Authentication of peer failed, trying again.\n");
894  return 0;
895  }
896  }
897 
898  sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
899  sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
900 
901  atomic_set(&mdev->packet_seq, 0);
902  mdev->peer_seq = 0;
903 
904  if (drbd_send_protocol(mdev) == -1)
905  return -1;
906  set_bit(STATE_SENT, &mdev->flags);
907  drbd_send_sync_param(mdev, &mdev->sync_conf);
908  drbd_send_sizes(mdev, 0, 0);
909  drbd_send_uuids(mdev);
 910  drbd_send_current_state(mdev);
 911  clear_bit(USE_DEGR_WFC_T, &mdev->flags);
912  clear_bit(RESIZE_PENDING, &mdev->flags);
913 
914  spin_lock_irq(&mdev->req_lock);
915  rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
916  if (mdev->state.conn != C_WF_REPORT_PARAMS)
917  clear_bit(STATE_SENT, &mdev->flags);
918  spin_unlock_irq(&mdev->req_lock);
919 
920  if (rv < SS_SUCCESS)
921  return 0;
922 
923  drbd_thread_start(&mdev->asender);
924  mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
925 
926  return 1;
927 
928 out_release_sockets:
929  if (sock)
930  sock_release(sock);
931  if (msock)
932  sock_release(msock);
933  return -1;
934 }
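/*
 * Illustrative sketch, not part of this listing: the receiver thread
 * (drbdd_init(), later in this file) is assumed to consume the return codes
 * documented above roughly like this -- retry on 0, proceed on 1, and give up
 * on a negative value:
 *
 *   do {
 *           h = drbd_connect(mdev);
 *           if (h == 0) {
 *                   drbd_disconnect(mdev);
 *                   schedule_timeout_interruptible(HZ);
 *           }
 *           if (h == -1) {
 *                   dev_warn(DEV, "Discarding network configuration.\n");
 *                   drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
 *           }
 *   } while (h == 0);
 */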
935 
936 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
937 {
938  union p_header *h = &mdev->data.rbuf.header;
939  int r;
940 
941  r = drbd_recv(mdev, h, sizeof(*h));
942  if (unlikely(r != sizeof(*h))) {
943  if (!signal_pending(current))
944  dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
945  return false;
946  }
947 
948  if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
949  *cmd = be16_to_cpu(h->h80.command);
950  *packet_size = be16_to_cpu(h->h80.length);
951  } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
952  *cmd = be16_to_cpu(h->h95.command);
953  *packet_size = be32_to_cpu(h->h95.length);
954  } else {
955  dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
956  be32_to_cpu(h->h80.magic),
957  be16_to_cpu(h->h80.command),
958  be16_to_cpu(h->h80.length));
959  return false;
960  }
961  mdev->last_received = jiffies;
962 
963  return true;
964 }
965 
966 static void drbd_flush(struct drbd_conf *mdev)
967 {
968  int rv;
969 
970  if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
971  rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
972  NULL);
973  if (rv) {
974  dev_info(DEV, "local disk flush failed with status %d\n", rv);
975  /* would rather check on EOPNOTSUPP, but that is not reliable.
976  * don't try again for ANY return value != 0
977  * if (rv == -EOPNOTSUPP) */
 978  drbd_bump_write_ordering(mdev, WO_drain_io);
 979  }
980  put_ldev(mdev);
981  }
982 }
983 
990 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
991  struct drbd_epoch *epoch,
992  enum epoch_event ev)
993 {
994  int epoch_size;
995  struct drbd_epoch *next_epoch;
996  enum finish_epoch rv = FE_STILL_LIVE;
997 
998  spin_lock(&mdev->epoch_lock);
999  do {
1000  next_epoch = NULL;
1001 
1002  epoch_size = atomic_read(&epoch->epoch_size);
1003 
1004  switch (ev & ~EV_CLEANUP) {
1005  case EV_PUT:
1006  atomic_dec(&epoch->active);
1007  break;
1008  case EV_GOT_BARRIER_NR:
 1009  set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
 1010  break;
1011  case EV_BECAME_LAST:
1012  /* nothing to do*/
1013  break;
1014  }
1015 
1016  if (epoch_size != 0 &&
1017  atomic_read(&epoch->active) == 0 &&
1018  (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1019  if (!(ev & EV_CLEANUP)) {
1020  spin_unlock(&mdev->epoch_lock);
1021  drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1022  spin_lock(&mdev->epoch_lock);
1023  }
1024  if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1025  dec_unacked(mdev);
1026 
1027  if (mdev->current_epoch != epoch) {
1028  next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1029  list_del(&epoch->list);
1030  ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1031  mdev->epochs--;
1032  kfree(epoch);
1033 
1034  if (rv == FE_STILL_LIVE)
1035  rv = FE_DESTROYED;
1036  } else {
1037  epoch->flags = 0;
1038  atomic_set(&epoch->epoch_size, 0);
1039  /* atomic_set(&epoch->active, 0); is already zero */
1040  if (rv == FE_STILL_LIVE)
1041  rv = FE_RECYCLED;
1042  wake_up(&mdev->ee_wait);
1043  }
1044  }
1045 
1046  if (!next_epoch)
1047  break;
1048 
1049  epoch = next_epoch;
1050  } while (1);
1051 
1052  spin_unlock(&mdev->epoch_lock);
1053 
1054  return rv;
1055 }
1056 
 1062 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
 1063 {
1064  enum write_ordering_e pwo;
1065  static char *write_ordering_str[] = {
1066  [WO_none] = "none",
1067  [WO_drain_io] = "drain",
1068  [WO_bdev_flush] = "flush",
1069  };
1070 
1071  pwo = mdev->write_ordering;
1072  wo = min(pwo, wo);
1073  if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1074  wo = WO_drain_io;
1075  if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1076  wo = WO_none;
1077  mdev->write_ordering = wo;
1078  if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1079  dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1080 }
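/*
 * Illustrative note, not part of drbd_receiver.c: the min(pwo, wo) above can
 * only ever downgrade the method because enum write_ordering_e is assumed to
 * be ordered from weakest to strongest, matching write_ordering_str[]:
 *
 *   enum write_ordering_e {
 *           WO_none,
 *           WO_drain_io,
 *           WO_bdev_flush,
 *   };
 */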
1081 
1098 /* TODO allocate from our own bio_set. */
1099 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1100  const unsigned rw, const int fault_type)
1101 {
1102  struct bio *bios = NULL;
1103  struct bio *bio;
1104  struct page *page = e->pages;
1105  sector_t sector = e->sector;
1106  unsigned ds = e->size;
1107  unsigned n_bios = 0;
1108  unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1109  int err = -ENOMEM;
1110 
1111  /* In most cases, we will only need one bio. But in case the lower
1112  * level restrictions happen to be different at this offset on this
1113  * side than those of the sending peer, we may need to submit the
1114  * request in more than one bio.
1115  *
1116  * Plain bio_alloc is good enough here, this is no DRBD internally
1117  * generated bio, but a bio allocated on behalf of the peer.
1118  */
1119 next_bio:
1120  bio = bio_alloc(GFP_NOIO, nr_pages);
1121  if (!bio) {
1122  dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1123  goto fail;
1124  }
1125  /* > e->sector, unless this is the first bio */
1126  bio->bi_sector = sector;
1127  bio->bi_bdev = mdev->ldev->backing_bdev;
1128  bio->bi_rw = rw;
1129  bio->bi_private = e;
1130  bio->bi_end_io = drbd_endio_sec;
1131 
1132  bio->bi_next = bios;
1133  bios = bio;
1134  ++n_bios;
1135 
1136  page_chain_for_each(page) {
1137  unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1138  if (!bio_add_page(bio, page, len, 0)) {
1139  /* A single page must always be possible!
1140  * But in case it fails anyways,
1141  * we deal with it, and complain (below). */
1142  if (bio->bi_vcnt == 0) {
1143  dev_err(DEV,
1144  "bio_add_page failed for len=%u, "
1145  "bi_vcnt=0 (bi_sector=%llu)\n",
1146  len, (unsigned long long)bio->bi_sector);
1147  err = -ENOSPC;
1148  goto fail;
1149  }
1150  goto next_bio;
1151  }
1152  ds -= len;
1153  sector += len >> 9;
1154  --nr_pages;
1155  }
1156  D_ASSERT(page == NULL);
1157  D_ASSERT(ds == 0);
1158 
1159  atomic_set(&e->pending_bios, n_bios);
1160  do {
1161  bio = bios;
1162  bios = bios->bi_next;
1163  bio->bi_next = NULL;
1164 
1165  drbd_generic_make_request(mdev, fault_type, bio);
1166  } while (bios);
1167  return 0;
1168 
1169 fail:
1170  while (bios) {
1171  bio = bios;
1172  bios = bios->bi_next;
1173  bio_put(bio);
1174  }
1175  return err;
1176 }
1177 
1178 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1179 {
1180  int rv;
1181  struct p_barrier *p = &mdev->data.rbuf.barrier;
1182  struct drbd_epoch *epoch;
1183 
1184  inc_unacked(mdev);
1185 
1186  mdev->current_epoch->barrier_nr = p->barrier;
1187  rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1188 
1189  /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1190  * the activity log, which means it would not be resynced in case the
1191  * R_PRIMARY crashes now.
1192  * Therefore we must send the barrier_ack after the barrier request was
1193  * completed. */
1194  switch (mdev->write_ordering) {
1195  case WO_none:
1196  if (rv == FE_RECYCLED)
1197  return true;
1198 
1199  /* receiver context, in the writeout path of the other node.
1200  * avoid potential distributed deadlock */
1201  epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1202  if (epoch)
1203  break;
1204  else
1205  dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1206  /* Fall through */
1207 
1208  case WO_bdev_flush:
1209  case WO_drain_io:
1210  drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1211  drbd_flush(mdev);
1212 
1213  if (atomic_read(&mdev->current_epoch->epoch_size)) {
1214  epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1215  if (epoch)
1216  break;
1217  }
1218 
1219  epoch = mdev->current_epoch;
1220  wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1221 
1222  D_ASSERT(atomic_read(&epoch->active) == 0);
1223  D_ASSERT(epoch->flags == 0);
1224 
1225  return true;
1226  default:
1227  dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1228  return false;
1229  }
1230 
1231  epoch->flags = 0;
1232  atomic_set(&epoch->epoch_size, 0);
1233  atomic_set(&epoch->active, 0);
1234 
1235  spin_lock(&mdev->epoch_lock);
1236  if (atomic_read(&mdev->current_epoch->epoch_size)) {
1237  list_add(&epoch->list, &mdev->current_epoch->list);
1238  mdev->current_epoch = epoch;
1239  mdev->epochs++;
1240  } else {
1241  /* The current_epoch got recycled while we allocated this one... */
1242  kfree(epoch);
1243  }
1244  spin_unlock(&mdev->epoch_lock);
1245 
1246  return true;
1247 }
1248 
1249 /* used from receive_RSDataReply (recv_resync_read)
1250  * and from receive_Data */
1251 static struct drbd_epoch_entry *
1252 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1253 {
1254  const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1255  struct drbd_epoch_entry *e;
1256  struct page *page;
1257  int dgs, ds, rr;
1258  void *dig_in = mdev->int_dig_in;
1259  void *dig_vv = mdev->int_dig_vv;
1260  unsigned long *data;
1261 
1262  dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1263  crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1264 
1265  if (dgs) {
1266  rr = drbd_recv(mdev, dig_in, dgs);
1267  if (rr != dgs) {
1268  if (!signal_pending(current))
1269  dev_warn(DEV,
1270  "short read receiving data digest: read %d expected %d\n",
1271  rr, dgs);
1272  return NULL;
1273  }
1274  }
1275 
1276  data_size -= dgs;
1277 
1278  ERR_IF(data_size & 0x1ff) return NULL;
1279  ERR_IF(data_size > DRBD_MAX_BIO_SIZE) return NULL;
1280 
 1281  /* even though we trust our peer,
1282  * we sometimes have to double check. */
1283  if (sector + (data_size>>9) > capacity) {
1284  dev_err(DEV, "request from peer beyond end of local disk: "
1285  "capacity: %llus < sector: %llus + size: %u\n",
1286  (unsigned long long)capacity,
1287  (unsigned long long)sector, data_size);
1288  return NULL;
1289  }
1290 
1291  /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1292  * "criss-cross" setup, that might cause write-out on some other DRBD,
1293  * which in turn might block on the other node at this very place. */
1294  e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1295  if (!e)
1296  return NULL;
1297 
1298  if (!data_size)
1299  return e;
1300 
1301  ds = data_size;
1302  page = e->pages;
1303  page_chain_for_each(page) {
1304  unsigned len = min_t(int, ds, PAGE_SIZE);
1305  data = kmap(page);
1306  rr = drbd_recv(mdev, data, len);
1307  if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1308  dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1309  data[0] = data[0] ^ (unsigned long)-1;
1310  }
1311  kunmap(page);
1312  if (rr != len) {
1313  drbd_free_ee(mdev, e);
1314  if (!signal_pending(current))
1315  dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1316  rr, len);
1317  return NULL;
1318  }
1319  ds -= rr;
1320  }
1321 
1322  if (dgs) {
1323  drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1324  if (memcmp(dig_in, dig_vv, dgs)) {
1325  dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1326  (unsigned long long)sector, data_size);
1327  drbd_bcast_ee(mdev, "digest failed",
1328  dgs, dig_in, dig_vv, e);
1329  drbd_free_ee(mdev, e);
1330  return NULL;
1331  }
1332  }
1333  mdev->recv_cnt += data_size>>9;
1334  return e;
1335 }
1336 
1337 /* drbd_drain_block() just takes a data block
1338  * out of the socket input buffer, and discards it.
1339  */
1340 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1341 {
1342  struct page *page;
1343  int rr, rv = 1;
1344  void *data;
1345 
1346  if (!data_size)
1347  return true;
1348 
1349  page = drbd_pp_alloc(mdev, 1, 1);
1350 
1351  data = kmap(page);
1352  while (data_size) {
1353  rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1354  if (rr != min_t(int, data_size, PAGE_SIZE)) {
1355  rv = 0;
1356  if (!signal_pending(current))
1357  dev_warn(DEV,
1358  "short read receiving data: read %d expected %d\n",
1359  rr, min_t(int, data_size, PAGE_SIZE));
1360  break;
1361  }
1362  data_size -= rr;
1363  }
1364  kunmap(page);
1365  drbd_pp_free(mdev, page, 0);
1366  return rv;
1367 }
1368 
1369 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1370  sector_t sector, int data_size)
1371 {
1372  struct bio_vec *bvec;
1373  struct bio *bio;
1374  int dgs, rr, i, expect;
1375  void *dig_in = mdev->int_dig_in;
1376  void *dig_vv = mdev->int_dig_vv;
1377 
1378  dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1379  crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1380 
1381  if (dgs) {
1382  rr = drbd_recv(mdev, dig_in, dgs);
1383  if (rr != dgs) {
1384  if (!signal_pending(current))
1385  dev_warn(DEV,
1386  "short read receiving data reply digest: read %d expected %d\n",
1387  rr, dgs);
1388  return 0;
1389  }
1390  }
1391 
1392  data_size -= dgs;
1393 
1394  /* optimistically update recv_cnt. if receiving fails below,
1395  * we disconnect anyways, and counters will be reset. */
1396  mdev->recv_cnt += data_size>>9;
1397 
1398  bio = req->master_bio;
1399  D_ASSERT(sector == bio->bi_sector);
1400 
1401  bio_for_each_segment(bvec, bio, i) {
1402  expect = min_t(int, data_size, bvec->bv_len);
1403  rr = drbd_recv(mdev,
1404  kmap(bvec->bv_page)+bvec->bv_offset,
1405  expect);
1406  kunmap(bvec->bv_page);
1407  if (rr != expect) {
1408  if (!signal_pending(current))
1409  dev_warn(DEV, "short read receiving data reply: "
1410  "read %d expected %d\n",
1411  rr, expect);
1412  return 0;
1413  }
1414  data_size -= rr;
1415  }
1416 
1417  if (dgs) {
1418  drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1419  if (memcmp(dig_in, dig_vv, dgs)) {
1420  dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1421  return 0;
1422  }
1423  }
1424 
1425  D_ASSERT(data_size == 0);
1426  return 1;
1427 }
1428 
1429 /* e_end_resync_block() is called via
1430  * drbd_process_done_ee() by asender only */
1431 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1432 {
1433  struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1434  sector_t sector = e->sector;
1435  int ok;
1436 
1437  D_ASSERT(hlist_unhashed(&e->collision));
1438 
1439  if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1440  drbd_set_in_sync(mdev, sector, e->size);
1441  ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1442  } else {
1443  /* Record failure to sync */
1444  drbd_rs_failed_io(mdev, sector, e->size);
1445 
1446  ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1447  }
1448  dec_unacked(mdev);
1449 
1450  return ok;
1451 }
1452 
1453 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1454 {
1455  struct drbd_epoch_entry *e;
1456 
1457  e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1458  if (!e)
1459  goto fail;
1460 
1461  dec_rs_pending(mdev);
1462 
1463  inc_unacked(mdev);
1464  /* corresponding dec_unacked() in e_end_resync_block()
1465  * respective _drbd_clear_done_ee */
1466 
1467  e->w.cb = e_end_resync_block;
1468 
1469  spin_lock_irq(&mdev->req_lock);
1470  list_add(&e->w.list, &mdev->sync_ee);
1471  spin_unlock_irq(&mdev->req_lock);
1472 
1473  atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1474  if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1475  return true;
1476 
1477  /* don't care for the reason here */
1478  dev_err(DEV, "submit failed, triggering re-connect\n");
1479  spin_lock_irq(&mdev->req_lock);
1480  list_del(&e->w.list);
1481  spin_unlock_irq(&mdev->req_lock);
1482 
1483  drbd_free_ee(mdev, e);
1484 fail:
1485  put_ldev(mdev);
1486  return false;
1487 }
1488 
1489 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1490 {
1491  struct drbd_request *req;
1492  sector_t sector;
1493  int ok;
1494  struct p_data *p = &mdev->data.rbuf.data;
1495 
1496  sector = be64_to_cpu(p->sector);
1497 
1498  spin_lock_irq(&mdev->req_lock);
1499  req = _ar_id_to_req(mdev, p->block_id, sector);
1500  spin_unlock_irq(&mdev->req_lock);
1501  if (unlikely(!req)) {
1502  dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1503  return false;
1504  }
1505 
1506  /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1507  * special casing it there for the various failure cases.
1508  * still no race with drbd_fail_pending_reads */
1509  ok = recv_dless_read(mdev, req, sector, data_size);
1510 
1511  if (ok)
1512  req_mod(req, data_received);
1513  /* else: nothing. handled from drbd_disconnect...
1514  * I don't think we may complete this just yet
1515  * in case we are "on-disconnect: freeze" */
1516 
1517  return ok;
1518 }
1519 
1520 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1521 {
1522  sector_t sector;
1523  int ok;
1524  struct p_data *p = &mdev->data.rbuf.data;
1525 
1526  sector = be64_to_cpu(p->sector);
1527  D_ASSERT(p->block_id == ID_SYNCER);
1528 
1529  if (get_ldev(mdev)) {
1530  /* data is submitted to disk within recv_resync_read.
1531  * corresponding put_ldev done below on error,
1532  * or in drbd_endio_write_sec. */
1533  ok = recv_resync_read(mdev, sector, data_size);
1534  } else {
1535  if (__ratelimit(&drbd_ratelimit_state))
1536  dev_err(DEV, "Can not write resync data to local disk.\n");
1537 
1538  ok = drbd_drain_block(mdev, data_size);
1539 
1540  drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1541  }
1542 
1543  atomic_add(data_size >> 9, &mdev->rs_sect_in);
1544 
1545  return ok;
1546 }
1547 
1548 /* e_end_block() is called via drbd_process_done_ee().
1549  * this means this function only runs in the asender thread
1550  */
1551 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1552 {
1553  struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1554  sector_t sector = e->sector;
1555  int ok = 1, pcmd;
1556 
1557  if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1558  if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1559  pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1560  mdev->state.conn <= C_PAUSED_SYNC_T &&
1561  e->flags & EE_MAY_SET_IN_SYNC) ?
 1562  P_RS_WRITE_ACK : P_WRITE_ACK;
 1563  ok &= drbd_send_ack(mdev, pcmd, e);
1564  if (pcmd == P_RS_WRITE_ACK)
1565  drbd_set_in_sync(mdev, sector, e->size);
1566  } else {
1567  ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1568  /* we expect it to be marked out of sync anyways...
1569  * maybe assert this? */
1570  }
1571  dec_unacked(mdev);
1572  }
1573  /* we delete from the conflict detection hash _after_ we sent out the
1574  * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1575  if (mdev->net_conf->two_primaries) {
1576  spin_lock_irq(&mdev->req_lock);
1577  D_ASSERT(!hlist_unhashed(&e->collision));
1578  hlist_del_init(&e->collision);
1579  spin_unlock_irq(&mdev->req_lock);
1580  } else {
1581  D_ASSERT(hlist_unhashed(&e->collision));
1582  }
1583 
1584  drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1585 
1586  return ok;
1587 }
1588 
1589 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1590 {
1591  struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1592  int ok = 1;
1593 
1594  D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1595  ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1596 
1597  spin_lock_irq(&mdev->req_lock);
1598  D_ASSERT(!hlist_unhashed(&e->collision));
1599  hlist_del_init(&e->collision);
1600  spin_unlock_irq(&mdev->req_lock);
1601 
1602  dec_unacked(mdev);
1603 
1604  return ok;
1605 }
1606 
1607 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)
1608 {
1609 
1610  struct drbd_epoch_entry *rs_e;
1611  bool rv = 0;
1612 
1613  spin_lock_irq(&mdev->req_lock);
1614  list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
1615  if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {
1616  rv = 1;
1617  break;
1618  }
1619  }
1620  spin_unlock_irq(&mdev->req_lock);
1621 
1622  return rv;
1623 }
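/*
 * Illustrative sketch, not part of drbd_receiver.c: overlaps() comes from
 * DRBD's headers; given how it is used here and in receive_Data() below, it
 * is assumed to be plain interval intersection in 512-byte sectors:
 *
 *   static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
 *   {
 *           return !((s1 + (l1 >> 9) <= s2) || (s1 >= s2 + (l2 >> 9)));
 *   }
 */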
1624 
1625 /* Called from receive_Data.
1626  * Synchronize packets on sock with packets on msock.
1627  *
1628  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1629  * packet traveling on msock, they are still processed in the order they have
1630  * been sent.
1631  *
1632  * Note: we don't care for Ack packets overtaking P_DATA packets.
1633  *
1634  * In case packet_seq is larger than mdev->peer_seq number, there are
1635  * outstanding packets on the msock. We wait for them to arrive.
1636  * In case we are the logically next packet, we update mdev->peer_seq
1637  * ourselves. Correctly handles 32bit wrap around.
1638  *
1639  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1640  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1641  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1642  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1643  *
1644  * returns 0 if we may process the packet,
1645  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1646 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1647 {
1648  DEFINE_WAIT(wait);
1649  unsigned int p_seq;
1650  long timeout;
1651  int ret = 0;
1652  spin_lock(&mdev->peer_seq_lock);
1653  for (;;) {
 1654  prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
 1655  if (seq_le(packet_seq, mdev->peer_seq+1))
1656  break;
1657  if (signal_pending(current)) {
1658  ret = -ERESTARTSYS;
1659  break;
1660  }
1661  p_seq = mdev->peer_seq;
1662  spin_unlock(&mdev->peer_seq_lock);
1663  timeout = schedule_timeout(30*HZ);
1664  spin_lock(&mdev->peer_seq_lock);
1665  if (timeout == 0 && p_seq == mdev->peer_seq) {
1666  ret = -ETIMEDOUT;
1667  dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1668  break;
1669  }
1670  }
1671  finish_wait(&mdev->seq_wait, &wait);
1672  if (mdev->peer_seq+1 == packet_seq)
1673  mdev->peer_seq++;
1674  spin_unlock(&mdev->peer_seq_lock);
1675  return ret;
1676 }
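/*
 * Illustrative sketch, not part of drbd_receiver.c: seq_le() is assumed to be
 * the usual serial-number comparison from drbd_int.h, which is what makes the
 * wrap-around claim in the comment above hold -- the difference is taken in
 * 32 bit and interpreted as signed, so seq_le(0xfffffffe, 0x00000002) is true
 * while seq_le(0x00000002, 0xfffffffe) is false:
 *
 *   static inline int seq_cmp(u32 a, u32 b)
 *   {
 *           return (s32)a - (s32)b;
 *   }
 *   #define seq_le(a, b) (seq_cmp((a), (b)) <= 0)
 */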
1677 
1678 /* see also bio_flags_to_wire()
1679  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1680  * flags and back. We may replicate to other kernel versions. */
1681 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1682 {
1683  return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1684  (dpf & DP_FUA ? REQ_FUA : 0) |
1685  (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1686  (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1687 }
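/*
 * Illustrative sketch, not part of this file: the sending-side counterpart
 * bio_flags_to_wire() referenced above lives in drbd_main.c and is assumed to
 * perform the inverse mapping, falling back to DP_RW_SYNC alone for peers
 * that speak a protocol older than 95:
 *
 *   static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
 *   {
 *           if (mdev->agreed_pro_version >= 95)
 *                   return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
 *                           (bi_rw & REQ_FUA ? DP_FUA : 0) |
 *                           (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
 *                           (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
 *           else
 *                   return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
 *   }
 */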
1688 
1689 /* mirrored write */
1690 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1691 {
1692  sector_t sector;
1693  struct drbd_epoch_entry *e;
1694  struct p_data *p = &mdev->data.rbuf.data;
1695  int rw = WRITE;
1696  u32 dp_flags;
1697 
1698  if (!get_ldev(mdev)) {
1699  spin_lock(&mdev->peer_seq_lock);
1700  if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1701  mdev->peer_seq++;
1702  spin_unlock(&mdev->peer_seq_lock);
1703 
1704  drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1705  atomic_inc(&mdev->current_epoch->epoch_size);
1706  return drbd_drain_block(mdev, data_size);
1707  }
1708 
1709  /* get_ldev(mdev) successful.
1710  * Corresponding put_ldev done either below (on various errors),
1711  * or in drbd_endio_write_sec, if we successfully submit the data at
1712  * the end of this function. */
1713 
1714  sector = be64_to_cpu(p->sector);
1715  e = read_in_block(mdev, p->block_id, sector, data_size);
1716  if (!e) {
1717  put_ldev(mdev);
1718  return false;
1719  }
1720 
1721  e->w.cb = e_end_block;
1722 
1723  dp_flags = be32_to_cpu(p->dp_flags);
1724  rw |= wire_flags_to_bio(mdev, dp_flags);
1725  if (e->pages == NULL) {
1726  D_ASSERT(e->size == 0);
1727  D_ASSERT(dp_flags & DP_FLUSH);
1728  }
1729 
1730  if (dp_flags & DP_MAY_SET_IN_SYNC)
1731  e->flags |= EE_MAY_SET_IN_SYNC;
1732 
1733  spin_lock(&mdev->epoch_lock);
1734  e->epoch = mdev->current_epoch;
1735  atomic_inc(&e->epoch->epoch_size);
1736  atomic_inc(&e->epoch->active);
1737  spin_unlock(&mdev->epoch_lock);
1738 
1739  /* I'm the receiver, I do hold a net_cnt reference. */
1740  if (!mdev->net_conf->two_primaries) {
1741  spin_lock_irq(&mdev->req_lock);
1742  } else {
1743  /* don't get the req_lock yet,
1744  * we may sleep in drbd_wait_peer_seq */
1745  const int size = e->size;
1746  const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1747  DEFINE_WAIT(wait);
1748  struct drbd_request *i;
1749  struct hlist_node *n;
1750  struct hlist_head *slot;
1751  int first;
1752 
1753  D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1754  BUG_ON(mdev->ee_hash == NULL);
1755  BUG_ON(mdev->tl_hash == NULL);
1756 
1757  /* conflict detection and handling:
1758  * 1. wait on the sequence number,
1759  * in case this data packet overtook ACK packets.
1760  * 2. check our hash tables for conflicting requests.
1761  * we only need to walk the tl_hash, since an ee can not
1762  * have a conflict with an other ee: on the submitting
1763  * node, the corresponding req had already been conflicting,
1764  * and a conflicting req is never sent.
1765  *
1766  * Note: for two_primaries, we are protocol C,
1767  * so there cannot be any request that is DONE
1768  * but still on the transfer log.
1769  *
1770  * unconditionally add to the ee_hash.
1771  *
1772  * if no conflicting request is found:
1773  * submit.
1774  *
1775  * if any conflicting request is found
1776  * that has not yet been acked,
1777  * AND I have the "discard concurrent writes" flag:
1778  * queue (via done_ee) the P_DISCARD_ACK; OUT.
1779  *
1780  * if any conflicting request is found:
1781  * block the receiver, waiting on misc_wait
1782  * until no more conflicting requests are there,
1783  * or we get interrupted (disconnect).
1784  *
1785  * we do not just write after local io completion of those
1786  * requests, but only after req is done completely, i.e.
1787  * we wait for the P_DISCARD_ACK to arrive!
1788  *
1789  * then proceed normally, i.e. submit.
1790  */
1791  if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1792  goto out_interrupted;
1793 
1794  spin_lock_irq(&mdev->req_lock);
1795 
1796  hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1797 
1798 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1799  slot = tl_hash_slot(mdev, sector);
1800  first = 1;
1801  for (;;) {
1802  int have_unacked = 0;
1803  int have_conflict = 0;
1804  prepare_to_wait(&mdev->misc_wait, &wait,
 1805  TASK_INTERRUPTIBLE);
 1806  hlist_for_each_entry(i, n, slot, collision) {
1807  if (OVERLAPS) {
1808  /* only ALERT on first iteration,
1809  * we may be woken up early... */
1810  if (first)
1811  dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1812  " new: %llus +%u; pending: %llus +%u\n",
1813  current->comm, current->pid,
1814  (unsigned long long)sector, size,
1815  (unsigned long long)i->sector, i->size);
1816  if (i->rq_state & RQ_NET_PENDING)
1817  ++have_unacked;
1818  ++have_conflict;
1819  }
1820  }
1821 #undef OVERLAPS
1822  if (!have_conflict)
1823  break;
1824 
1825  /* Discard Ack only for the _first_ iteration */
1826  if (first && discard && have_unacked) {
1827  dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1828  (unsigned long long)sector);
1829  inc_unacked(mdev);
1830  e->w.cb = e_send_discard_ack;
1831  list_add_tail(&e->w.list, &mdev->done_ee);
1832 
1833  spin_unlock_irq(&mdev->req_lock);
1834 
1835  /* we could probably send that P_DISCARD_ACK ourselves,
1836  * but I don't like the receiver using the msock */
1837 
1838  put_ldev(mdev);
1839  wake_asender(mdev);
1840  finish_wait(&mdev->misc_wait, &wait);
1841  return true;
1842  }
1843 
1844  if (signal_pending(current)) {
1845  hlist_del_init(&e->collision);
1846 
1847  spin_unlock_irq(&mdev->req_lock);
1848 
1849  finish_wait(&mdev->misc_wait, &wait);
1850  goto out_interrupted;
1851  }
1852 
1853  spin_unlock_irq(&mdev->req_lock);
1854  if (first) {
1855  first = 0;
1856  dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1857  "sec=%llus\n", (unsigned long long)sector);
1858  } else if (discard) {
1859  /* we had none on the first iteration.
1860  * there must be none now. */
1861  D_ASSERT(have_unacked == 0);
1862  }
1863  schedule();
1864  spin_lock_irq(&mdev->req_lock);
1865  }
1866  finish_wait(&mdev->misc_wait, &wait);
1867  }
1868 
1869  list_add(&e->w.list, &mdev->active_ee);
1870  spin_unlock_irq(&mdev->req_lock);
1871 
1872  if (mdev->state.conn == C_SYNC_TARGET)
1873  wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));
1874 
1875  switch (mdev->net_conf->wire_protocol) {
1876  case DRBD_PROT_C:
1877  inc_unacked(mdev);
1878  /* corresponding dec_unacked() in e_end_block()
1879  * respective _drbd_clear_done_ee */
1880  break;
1881  case DRBD_PROT_B:
1882  /* I really don't like it that the receiver thread
1883  * sends on the msock, but anyways */
1884  drbd_send_ack(mdev, P_RECV_ACK, e);
1885  break;
1886  case DRBD_PROT_A:
1887  /* nothing to do */
1888  break;
1889  }
1890 
1891  if (mdev->state.pdsk < D_INCONSISTENT) {
1892  /* In case we have the only disk of the cluster, */
1893  drbd_set_out_of_sync(mdev, e->sector, e->size);
 1894  e->flags |= EE_CALL_AL_COMPLETE_IO;
 1895  e->flags &= ~EE_MAY_SET_IN_SYNC;
1896  drbd_al_begin_io(mdev, e->sector);
1897  }
1898 
1899  if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1900  return true;
1901 
1902  /* don't care for the reason here */
1903  dev_err(DEV, "submit failed, triggering re-connect\n");
1904  spin_lock_irq(&mdev->req_lock);
1905  list_del(&e->w.list);
1906  hlist_del_init(&e->collision);
1907  spin_unlock_irq(&mdev->req_lock);
1908  if (e->flags & EE_CALL_AL_COMPLETE_IO)
1909  drbd_al_complete_io(mdev, e->sector);
1910 
1911 out_interrupted:
1912  drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1913  put_ldev(mdev);
1914  drbd_free_ee(mdev, e);
1915  return false;
1916 }
1917 
1918 /* We may throttle resync, if the lower device seems to be busy,
1919  * and current sync rate is above c_min_rate.
1920  *
1921  * To decide whether or not the lower device is busy, we use a scheme similar
1922  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1923  * (more than 64 sectors) of activity we cannot account for with our own resync
1924  * activity, it obviously is "busy".
1925  *
1926  * The current sync rate used here uses only the most recent two step marks,
1927  * to have a short time average so we can react faster.
1928  */
 1929 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
 1930 {
1931  struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1932  unsigned long db, dt, dbdt;
1933  struct lc_element *tmp;
1934  int curr_events;
1935  int throttle = 0;
1936 
1937  /* feature disabled? */
1938  if (mdev->sync_conf.c_min_rate == 0)
1939  return 0;
1940 
1941  spin_lock_irq(&mdev->al_lock);
1942  tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1943  if (tmp) {
1944  struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1945  if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1946  spin_unlock_irq(&mdev->al_lock);
1947  return 0;
1948  }
1949  /* Do not slow down if app IO is already waiting for this extent */
1950  }
1951  spin_unlock_irq(&mdev->al_lock);
1952 
1953  curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1954  (int)part_stat_read(&disk->part0, sectors[1]) -
1955  atomic_read(&mdev->rs_sect_ev);
1956 
1957  if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1958  unsigned long rs_left;
1959  int i;
1960 
1961  mdev->rs_last_events = curr_events;
1962 
1963  /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1964  * approx. */
1965  i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1966 
1967  if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1968  rs_left = mdev->ov_left;
1969  else
1970  rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1971 
1972  dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1973  if (!dt)
1974  dt++;
1975  db = mdev->rs_mark_left[i] - rs_left;
1976  dbdt = Bit2KB(db/dt);
1977 
1978  if (dbdt > mdev->sync_conf.c_min_rate)
1979  throttle = 1;
1980  }
1981  return throttle;
1982 }
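A worked user-space example of the throttle arithmetic in drbd_rs_should_slow_down() above, assuming the usual 4 KiB of data per bitmap bit; all numbers are invented.

#include <stdio.h>

int main(void)
{
	unsigned long rs_mark_left = 200000; /* bits left at the older mark          */
	unsigned long rs_left      = 180000; /* bits left now                        */
	unsigned long dt = 6;                /* seconds since that mark (kept >= 1)  */
	unsigned long c_min_rate = 4000;     /* configured minimum rate, KiB/s       */

	unsigned long db = rs_mark_left - rs_left; /* bits cleared in dt seconds */
	unsigned long dbdt = (db / dt) * 4;        /* Bit2KB(): 4 KiB per bit    */

	printf("short-term resync rate: %lu KiB/s -> %s\n", dbdt,
	       dbdt > c_min_rate ? "may throttle" : "do not throttle");
	return 0;
}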
1983 
1984 
1985 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1986 {
1987  sector_t sector;
1988  const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1989  struct drbd_epoch_entry *e;
1990  struct digest_info *di = NULL;
1991  int size, verb;
1992  unsigned int fault_type;
1993  struct p_block_req *p = &mdev->data.rbuf.block_req;
1994 
1995  sector = be64_to_cpu(p->sector);
1996  size = be32_to_cpu(p->blksize);
1997 
1998  if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1999  dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2000  (unsigned long long)sector, size);
2001  return false;
2002  }
2003  if (sector + (size>>9) > capacity) {
2004  dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2005  (unsigned long long)sector, size);
2006  return false;
2007  }
2008 
2009  if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2010  verb = 1;
2011  switch (cmd) {
2012  case P_DATA_REQUEST:
2013  drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2014  break;
2015  case P_RS_DATA_REQUEST:
2016  case P_CSUM_RS_REQUEST:
2017  case P_OV_REQUEST:
2018  drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2019  break;
2020  case P_OV_REPLY:
2021  verb = 0;
2022  dec_rs_pending(mdev);
2023  drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2024  break;
2025  default:
2026  dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2027  cmdname(cmd));
2028  }
2029  if (verb && __ratelimit(&drbd_ratelimit_state))
2030  dev_err(DEV, "Can not satisfy peer's read request, "
2031  "no local data.\n");
2032 
2033  /* drain the possibly present payload */
2034  return drbd_drain_block(mdev, digest_size);
2035  }
2036 
2037  /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2038  * "criss-cross" setup, that might cause write-out on some other DRBD,
2039  * which in turn might block on the other node at this very place. */
2040  e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2041  if (!e) {
2042  put_ldev(mdev);
2043  return false;
2044  }
2045 
2046  switch (cmd) {
2047  case P_DATA_REQUEST:
2048  e->w.cb = w_e_end_data_req;
2049  fault_type = DRBD_FAULT_DT_RD;
2050  /* application IO, don't drbd_rs_begin_io */
2051  goto submit;
2052 
2053  case P_RS_DATA_REQUEST:
2054  e->w.cb = w_e_end_rsdata_req;
2055  fault_type = DRBD_FAULT_RS_RD;
2056  /* used in the sector offset progress display */
2057  mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2058  break;
2059 
2060  case P_OV_REPLY:
2061  case P_CSUM_RS_REQUEST:
2062  fault_type = DRBD_FAULT_RS_RD;
2063  di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2064  if (!di)
2065  goto out_free_e;
2066 
2067  di->digest_size = digest_size;
2068  di->digest = (((char *)di)+sizeof(struct digest_info));
2069 
2070  e->digest = di;
2071  e->flags |= EE_HAS_DIGEST;
2072 
2073  if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2074  goto out_free_e;
2075 
2076  if (cmd == P_CSUM_RS_REQUEST) {
2077  D_ASSERT(mdev->agreed_pro_version >= 89);
2078  e->w.cb = w_e_end_csum_rs_req;
2079  /* used in the sector offset progress display */
2080  mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2081  } else if (cmd == P_OV_REPLY) {
2082  /* track progress, we may need to throttle */
2083  atomic_add(size >> 9, &mdev->rs_sect_in);
2084  e->w.cb = w_e_end_ov_reply;
2085  dec_rs_pending(mdev);
2086  /* drbd_rs_begin_io done when we sent this request,
2087  * but accounting still needs to be done. */
2088  goto submit_for_resync;
2089  }
2090  break;
2091 
2092  case P_OV_REQUEST:
2093  if (mdev->ov_start_sector == ~(sector_t)0 &&
2094  mdev->agreed_pro_version >= 90) {
2095  unsigned long now = jiffies;
2096  int i;
2097  mdev->ov_start_sector = sector;
2098  mdev->ov_position = sector;
2099  mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2100  mdev->rs_total = mdev->ov_left;
2101  for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2102  mdev->rs_mark_left[i] = mdev->ov_left;
2103  mdev->rs_mark_time[i] = now;
2104  }
2105  dev_info(DEV, "Online Verify start sector: %llu\n",
2106  (unsigned long long)sector);
2107  }
2108  e->w.cb = w_e_end_ov_req;
2109  fault_type = DRBD_FAULT_RS_RD;
2110  break;
2111 
2112  default:
2113  dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2114  cmdname(cmd));
2115  fault_type = DRBD_FAULT_MAX;
2116  goto out_free_e;
2117  }
2118 
2119  /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2120  * wrt the receiver, but it is not as straightforward as it may seem.
2121  * Various places in the resync start and stop logic assume resync
2122  * requests are processed in order, requeuing this on the worker thread
2123  * introduces a bunch of new code for synchronization between threads.
2124  *
2125  * Unlimited throttling before drbd_rs_begin_io may stall the resync
2126  * "forever", throttling after drbd_rs_begin_io will lock that extent
2127  * for application writes for the same time. For now, just throttle
2128  * here, where the rest of the code expects the receiver to sleep for
2129  * a while, anyways.
2130  */
2131 
2132  /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2133  * this defers syncer requests for some time, before letting at least
2134  * one request through. The resync controller on the receiving side
2135  * will adapt to the incoming rate accordingly.
2136  *
2137  * We cannot throttle here if remote is Primary/SyncTarget:
2138  * we would also throttle its application reads.
2139  * In that case, throttling is done on the SyncTarget only.
2140  */
2141  if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2142  schedule_timeout_uninterruptible(HZ/10);
2143  if (drbd_rs_begin_io(mdev, sector))
2144  goto out_free_e;
2145 
2146 submit_for_resync:
2147  atomic_add(size >> 9, &mdev->rs_sect_ev);
2148 
2149 submit:
2150  inc_unacked(mdev);
2151  spin_lock_irq(&mdev->req_lock);
2152  list_add_tail(&e->w.list, &mdev->read_ee);
2153  spin_unlock_irq(&mdev->req_lock);
2154 
2155  if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2156  return true;
2157 
2158  /* don't care for the reason here */
2159  dev_err(DEV, "submit failed, triggering re-connect\n");
2160  spin_lock_irq(&mdev->req_lock);
2161  list_del(&e->w.list);
2162  spin_unlock_irq(&mdev->req_lock);
2163  /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2164 
2165 out_free_e:
2166  put_ldev(mdev);
2167  drbd_free_ee(mdev, e);
2168  return false;
2169 }
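A user-space sketch of the sanity check at the top of receive_DataRequest(): the requested size must be a positive multiple of 512 bytes, must not exceed the maximum bio size (assumed to be 1 MiB here), and the request must not run past the device capacity. Function and macro names are illustrative only.

#include <stdio.h>

#define MAX_BIO_SIZE (1U << 20)	/* stand-in for DRBD_MAX_BIO_SIZE */

static int request_ok(unsigned long long sector, unsigned size,
		      unsigned long long capacity /* in 512-byte sectors */)
{
	if (size == 0 || (size & 0x1ff) != 0 || size > MAX_BIO_SIZE)
		return 0;			/* bad size */
	if (sector + (size >> 9) > capacity)
		return 0;			/* runs past the end of the device */
	return 1;
}

int main(void)
{
	printf("%d\n", request_ok(0, 4096, 2048));    /* 1 */
	printf("%d\n", request_ok(2040, 8192, 2048)); /* 0: past the end */
	return 0;
}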
2170 
2171 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2172 {
2173  int self, peer, rv = -100;
2174  unsigned long ch_self, ch_peer;
2175 
2176  self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2177  peer = mdev->p_uuid[UI_BITMAP] & 1;
2178 
2179  ch_peer = mdev->p_uuid[UI_SIZE];
2180  ch_self = mdev->comm_bm_set;
2181 
2182  switch (mdev->net_conf->after_sb_0p) {
2183  case ASB_CONSENSUS:
2184  case ASB_DISCARD_SECONDARY:
2185  case ASB_CALL_HELPER:
2186  dev_err(DEV, "Configuration error.\n");
2187  break;
2188  case ASB_DISCONNECT:
2189  break;
2190  case ASB_DISCARD_YOUNGER_PRI:
2191  if (self == 0 && peer == 1) {
2192  rv = -1;
2193  break;
2194  }
2195  if (self == 1 && peer == 0) {
2196  rv = 1;
2197  break;
2198  }
2199  /* Else fall through to one of the other strategies... */
2200  case ASB_DISCARD_OLDER_PRI:
2201  if (self == 0 && peer == 1) {
2202  rv = 1;
2203  break;
2204  }
2205  if (self == 1 && peer == 0) {
2206  rv = -1;
2207  break;
2208  }
2209  /* Else fall through to one of the other strategies... */
2210  dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2211  "Using discard-least-changes instead\n");
2212  case ASB_DISCARD_ZERO_CHG:
2213  if (ch_peer == 0 && ch_self == 0) {
2214  rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2215  ? -1 : 1;
2216  break;
2217  } else {
2218  if (ch_peer == 0) { rv = 1; break; }
2219  if (ch_self == 0) { rv = -1; break; }
2220  }
2221  if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2222  break;
2223  case ASB_DISCARD_LEAST_CHG:
2224  if (ch_self < ch_peer)
2225  rv = -1;
2226  else if (ch_self > ch_peer)
2227  rv = 1;
2228  else /* ( ch_self == ch_peer ) */
2229  /* Well, then use something else. */
2230  rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2231  ? -1 : 1;
2232  break;
2233  case ASB_DISCARD_LOCAL:
2234  rv = -1;
2235  break;
2236  case ASB_DISCARD_REMOTE:
2237  rv = 1;
2238  }
2239 
2240  return rv;
2241 }
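A standalone sketch of the "discard least changes" branch above: the node that changed fewer blocks during the split brain loses and becomes sync target (-1); ties fall back to the DISCARD_CONCURRENT coin flip. Simplified types, not the kernel function.

#include <stdio.h>

static int discard_least_changes(unsigned long ch_self, unsigned long ch_peer,
				 int discard_concurrent)
{
	if (ch_self < ch_peer)
		return -1;	/* we changed less: become sync target   */
	if (ch_self > ch_peer)
		return 1;	/* peer changed less: become sync source */
	/* equal change counts: arbitrary but symmetric tie break */
	return discard_concurrent ? -1 : 1;
}

int main(void)
{
	printf("%d\n", discard_least_changes(100, 2500, 0)); /* -1 */
	return 0;
}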
2242 
2243 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2244 {
2245  int hg, rv = -100;
2246 
2247  switch (mdev->net_conf->after_sb_1p) {
2248  case ASB_DISCARD_YOUNGER_PRI:
2249  case ASB_DISCARD_OLDER_PRI:
2250  case ASB_DISCARD_LEAST_CHG:
2251  case ASB_DISCARD_LOCAL:
2252  case ASB_DISCARD_REMOTE:
2253  dev_err(DEV, "Configuration error.\n");
2254  break;
2255  case ASB_DISCONNECT:
2256  break;
2257  case ASB_CONSENSUS:
2258  hg = drbd_asb_recover_0p(mdev);
2259  if (hg == -1 && mdev->state.role == R_SECONDARY)
2260  rv = hg;
2261  if (hg == 1 && mdev->state.role == R_PRIMARY)
2262  rv = hg;
2263  break;
2264  case ASB_VIOLENTLY:
2265  rv = drbd_asb_recover_0p(mdev);
2266  break;
2267  case ASB_DISCARD_SECONDARY:
2268  return mdev->state.role == R_PRIMARY ? 1 : -1;
2269  case ASB_CALL_HELPER:
2270  hg = drbd_asb_recover_0p(mdev);
2271  if (hg == -1 && mdev->state.role == R_PRIMARY) {
2272  enum drbd_state_rv rv2;
2273 
2274  drbd_set_role(mdev, R_SECONDARY, 0);
2275  /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2276  * we might be here in C_WF_REPORT_PARAMS which is transient.
2277  * we do not need to wait for the after state change work either. */
2278  rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2279  if (rv2 != SS_SUCCESS) {
2280  drbd_khelper(mdev, "pri-lost-after-sb");
2281  } else {
2282  dev_warn(DEV, "Successfully gave up primary role.\n");
2283  rv = hg;
2284  }
2285  } else
2286  rv = hg;
2287  }
2288 
2289  return rv;
2290 }
2291 
2292 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2293 {
2294  int hg, rv = -100;
2295 
2296  switch (mdev->net_conf->after_sb_2p) {
2297  case ASB_DISCARD_YOUNGER_PRI:
2298  case ASB_DISCARD_OLDER_PRI:
2299  case ASB_DISCARD_LEAST_CHG:
2300  case ASB_DISCARD_LOCAL:
2301  case ASB_DISCARD_REMOTE:
2302  case ASB_CONSENSUS:
2303  case ASB_DISCARD_SECONDARY:
2304  dev_err(DEV, "Configuration error.\n");
2305  break;
2306  case ASB_VIOLENTLY:
2307  rv = drbd_asb_recover_0p(mdev);
2308  break;
2309  case ASB_DISCONNECT:
2310  break;
2311  case ASB_CALL_HELPER:
2312  hg = drbd_asb_recover_0p(mdev);
2313  if (hg == -1) {
2314  enum drbd_state_rv rv2;
2315 
2316  /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2317  * we might be here in C_WF_REPORT_PARAMS which is transient.
2318  * we do not need to wait for the after state change work either. */
2319  rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2320  if (rv2 != SS_SUCCESS) {
2321  drbd_khelper(mdev, "pri-lost-after-sb");
2322  } else {
2323  dev_warn(DEV, "Successfully gave up primary role.\n");
2324  rv = hg;
2325  }
2326  } else
2327  rv = hg;
2328  }
2329 
2330  return rv;
2331 }
2332 
2333 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2334  u64 bits, u64 flags)
2335 {
2336  if (!uuid) {
2337  dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2338  return;
2339  }
2340  dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2341  text,
2342  (unsigned long long)uuid[UI_CURRENT],
2343  (unsigned long long)uuid[UI_BITMAP],
2344  (unsigned long long)uuid[UI_HISTORY_START],
2345  (unsigned long long)uuid[UI_HISTORY_END],
2346  (unsigned long long)bits,
2347  (unsigned long long)flags);
2348 }
2349 
2350 /*
2351  100 after split brain try auto recover
2352  2 C_SYNC_SOURCE set BitMap
2353  1 C_SYNC_SOURCE use BitMap
2354  0 no Sync
2355  -1 C_SYNC_TARGET use BitMap
2356  -2 C_SYNC_TARGET set BitMap
2357  -100 after split brain, disconnect
2358 -1000 unrelated data
2359 -1091 requires proto 91
2360 -1096 requires proto 96
2361  */
2362 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2363 {
2364  u64 self, peer;
2365  int i, j;
2366 
2367  self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2368  peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2369 
2370  *rule_nr = 10;
2371  if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2372  return 0;
2373 
2374  *rule_nr = 20;
2375  if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2376  peer != UUID_JUST_CREATED)
2377  return -2;
2378 
2379  *rule_nr = 30;
2380  if (self != UUID_JUST_CREATED &&
2381  (peer == UUID_JUST_CREATED || peer == (u64)0))
2382  return 2;
2383 
2384  if (self == peer) {
2385  int rct, dc; /* roles at crash time */
2386 
2387  if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2388 
2389  if (mdev->agreed_pro_version < 91)
2390  return -1091;
2391 
2392  if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2393  (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2394  dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2395  drbd_uuid_set_bm(mdev, 0UL);
2396 
2397  drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2398  mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2399  *rule_nr = 34;
2400  } else {
2401  dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2402  *rule_nr = 36;
2403  }
2404 
2405  return 1;
2406  }
2407 
2408  if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2409 
2410  if (mdev->agreed_pro_version < 91)
2411  return -1091;
2412 
2413  if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2414  (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2415  dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2416 
2417  mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2418  mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2419  mdev->p_uuid[UI_BITMAP] = 0UL;
2420 
2421  drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2422  *rule_nr = 35;
2423  } else {
2424  dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2425  *rule_nr = 37;
2426  }
2427 
2428  return -1;
2429  }
2430 
2431  /* Common power [off|failure] */
2432  rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2433  (mdev->p_uuid[UI_FLAGS] & 2);
2434  /* lowest bit is set when we were primary,
2435  * next bit (weight 2) is set when peer was primary */
2436  *rule_nr = 40;
2437 
2438  switch (rct) {
2439  case 0: /* !self_pri && !peer_pri */ return 0;
2440  case 1: /* self_pri && !peer_pri */ return 1;
2441  case 2: /* !self_pri && peer_pri */ return -1;
2442  case 3: /* self_pri && peer_pri */
2443  dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2444  return dc ? -1 : 1;
2445  }
2446  }
2447 
2448  *rule_nr = 50;
2449  peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2450  if (self == peer)
2451  return -1;
2452 
2453  *rule_nr = 51;
2454  peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2455  if (self == peer) {
2456  if (mdev->agreed_pro_version < 96 ?
2457  (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2458  (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2459  peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2460  /* The last P_SYNC_UUID did not get through. Undo the UUID modifications
2461  the peer made when it last started a resync as sync source. */
2462 
2463  if (mdev->agreed_pro_version < 91)
2464  return -1091;
2465 
2466  mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2467  mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2468 
2469  dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2470  drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2471 
2472  return -1;
2473  }
2474  }
2475 
2476  *rule_nr = 60;
2477  self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2478  for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2479  peer = mdev->p_uuid[i] & ~((u64)1);
2480  if (self == peer)
2481  return -2;
2482  }
2483 
2484  *rule_nr = 70;
2485  self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2486  peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2487  if (self == peer)
2488  return 1;
2489 
2490  *rule_nr = 71;
2491  self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2492  if (self == peer) {
2493  if (mdev->agreed_pro_version < 96 ?
2494  (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2495  (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2496  self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2497  /* The last P_SYNC_UUID did not get through. Undo the UUID modifications
2498  we made when we last started a resync as sync source. */
2499 
2500  if (mdev->agreed_pro_version < 91)
2501  return -1091;
2502 
2503  _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2504  _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2505 
2506  dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2507  drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2508  mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2509 
2510  return 1;
2511  }
2512  }
2513 
2514 
2515  *rule_nr = 80;
2516  peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2517  for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2518  self = mdev->ldev->md.uuid[i] & ~((u64)1);
2519  if (self == peer)
2520  return 2;
2521  }
2522 
2523  *rule_nr = 90;
2524  self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2525  peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2526  if (self == peer && self != ((u64)0))
2527  return 100;
2528 
2529  *rule_nr = 100;
2530  for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2531  self = mdev->ldev->md.uuid[i] & ~((u64)1);
2532  for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2533  peer = mdev->p_uuid[j] & ~((u64)1);
2534  if (self == peer)
2535  return -100;
2536  }
2537  }
2538 
2539  return -1000;
2540 }
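A hedged sketch that maps drbd_uuid_compare()'s return value (see the table above the function) to the action taken during the handshake; a plain user-space helper for illustration only.

#include <stdio.h>
#include <stdlib.h>

static const char *handshake_action(int hg)
{
	if (hg == -1000)
		return "unrelated data, drop the connection";
	if (hg < -1000)
		return "rule needs a newer protocol on both sides";
	if (hg == 100)
		return "split brain, try automatic recovery";
	if (hg == -100)
		return "split brain, disconnect";
	if (hg == 0)
		return "no sync needed";
	if (abs(hg) == 2)
		return hg > 0 ? "become SyncSource, set bitmap (full sync)"
			      : "become SyncTarget, set bitmap (full sync)";
	return hg > 0 ? "become SyncSource, use bitmap"
		      : "become SyncTarget, use bitmap";
}

int main(void)
{
	printf("%s\n", handshake_action(-2));
	return 0;
}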
2541 
2542 /* drbd_sync_handshake() returns the new conn state on success, or
2543  CONN_MASK (-1) on failure.
2544  */
2545 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2546  enum drbd_disk_state peer_disk) __must_hold(local)
2547 {
2548  int hg, rule_nr;
2549  enum drbd_conns rv = C_MASK;
2550  enum drbd_disk_state mydisk;
2551 
2552  mydisk = mdev->state.disk;
2553  if (mydisk == D_NEGOTIATING)
2554  mydisk = mdev->new_state_tmp.disk;
2555 
2556  dev_info(DEV, "drbd_sync_handshake:\n");
2557  drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2558  drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2559  mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2560 
2561  hg = drbd_uuid_compare(mdev, &rule_nr);
2562 
2563  dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2564 
2565  if (hg == -1000) {
2566  dev_alert(DEV, "Unrelated data, aborting!\n");
2567  return C_MASK;
2568  }
2569  if (hg < -1000) {
2570  dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2571  return C_MASK;
2572  }
2573 
2574  if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2575  (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2576  int f = (hg == -100) || abs(hg) == 2;
2577  hg = mydisk > D_INCONSISTENT ? 1 : -1;
2578  if (f)
2579  hg = hg*2;
2580  dev_info(DEV, "Becoming sync %s due to disk states.\n",
2581  hg > 0 ? "source" : "target");
2582  }
2583 
2584  if (abs(hg) == 100)
2585  drbd_khelper(mdev, "initial-split-brain");
2586 
2587  if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2588  int pcount = (mdev->state.role == R_PRIMARY)
2589  + (peer_role == R_PRIMARY);
2590  int forced = (hg == -100);
2591 
2592  switch (pcount) {
2593  case 0:
2594  hg = drbd_asb_recover_0p(mdev);
2595  break;
2596  case 1:
2597  hg = drbd_asb_recover_1p(mdev);
2598  break;
2599  case 2:
2600  hg = drbd_asb_recover_2p(mdev);
2601  break;
2602  }
2603  if (abs(hg) < 100) {
2604  dev_warn(DEV, "Split-Brain detected, %d primaries, "
2605  "automatically solved. Sync from %s node\n",
2606  pcount, (hg < 0) ? "peer" : "this");
2607  if (forced) {
2608  dev_warn(DEV, "Doing a full sync, since"
2609  " UUIDs where ambiguous.\n");
2610  hg = hg*2;
2611  }
2612  }
2613  }
2614 
2615  if (hg == -100) {
2616  if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2617  hg = -1;
2618  if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2619  hg = 1;
2620 
2621  if (abs(hg) < 100)
2622  dev_warn(DEV, "Split-Brain detected, manually solved. "
2623  "Sync from %s node\n",
2624  (hg < 0) ? "peer" : "this");
2625  }
2626 
2627  if (hg == -100) {
2628  /* FIXME this log message is not correct if we end up here
2629  * after an attempted attach on a diskless node.
2630  * We just refuse to attach -- well, we drop the "connection"
2631  * to that disk, in a way... */
2632  dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2633  drbd_khelper(mdev, "split-brain");
2634  return C_MASK;
2635  }
2636 
2637  if (hg > 0 && mydisk <= D_INCONSISTENT) {
2638  dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2639  return C_MASK;
2640  }
2641 
2642  if (hg < 0 && /* by intention we do not use mydisk here. */
2643  mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2644  switch (mdev->net_conf->rr_conflict) {
2645  case ASB_CALL_HELPER:
2646  drbd_khelper(mdev, "pri-lost");
2647  /* fall through */
2648  case ASB_DISCONNECT:
2649  dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2650  return C_MASK;
2651  case ASB_VIOLENTLY:
2652  dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2653  "assumption\n");
2654  }
2655  }
2656 
2657  if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2658  if (hg == 0)
2659  dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2660  else
2661  dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2662  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2663  abs(hg) >= 2 ? "full" : "bit-map based");
2664  return C_MASK;
2665  }
2666 
2667  if (abs(hg) >= 2) {
2668  dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2669  if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2670  BM_LOCKED_SET_ALLOWED))
2671  return C_MASK;
2672  }
2673 
2674  if (hg > 0) { /* become sync source. */
2675  rv = C_WF_BITMAP_S;
2676  } else if (hg < 0) { /* become sync target */
2677  rv = C_WF_BITMAP_T;
2678  } else {
2679  rv = C_CONNECTED;
2680  if (drbd_bm_total_weight(mdev)) {
2681  dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2682  drbd_bm_total_weight(mdev));
2683  }
2684  }
2685 
2686  return rv;
2687 }
2688 
2689 /* returns 1 if invalid */
2690 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2691 {
2692  /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2693  if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2694  (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2695  return 0;
2696 
2697  /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2698  if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2699  self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2700  return 1;
2701 
2702  /* everything else is valid if they are equal on both sides. */
2703  if (peer == self)
2704  return 0;
2705 
2706  /* everything else is invalid. */
2707  return 1;
2708 }
2709 
2710 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2711 {
2712  struct p_protocol *p = &mdev->data.rbuf.protocol;
2713  int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2714  int p_want_lose, p_two_primaries, cf;
2715  char p_integrity_alg[SHARED_SECRET_MAX] = "";
2716 
2717  p_proto = be32_to_cpu(p->protocol);
2718  p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2719  p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2720  p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2721  p_two_primaries = be32_to_cpu(p->two_primaries);
2722  cf = be32_to_cpu(p->conn_flags);
2723  p_want_lose = cf & CF_WANT_LOSE;
2724 
2725  clear_bit(CONN_DRY_RUN, &mdev->flags);
2726 
2727  if (cf & CF_DRY_RUN)
2728  set_bit(CONN_DRY_RUN, &mdev->flags);
2729 
2730  if (p_proto != mdev->net_conf->wire_protocol) {
2731  dev_err(DEV, "incompatible communication protocols\n");
2732  goto disconnect;
2733  }
2734 
2735  if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2736  dev_err(DEV, "incompatible after-sb-0pri settings\n");
2737  goto disconnect;
2738  }
2739 
2740  if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2741  dev_err(DEV, "incompatible after-sb-1pri settings\n");
2742  goto disconnect;
2743  }
2744 
2745  if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2746  dev_err(DEV, "incompatible after-sb-2pri settings\n");
2747  goto disconnect;
2748  }
2749 
2750  if (p_want_lose && mdev->net_conf->want_lose) {
2751  dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2752  goto disconnect;
2753  }
2754 
2755  if (p_two_primaries != mdev->net_conf->two_primaries) {
2756  dev_err(DEV, "incompatible setting of the two-primaries options\n");
2757  goto disconnect;
2758  }
2759 
2760  if (mdev->agreed_pro_version >= 87) {
2761  unsigned char *my_alg = mdev->net_conf->integrity_alg;
2762 
2763  if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2764  return false;
2765 
2766  p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2767  if (strcmp(p_integrity_alg, my_alg)) {
2768  dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2769  goto disconnect;
2770  }
2771  dev_info(DEV, "data-integrity-alg: %s\n",
2772  my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2773  }
2774 
2775  return true;
2776 
2777 disconnect:
2778  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2779  return false;
2780 }
2781 
2782 /* helper function
2783  * input: alg name, feature name
2784  * return: NULL (alg name was "")
2785  * ERR_PTR(error) if something goes wrong
2786  * or the crypto hash ptr, if it worked out ok. */
2787 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2788  const char *alg, const char *name)
2789 {
2790  struct crypto_hash *tfm;
2791 
2792  if (!alg[0])
2793  return NULL;
2794 
2795  tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2796  if (IS_ERR(tfm)) {
2797  dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2798  alg, name, PTR_ERR(tfm));
2799  return tfm;
2800  }
2801  if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2802  crypto_free_hash(tfm);
2803  dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2804  return ERR_PTR(-EINVAL);
2805  }
2806  return tfm;
2807 }
2808 
2809 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2810 {
2811  int ok = true;
2812  struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2813  unsigned int header_size, data_size, exp_max_sz;
2814  struct crypto_hash *verify_tfm = NULL;
2815  struct crypto_hash *csums_tfm = NULL;
2816  const int apv = mdev->agreed_pro_version;
2817  int *rs_plan_s = NULL;
2818  int fifo_size = 0;
2819 
2820  exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2821  : apv == 88 ? sizeof(struct p_rs_param)
2822  + SHARED_SECRET_MAX
2823  : apv <= 94 ? sizeof(struct p_rs_param_89)
2824  : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2825 
2826  if (packet_size > exp_max_sz) {
2827  dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2828  packet_size, exp_max_sz);
2829  return false;
2830  }
2831 
2832  if (apv <= 88) {
2833  header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2834  data_size = packet_size - header_size;
2835  } else if (apv <= 94) {
2836  header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2837  data_size = packet_size - header_size;
2838  D_ASSERT(data_size == 0);
2839  } else {
2840  header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2841  data_size = packet_size - header_size;
2842  D_ASSERT(data_size == 0);
2843  }
2844 
2845  /* initialize verify_alg and csums_alg */
2846  memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2847 
2848  if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2849  return false;
2850 
2851  mdev->sync_conf.rate = be32_to_cpu(p->rate);
2852 
2853  if (apv >= 88) {
2854  if (apv == 88) {
2855  if (data_size > SHARED_SECRET_MAX || data_size == 0) {
2856  dev_err(DEV, "verify-alg of wrong size, "
2857  "peer wants %u, accepting only up to %u byte\n",
2858  data_size, SHARED_SECRET_MAX);
2859  return false;
2860  }
2861 
2862  if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2863  return false;
2864 
2865  /* we expect NUL terminated string */
2866  /* but just in case someone tries to be evil */
2867  D_ASSERT(p->verify_alg[data_size-1] == 0);
2868  p->verify_alg[data_size-1] = 0;
2869 
2870  } else /* apv >= 89 */ {
2871  /* we still expect NUL terminated strings */
2872  /* but just in case someone tries to be evil */
2873  D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2874  D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2875  p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2876  p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2877  }
2878 
2879  if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2880  if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2881  dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2882  mdev->sync_conf.verify_alg, p->verify_alg);
2883  goto disconnect;
2884  }
2885  verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2886  p->verify_alg, "verify-alg");
2887  if (IS_ERR(verify_tfm)) {
2888  verify_tfm = NULL;
2889  goto disconnect;
2890  }
2891  }
2892 
2893  if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2894  if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2895  dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2896  mdev->sync_conf.csums_alg, p->csums_alg);
2897  goto disconnect;
2898  }
2899  csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2900  p->csums_alg, "csums-alg");
2901  if (IS_ERR(csums_tfm)) {
2902  csums_tfm = NULL;
2903  goto disconnect;
2904  }
2905  }
2906 
2907  if (apv > 94) {
2908  mdev->sync_conf.rate = be32_to_cpu(p->rate);
2909  mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2910  mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2911  mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2912  mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2913 
2914  fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2915  if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2916  rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2917  if (!rs_plan_s) {
2918  dev_err(DEV, "kmalloc of fifo_buffer failed");
2919  goto disconnect;
2920  }
2921  }
2922  }
2923 
2924  spin_lock(&mdev->peer_seq_lock);
2925  /* lock against drbd_nl_syncer_conf() */
2926  if (verify_tfm) {
2927  strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2928  mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2929  crypto_free_hash(mdev->verify_tfm);
2930  mdev->verify_tfm = verify_tfm;
2931  dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2932  }
2933  if (csums_tfm) {
2934  strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2935  mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2936  crypto_free_hash(mdev->csums_tfm);
2937  mdev->csums_tfm = csums_tfm;
2938  dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2939  }
2940  if (fifo_size != mdev->rs_plan_s.size) {
2941  kfree(mdev->rs_plan_s.values);
2942  mdev->rs_plan_s.values = rs_plan_s;
2943  mdev->rs_plan_s.size = fifo_size;
2944  mdev->rs_planed = 0;
2945  }
2946  spin_unlock(&mdev->peer_seq_lock);
2947  }
2948 
2949  return ok;
2950 disconnect:
2951  /* just for completeness: actually not needed,
2952  * as this is not reached if csums_tfm was ok. */
2953  crypto_free_hash(csums_tfm);
2954  /* but free the verify_tfm again, if csums_tfm did not work out */
2955  crypto_free_hash(verify_tfm);
2956  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2957  return false;
2958 }
2959 
2960 /* warn if the arguments differ by more than 12.5% */
2961 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2962  const char *s, sector_t a, sector_t b)
2963 {
2964  sector_t d;
2965  if (a == 0 || b == 0)
2966  return;
2967  d = (a > b) ? (a - b) : (b - a);
2968  if (d > (a>>3) || d > (b>>3))
2969  dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2970  (unsigned long long)a, (unsigned long long)b);
2971 }
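A worked example of the 12.5% rule above: two sizes differ "considerably" when their difference exceeds one eighth (a >> 3) of either value. Plain user-space code with invented numbers.

#include <stdio.h>

static int differ_considerably(unsigned long long a, unsigned long long b)
{
	unsigned long long d;

	if (a == 0 || b == 0)
		return 0;
	d = (a > b) ? a - b : b - a;
	return d > (a >> 3) || d > (b >> 3);
}

int main(void)
{
	printf("%d\n", differ_considerably(1000, 1100)); /* 0: 100 <= 125 */
	printf("%d\n", differ_considerably(1000, 1200)); /* 1: 200 >  125 */
	return 0;
}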
2972 
2973 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2974 {
2975  struct p_sizes *p = &mdev->data.rbuf.sizes;
2976  enum determine_dev_size dd = unchanged;
2977  sector_t p_size, p_usize, my_usize;
2978  int ldsc = 0; /* local disk size changed */
2979  enum dds_flags ddsf;
2980 
2981  p_size = be64_to_cpu(p->d_size);
2982  p_usize = be64_to_cpu(p->u_size);
2983 
2984  if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2985  dev_err(DEV, "some backing storage is needed\n");
2986  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2987  return false;
2988  }
2989 
2990  /* just store the peer's disk size for now.
2991  * we still need to figure out whether we accept that. */
2992  mdev->p_size = p_size;
2993 
2994  if (get_ldev(mdev)) {
2995  warn_if_differ_considerably(mdev, "lower level device sizes",
2996  p_size, drbd_get_max_capacity(mdev->ldev));
2997  warn_if_differ_considerably(mdev, "user requested size",
2998  p_usize, mdev->ldev->dc.disk_size);
2999 
3000  /* if this is the first connect, or an otherwise expected
3001  * param exchange, choose the minimum */
3002  if (mdev->state.conn == C_WF_REPORT_PARAMS)
3003  p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3004  p_usize);
3005 
3006  my_usize = mdev->ldev->dc.disk_size;
3007 
3008  if (mdev->ldev->dc.disk_size != p_usize) {
3009  mdev->ldev->dc.disk_size = p_usize;
3010  dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3011  (unsigned long)mdev->ldev->dc.disk_size);
3012  }
3013 
3014  /* Never shrink a device with usable data during connect.
3015  But allow online shrinking if we are connected. */
3016  if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3017  drbd_get_capacity(mdev->this_bdev) &&
3018  mdev->state.disk >= D_OUTDATED &&
3019  mdev->state.conn < C_CONNECTED) {
3020  dev_err(DEV, "The peer's disk size is too small!\n");
3021  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3022  mdev->ldev->dc.disk_size = my_usize;
3023  put_ldev(mdev);
3024  return false;
3025  }
3026  put_ldev(mdev);
3027  }
3028 
3029  ddsf = be16_to_cpu(p->dds_flags);
3030  if (get_ldev(mdev)) {
3031  dd = drbd_determine_dev_size(mdev, ddsf);
3032  put_ldev(mdev);
3033  if (dd == dev_size_error)
3034  return false;
3035  drbd_md_sync(mdev);
3036  } else {
3037  /* I am diskless, need to accept the peer's size. */
3038  drbd_set_my_capacity(mdev, p_size);
3039  }
3040 
3043 
3044  if (get_ldev(mdev)) {
3045  if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3046  mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3047  ldsc = 1;
3048  }
3049 
3050  put_ldev(mdev);
3051  }
3052 
3053  if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3054  if (be64_to_cpu(p->c_size) !=
3055  drbd_get_capacity(mdev->this_bdev) || ldsc) {
3056  /* we have different sizes, probably peer
3057  * needs to know my new size... */
3058  drbd_send_sizes(mdev, 0, ddsf);
3059  }
3060  if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3061  (dd == grew && mdev->state.conn == C_CONNECTED)) {
3062  if (mdev->state.pdsk >= D_INCONSISTENT &&
3063  mdev->state.disk >= D_INCONSISTENT) {
3064  if (ddsf & DDSF_NO_RESYNC)
3065  dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3066  else
3067  resync_after_online_grow(mdev);
3068  } else
3069  set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3070  }
3071  }
3072 
3073  return true;
3074 }
3075 
3076 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3077 {
3078  struct p_uuids *p = &mdev->data.rbuf.uuids;
3079  u64 *p_uuid;
3080  int i, updated_uuids = 0;
3081 
3082  p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3083 
3084  for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3085  p_uuid[i] = be64_to_cpu(p->uuid[i]);
3086 
3087  kfree(mdev->p_uuid);
3088  mdev->p_uuid = p_uuid;
3089 
3090  if (mdev->state.conn < C_CONNECTED &&
3091  mdev->state.disk < D_INCONSISTENT &&
3092  mdev->state.role == R_PRIMARY &&
3093  (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3094  dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3095  (unsigned long long)mdev->ed_uuid);
3096  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3097  return false;
3098  }
3099 
3100  if (get_ldev(mdev)) {
3101  int skip_initial_sync =
3102  mdev->state.conn == C_CONNECTED &&
3103  mdev->agreed_pro_version >= 90 &&
3104  mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3105  (p_uuid[UI_FLAGS] & 8);
3106  if (skip_initial_sync) {
3107  dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3109  "clear_n_write from receive_uuids",
3111  _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3112  _drbd_uuid_set(mdev, UI_BITMAP, 0);
3113  _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3114  CS_VERBOSE, NULL);
3115  drbd_md_sync(mdev);
3116  updated_uuids = 1;
3117  }
3118  put_ldev(mdev);
3119  } else if (mdev->state.disk < D_INCONSISTENT &&
3120  mdev->state.role == R_PRIMARY) {
3121  /* I am a diskless primary, the peer just created a new current UUID
3122  for me. */
3123  updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3124  }
3125 
3126  /* Before we test for the disk state, we should wait until a possibly
3127  ongoing cluster wide state change has finished. That is important if
3128  we are primary and are detaching from our disk. We need to see the
3129  new disk state... */
3131  if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3132  updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3133 
3134  if (updated_uuids)
3135  drbd_print_uuids(mdev, "receiver updated UUIDs to");
3136 
3137  return true;
3138 }
3139 
3144 static union drbd_state convert_state(union drbd_state ps)
3145 {
3146  union drbd_state ms;
3147 
3148  static enum drbd_conns c_tab[] = {
3149  [C_CONNECTED] = C_CONNECTED,
3150 
3151  [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3152  [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3153  [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3154  [C_VERIFY_S] = C_VERIFY_T,
3155  [C_MASK] = C_MASK,
3156  };
3157 
3158  ms.i = ps.i;
3159 
3160  ms.conn = c_tab[ps.conn];
3161  ms.peer = ps.role;
3162  ms.role = ps.peer;
3163  ms.pdsk = ps.disk;
3164  ms.disk = ps.pdsk;
3165  ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3166 
3167  return ms;
3168 }
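A simplified sketch of the mirroring convert_state() performs: what the peer calls its role is our view of the peer, and what it calls its peer's disk is our own disk, and vice versa. The struct and the numeric values below are stand-ins, not the kernel's union drbd_state.

#include <stdio.h>

struct view {
	int role, peer, disk, pdsk;
};

static struct view mirror(struct view ps)
{
	struct view ms = ps;

	ms.role = ps.peer;	/* peer's "peer" is me      */
	ms.peer = ps.role;	/* peer's "role" is my peer */
	ms.disk = ps.pdsk;	/* peer's "pdsk" is my disk */
	ms.pdsk = ps.disk;	/* peer's "disk" is my pdsk */
	return ms;
}

int main(void)
{
	struct view peer_says = { .role = 1, .peer = 2, .disk = 8, .pdsk = 4 };
	struct view mine = mirror(peer_says);

	printf("my role=%d disk=%d peer=%d pdsk=%d\n",
	       mine.role, mine.disk, mine.peer, mine.pdsk); /* 2 4 1 8 */
	return 0;
}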
3169 
3170 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3171 {
3172  struct p_req_state *p = &mdev->data.rbuf.req_state;
3173  union drbd_state mask, val;
3174  enum drbd_state_rv rv;
3175 
3176  mask.i = be32_to_cpu(p->mask);
3177  val.i = be32_to_cpu(p->val);
3178 
3179  if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3180  test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3181  drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3182  return true;
3183  }
3184 
3185  mask = convert_state(mask);
3186  val = convert_state(val);
3187 
3188  rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3189 
3190  drbd_send_sr_reply(mdev, rv);
3191  drbd_md_sync(mdev);
3192 
3193  return true;
3194 }
3195 
3196 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3197 {
3198  struct p_state *p = &mdev->data.rbuf.state;
3199  union drbd_state os, ns, peer_state;
3200  enum drbd_disk_state real_peer_disk;
3201  enum chg_state_flags cs_flags;
3202  int rv;
3203 
3204  peer_state.i = be32_to_cpu(p->state);
3205 
3206  real_peer_disk = peer_state.disk;
3207  if (peer_state.disk == D_NEGOTIATING) {
3208  real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3209  dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3210  }
3211 
3212  spin_lock_irq(&mdev->req_lock);
3213  retry:
3214  os = ns = mdev->state;
3215  spin_unlock_irq(&mdev->req_lock);
3216 
3217  /* If some other part of the code (asender thread, timeout)
3218  * already decided to close the connection again,
3219  * we must not "re-establish" it here. */
3220  if (os.conn <= C_TEAR_DOWN)
3221  return false;
3222 
3223  /* If this is the "end of sync" confirmation, usually the peer disk
3224  * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3225  * set) resync started in PausedSyncT, or if the timing of pause-/
3226  * unpause-sync events has been "just right", the peer disk may
3227  * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3228  */
3229  if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3230  real_peer_disk == D_UP_TO_DATE &&
3231  os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3232  /* If we are (becoming) SyncSource, but peer is still in sync
3233  * preparation, ignore its uptodate-ness to avoid flapping, it
3234  * will change to inconsistent once the peer reaches active
3235  * syncing states.
3236  * It may have changed syncer-paused flags, however, so we
3237  * cannot ignore this completely. */
3238  if (peer_state.conn > C_CONNECTED &&
3239  peer_state.conn < C_SYNC_SOURCE)
3240  real_peer_disk = D_INCONSISTENT;
3241 
3242  /* if peer_state changes to connected at the same time,
3243  * it explicitly notifies us that it finished resync.
3244  * Maybe we should finish it up, too? */
3245  else if (os.conn >= C_SYNC_SOURCE &&
3246  peer_state.conn == C_CONNECTED) {
3247  if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3248  drbd_resync_finished(mdev);
3249  return true;
3250  }
3251  }
3252 
3253  /* peer says his disk is inconsistent, while we think it is uptodate,
3254  * and this happens while the peer still thinks we have a sync going on,
3255  * but we think we are already done with the sync.
3256  * We ignore this to avoid flapping pdsk.
3257  * This should not happen, if the peer is a recent version of drbd. */
3258  if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3259  os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3260  real_peer_disk = D_UP_TO_DATE;
3261 
3262  if (ns.conn == C_WF_REPORT_PARAMS)
3263  ns.conn = C_CONNECTED;
3264 
3265  if (peer_state.conn == C_AHEAD)
3266  ns.conn = C_BEHIND;
3267 
3268  if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3269  get_ldev_if_state(mdev, D_NEGOTIATING)) {
3270  int cr; /* consider resync */
3271 
3272  /* if we established a new connection */
3273  cr = (os.conn < C_CONNECTED);
3274  /* if we had an established connection
3275  * and one of the nodes newly attaches a disk */
3276  cr |= (os.conn == C_CONNECTED &&
3277  (peer_state.disk == D_NEGOTIATING ||
3278  os.disk == D_NEGOTIATING));
3279  /* if we have both been inconsistent, and the peer has been
3280  * forced to be UpToDate with --overwrite-data */
3281  cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3282  /* if we had been plain connected, and the admin requested to
3283  * start a sync by "invalidate" or "invalidate-remote" */
3284  cr |= (os.conn == C_CONNECTED &&
3285  (peer_state.conn >= C_STARTING_SYNC_S &&
3286  peer_state.conn <= C_WF_BITMAP_T));
3287 
3288  if (cr)
3289  ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3290 
3291  put_ldev(mdev);
3292  if (ns.conn == C_MASK) {
3293  ns.conn = C_CONNECTED;
3294  if (mdev->state.disk == D_NEGOTIATING) {
3295  drbd_force_state(mdev, NS(disk, D_FAILED));
3296  } else if (peer_state.disk == D_NEGOTIATING) {
3297  dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3298  peer_state.disk = D_DISKLESS;
3299  real_peer_disk = D_DISKLESS;
3300  } else {
3301  if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3302  return false;
3303  D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3304  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3305  return false;
3306  }
3307  }
3308  }
3309 
3310  spin_lock_irq(&mdev->req_lock);
3311  if (mdev->state.i != os.i)
3312  goto retry;
3313  clear_bit(CONSIDER_RESYNC, &mdev->flags);
3314  ns.peer = peer_state.role;
3315  ns.pdsk = real_peer_disk;
3316  ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3317  if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3318  ns.disk = mdev->new_state_tmp.disk;
3319  cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3320  if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3321  test_bit(NEW_CUR_UUID, &mdev->flags)) {
3322  /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3323  for temporary network outages! */
3324  spin_unlock_irq(&mdev->req_lock);
3325  dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3326  tl_clear(mdev);
3327  drbd_uuid_new_current(mdev);
3328  clear_bit(NEW_CUR_UUID, &mdev->flags);
3329  drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3330  return false;
3331  }
3332  rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3333  ns = mdev->state;
3334  spin_unlock_irq(&mdev->req_lock);
3335 
3336  if (rv < SS_SUCCESS) {
3337  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3338  return false;
3339  }
3340 
3341  if (os.conn > C_WF_REPORT_PARAMS) {
3342  if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3343  peer_state.disk != D_NEGOTIATING ) {
3344  /* we want resync, peer has not yet decided to sync... */
3345  /* Nowadays only used when forcing a node into primary role and
3346  setting its disk to UpToDate with that */
3347  drbd_send_uuids(mdev);
3349  }
3350  }
3351 
3352  mdev->net_conf->want_lose = 0;
3353 
3354  drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3355 
3356  return true;
3357 }
3358 
3359 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3360 {
3361  struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3362 
3363  wait_event(mdev->misc_wait,
3364  mdev->state.conn == C_WF_SYNC_UUID ||
3365  mdev->state.conn == C_BEHIND ||
3366  mdev->state.conn < C_CONNECTED ||
3367  mdev->state.disk < D_NEGOTIATING);
3368 
3369  /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3370 
3371  /* Here the _drbd_uuid_ functions are right, current should
3372  _not_ be rotated into the history */
3373  if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3374  _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3375  _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3376 
3377  drbd_print_uuids(mdev, "updated sync uuid");
3378  drbd_start_resync(mdev, C_SYNC_TARGET);
3379 
3380  put_ldev(mdev);
3381  } else
3382  dev_err(DEV, "Ignoring SyncUUID packet!\n");
3383 
3384  return true;
3385 }
3386 
3393 static int
3394 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3395  unsigned long *buffer, struct bm_xfer_ctx *c)
3396 {
3397  unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3398  unsigned want = num_words * sizeof(long);
3399  int err;
3400 
3401  if (want != data_size) {
3402  dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3403  return -EIO;
3404  }
3405  if (want == 0)
3406  return 0;
3407  err = drbd_recv(mdev, buffer, want);
3408  if (err != want) {
3409  if (err >= 0)
3410  err = -EIO;
3411  return err;
3412  }
3413 
3414  drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3415 
3416  c->word_offset += num_words;
3417  c->bit_offset = c->word_offset * BITS_PER_LONG;
3418  if (c->bit_offset > c->bm_bits)
3419  c->bit_offset = c->bm_bits;
3420 
3421  return 1;
3422 }
3423 
3430 static int
3431 recv_bm_rle_bits(struct drbd_conf *mdev,
3432  struct p_compressed_bm *p,
3433  struct bm_xfer_ctx *c)
3434 {
3435  struct bitstream bs;
3436  u64 look_ahead;
3437  u64 rl;
3438  u64 tmp;
3439  unsigned long s = c->bit_offset;
3440  unsigned long e;
3441  int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3442  int toggle = DCBP_get_start(p);
3443  int have;
3444  int bits;
3445 
3446  bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3447 
3448  bits = bitstream_get_bits(&bs, &look_ahead, 64);
3449  if (bits < 0)
3450  return -EIO;
3451 
3452  for (have = bits; have > 0; s += rl, toggle = !toggle) {
3453  bits = vli_decode_bits(&rl, look_ahead);
3454  if (bits <= 0)
3455  return -EIO;
3456 
3457  if (toggle) {
3458  e = s + rl -1;
3459  if (e >= c->bm_bits) {
3460  dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3461  return -EIO;
3462  }
3463  _drbd_bm_set_bits(mdev, s, e);
3464  }
3465 
3466  if (have < bits) {
3467  dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3468  have, bits, look_ahead,
3469  (unsigned int)(bs.cur.b - p->code),
3470  (unsigned int)bs.buf_len);
3471  return -EIO;
3472  }
3473  look_ahead >>= bits;
3474  have -= bits;
3475 
3476  bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3477  if (bits < 0)
3478  return -EIO;
3479  look_ahead |= tmp << have;
3480  have += bits;
3481  }
3482 
3483  c->bit_offset = s;
3484  bm_xfer_ctx_bit_to_word_offset(c);
3485 
3486  return (s != c->bm_bits);
3487 }
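A simplified user-space sketch of the toggling run-length decoding done in recv_bm_rle_bits(): runs alternate between "clear" and "set", starting with the state given by the packet's start flag, and only the "set" runs touch the bitmap. Run lengths are plain integers here rather than VLI-encoded bit strings.

#include <stdio.h>

static void decode_toggle_rle(const unsigned *runs, int nruns, int start_set,
			      unsigned char *bitmap /* one byte per bit, for clarity */)
{
	unsigned long s = 0;
	int toggle = start_set;
	int i;

	for (i = 0; i < nruns; i++, toggle = !toggle) {
		if (toggle) {
			unsigned long e;

			for (e = s; e < s + runs[i]; e++)
				bitmap[e] = 1;
		}
		s += runs[i];
	}
}

int main(void)
{
	unsigned runs[] = { 3, 2, 4 };	/* 3 clear, 2 set, 4 clear */
	unsigned char bm[9] = { 0 };
	int i;

	decode_toggle_rle(runs, 3, 0, bm);
	for (i = 0; i < 9; i++)
		printf("%d", bm[i]);	/* prints 000110000 */
	printf("\n");
	return 0;
}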
3488 
3495 static int
3496 decode_bitmap_c(struct drbd_conf *mdev,
3497  struct p_compressed_bm *p,
3498  struct bm_xfer_ctx *c)
3499 {
3500  if (DCBP_get_code(p) == RLE_VLI_Bits)
3501  return recv_bm_rle_bits(mdev, p, c);
3502 
3503  /* other variants had been implemented for evaluation,
3504  * but have been dropped as this one turned out to be "best"
3505  * during all our tests. */
3506 
3507  dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3508  drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3509  return -EIO;
3510 }
3511 
3512 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3513  const char *direction, struct bm_xfer_ctx *c)
3514 {
3515  /* what would it take to transfer it "plaintext" */
3516  unsigned plain = sizeof(struct p_header80) *
3517  ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3518  + c->bm_words * sizeof(long);
3519  unsigned total = c->bytes[0] + c->bytes[1];
3520  unsigned r;
3521 
3522  /* total can not be zero. but just in case: */
3523  if (total == 0)
3524  return;
3525 
3526  /* don't report if not compressed */
3527  if (total >= plain)
3528  return;
3529 
3530  /* total < plain. check for overflow, still */
3531  r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3532  : (1000 * total / plain);
3533 
3534  if (r > 1000)
3535  r = 1000;
3536 
3537  r = 1000 - r;
3538  dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3539  "total %u; compression: %u.%u%%\n",
3540  direction,
3541  c->bytes[1], c->packets[1],
3542  c->bytes[0], c->packets[0],
3543  total, r/10, r % 10);
3544 }
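A worked example of the compression statistic printed above: r ends up as the per-mille of bytes saved, reported with one decimal place. The byte counts are invented.

#include <stdio.h>

int main(void)
{
	unsigned plain = 100000; /* bytes a plain bitmap transfer would need  */
	unsigned total = 6500;   /* bytes actually sent (RLE + plain packets) */
	unsigned r = 1000 * total / plain; /* per-mille actually transferred  */

	if (r > 1000)
		r = 1000;
	r = 1000 - r;            /* per-mille saved */
	printf("compression: %u.%u%%\n", r / 10, r % 10); /* 93.5% */
	return 0;
}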
3545 
3546 /* Since we are processing the bitfield from lower addresses to higher,
3547  it does not matter whether we process it in 32 bit or 64 bit
3548  chunks, as long as it is little endian. (Understand it as a byte stream,
3549  beginning with the lowest byte...) If we used big endian,
3550  we would need to process it from the highest address to the lowest,
3551  in order to be agnostic to the 32 vs 64 bits issue.
3552 
3553  returns 0 on failure, 1 if we successfully received it. */
3554 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3555 {
3556  struct bm_xfer_ctx c;
3557  void *buffer;
3558  int err;
3559  int ok = false;
3560  struct p_header80 *h = &mdev->data.rbuf.header.h80;
3561 
3562  drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3563  /* you are supposed to send additional out-of-sync information
3564  * if you actually set bits during this phase */
3565 
3566  /* maybe we should use some per thread scratch page,
3567  * and allocate that during initial device creation? */
3568  buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3569  if (!buffer) {
3570  dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3571  goto out;
3572  }
3573 
3574  c = (struct bm_xfer_ctx) {
3575  .bm_bits = drbd_bm_bits(mdev),
3576  .bm_words = drbd_bm_words(mdev),
3577  };
3578 
3579  for(;;) {
3580  if (cmd == P_BITMAP) {
3581  err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3582  } else if (cmd == P_COMPRESSED_BITMAP) {
3583  /* MAYBE: sanity check that we speak proto >= 90,
3584  * and the feature is enabled! */
3585  struct p_compressed_bm *p;
3586 
3587  if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3588  dev_err(DEV, "ReportCBitmap packet too large\n");
3589  goto out;
3590  }
3591  /* use the page buffer */
3592  p = buffer;
3593  memcpy(p, h, sizeof(*h));
3594  if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3595  goto out;
3596  if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3597  dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3598  goto out;
3599  }
3600  err = decode_bitmap_c(mdev, p, &c);
3601  } else {
3602  dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3603  goto out;
3604  }
3605 
3606  c.packets[cmd == P_BITMAP]++;
3607  c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3608 
3609  if (err <= 0) {
3610  if (err < 0)
3611  goto out;
3612  break;
3613  }
3614  if (!drbd_recv_header(mdev, &cmd, &data_size))
3615  goto out;
3616  }
3617 
3618  INFO_bm_xfer_stats(mdev, "receive", &c);
3619 
3620  if (mdev->state.conn == C_WF_BITMAP_T) {
3621  enum drbd_state_rv rv;
3622 
3623  ok = !drbd_send_bitmap(mdev);
3624  if (!ok)
3625  goto out;
3626  /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3627  rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3628  D_ASSERT(rv == SS_SUCCESS);
3629  } else if (mdev->state.conn != C_WF_BITMAP_S) {
3630  /* admin may have requested C_DISCONNECTING,
3631  * other threads may have noticed network errors */
3632  dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3633  drbd_conn_str(mdev->state.conn));
3634  }
3635 
3636  ok = true;
3637  out:
3638  drbd_bm_unlock(mdev);
3639  if (ok && mdev->state.conn == C_WF_BITMAP_S)
3640  drbd_start_resync(mdev, C_SYNC_SOURCE);
3641  free_page((unsigned long) buffer);
3642  return ok;
3643 }
3644 
3645 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3646 {
3647  /* TODO zero copy sink :) */
3648  static char sink[128];
3649  int size, want, r;
3650 
3651  dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3652  cmd, data_size);
3653 
3654  size = data_size;
3655  while (size > 0) {
3656  want = min_t(int, size, sizeof(sink));
3657  r = drbd_recv(mdev, sink, want);
3658  ERR_IF(r <= 0) break;
3659  size -= r;
3660  }
3661  return size == 0;
3662 }
3663 
3664 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3665 {
3666  /* Make sure we've acked all the TCP data associated
3667  * with the data requests being unplugged */
3668  drbd_tcp_quickack(mdev->data.socket);
3669 
3670  return true;
3671 }
3672 
3673 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3674 {
3675  struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3676 
3677  switch (mdev->state.conn) {
3678  case C_WF_SYNC_UUID:
3679  case C_WF_BITMAP_T:
3680  case C_BEHIND:
3681  break;
3682  default:
3683  dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3684  drbd_conn_str(mdev->state.conn));
3685  }
3686 
3687  drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3688 
3689  return true;
3690 }
3691 
3692 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3693 
3694 struct data_cmd {
3695  int expect_payload;
3696  size_t pkt_size;
3697  drbd_cmd_handler_f function;
3698 };
3699 
3700 static struct data_cmd drbd_cmd_handler[] = {
3701  [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3702  [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3703  [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3704  [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3705  [P_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3706  [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3707  [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3708  [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3709  [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3710  [P_SYNC_PARAM] = { 1, sizeof(struct p_header80), receive_SyncParam },
3711  [P_SYNC_PARAM89] = { 1, sizeof(struct p_header80), receive_SyncParam },
3712  [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3713  [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3714  [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3715  [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3716  [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3717  [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3718  [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3719  [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3720  [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3721  [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
3722  [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3723  /* anything missing from this table is in
3724  * the asender_tbl, see get_asender_cmd */
3725  [P_MAX_CMD] = { 0, 0, NULL },
3726 };
3727 
3728 /* All handler functions that expect a sub-header get that sub-header in
3729  mdev->data.rbuf.header.head.payload.
3730 
3731  Usually the callback can also find the usual p_header in
3732  mdev->data.rbuf.header.head, but it may not rely on that,
3733  since there is also p_header95. */
3734 
3735 static void drbdd(struct drbd_conf *mdev)
3736 {
3737  union p_header *header = &mdev->data.rbuf.header;
3738  unsigned int packet_size;
3739  enum drbd_packets cmd;
3740  size_t shs; /* sub header size */
3741  int rv;
3742 
3743  while (get_t_state(&mdev->receiver) == Running) {
3744  drbd_thread_current_set_cpu(mdev);
3745  if (!drbd_recv_header(mdev, &cmd, &packet_size))
3746  goto err_out;
3747 
3748  if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3749  dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3750  goto err_out;
3751  }
3752 
3753  shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3754  if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3755  dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3756  goto err_out;
3757  }
3758 
3759  if (shs) {
3760  rv = drbd_recv(mdev, &header->h80.payload, shs);
3761  if (unlikely(rv != shs)) {
3762  if (!signal_pending(current))
3763  dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3764  goto err_out;
3765  }
3766  }
3767 
3768  rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3769 
3770  if (unlikely(!rv)) {
3771  dev_err(DEV, "error receiving %s, l: %d!\n",
3772  cmdname(cmd), packet_size);
3773  goto err_out;
3774  }
3775  }
3776 
3777  if (0) {
3778  err_out:
3779  drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3780  }
3781  /* If we leave here, we probably want to update at least the
3782  * "Connected" indicator on stable storage. Do so explicitly here. */
3783  drbd_md_sync(mdev);
3784 }
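/* [Editor's sketch -- not part of drbd_receiver.c] drbd_cmd_handler[] plus
 * drbdd() form a classic table-driven dispatcher: the packet type indexes a
 * designated-initializer array recording the fixed (sub-)header size and
 * whether a variable payload may follow. A minimal standalone restatement of
 * the pattern; every name below is invented for the example. */

#include <stddef.h>
#include <stdio.h>

enum pkt { PKT_DATA, PKT_PING, PKT_MAX };

struct cmd {
	int expect_payload;
	size_t pkt_size;                      /* full fixed header size */
	int (*fn)(enum pkt cmd, unsigned int payload_len);
};

static int handle_pkt(enum pkt c, unsigned int len)
{
	printf("packet %d with %u payload bytes\n", (int)c, len);
	return 1;
}

static const struct cmd tbl[] = {
	[PKT_DATA] = { 1, 24, handle_pkt },
	[PKT_PING] = { 0,  8, handle_pkt },
	[PKT_MAX]  = { 0,  0, NULL },
};

#define HDR_SIZE 8                            /* stand-in for sizeof(union p_header) */

/* mirrors the checks in drbdd(): unknown type, or payload where none is allowed */
static int dispatch(enum pkt cmd, unsigned int packet_size)
{
	size_t shs;

	if (cmd >= PKT_MAX || !tbl[cmd].fn)
		return 0;
	shs = tbl[cmd].pkt_size - HDR_SIZE;   /* sub-header size */
	if (packet_size - shs > 0 && !tbl[cmd].expect_payload)
		return 0;
	return tbl[cmd].fn(cmd, packet_size - shs);
}

int main(void)
{
	return dispatch(PKT_DATA, 4096 + 16) && dispatch(PKT_PING, 0) ? 0 : 1;
}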
3785 
3786 void drbd_flush_workqueue(struct drbd_conf *mdev)
3787 {
3788  struct drbd_wq_barrier barr;
3789 
3790  barr.w.cb = w_prev_work_done;
3791  init_completion(&barr.done);
3792  drbd_queue_work(&mdev->data.work, &barr.w);
3793  wait_for_completion(&barr.done);
3794 }
3795 
3796 void drbd_free_tl_hash(struct drbd_conf *mdev)
3797 {
3798  struct hlist_head *h;
3799 
3800  spin_lock_irq(&mdev->req_lock);
3801 
3802  if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3803  spin_unlock_irq(&mdev->req_lock);
3804  return;
3805  }
3806  /* paranoia code */
3807  for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3808  if (h->first)
3809  dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3810  (int)(h - mdev->ee_hash), h->first);
3811  kfree(mdev->ee_hash);
3812  mdev->ee_hash = NULL;
3813  mdev->ee_hash_s = 0;
3814 
3815  /* We may not have had the chance to wait for all locally pending
3816  * application requests. The hlist_add_fake() prevents access after
3817  * free on master bio completion. */
3818  for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) {
3819  struct drbd_request *req;
3820  struct hlist_node *pos, *n;
3821  hlist_for_each_entry_safe(req, pos, n, h, collision) {
3822  hlist_del_init(&req->collision);
3823  hlist_add_fake(&req->collision);
3824  }
3825  }
3826 
3827  kfree(mdev->tl_hash);
3828  mdev->tl_hash = NULL;
3829  mdev->tl_hash_s = 0;
3830  spin_unlock_irq(&mdev->req_lock);
3831 }
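/* [Editor's sketch -- not part of drbd_receiver.c] The hlist_add_fake() call
 * above rewires each request node so it merely *looks* hashed while pointing
 * only at itself; a later hlist_del()/hlist_del_init() on master-bio
 * completion then never touches the already-freed tl_hash table. Simplified
 * userspace versions of the <linux/list.h> helpers involved: */

struct hnode { struct hnode *next, **pprev; };

static void hlist_add_fake_sketch(struct hnode *n)
{
	n->pprev = &n->next;            /* pretend n is hashed, anchored on itself */
}

static void hlist_del_init_sketch(struct hnode *n)
{
	if (!n->pprev)                  /* already unhashed: nothing to do */
		return;
	*n->pprev = n->next;            /* after a fake add this only writes n->next */
	if (n->next)
		n->next->pprev = n->pprev;
	n->next = NULL;
	n->pprev = NULL;
}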
3832 
3833 static void drbd_disconnect(struct drbd_conf *mdev)
3834 {
3835  enum drbd_fencing_p fp;
3836  union drbd_state os, ns;
3837  int rv = SS_UNKNOWN_ERROR;
3838  unsigned int i;
3839 
3840  if (mdev->state.conn == C_STANDALONE)
3841  return;
3842 
3843  /* We are about to start the cleanup after connection loss.
3844  * Make sure drbd_make_request knows about that.
3845  * Usually we should be in some network failure state already,
3846  * but just in case we are not, we fix it up here.
3847  */
3848  drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
3849 
3850  /* asender does not clean up anything. it must not interfere, either */
3851  drbd_thread_stop(&mdev->asender);
3852  drbd_free_sock(mdev);
3853 
3854  /* wait for current activity to cease. */
3855  spin_lock_irq(&mdev->req_lock);
3856  _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3857  _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3858  _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3859  spin_unlock_irq(&mdev->req_lock);
3860 
3861  /* We do not have data structures that would allow us to
3862  * get the rs_pending_cnt down to 0 again.
3863  * * On C_SYNC_TARGET we do not have any data structures describing
3864  * the pending RSDataRequest's we have sent.
3865  * * On C_SYNC_SOURCE there is no data structure that tracks
3866  * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3867  * And no, it is not the sum of the reference counts in the
3868  * resync_LRU. The resync_LRU tracks the whole operation including
3869  * the disk-IO, while the rs_pending_cnt only tracks the blocks
3870  * on the fly. */
3871  drbd_rs_cancel_all(mdev);
3872  mdev->rs_total = 0;
3873  mdev->rs_failed = 0;
3874  atomic_set(&mdev->rs_pending_cnt, 0);
3875  wake_up(&mdev->misc_wait);
3876 
3877  /* make sure syncer is stopped and w_resume_next_sg queued */
3878  del_timer_sync(&mdev->resync_timer);
3879  resync_timer_fn((unsigned long)mdev);
3880 
3881  /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3882  * w_make_resync_request etc. which may still be on the worker queue
3883  * to be "canceled" */
3884  drbd_flush_workqueue(mdev);
3885 
3886  /* This also does reclaim_net_ee(). If we do this too early, we might
3887  * miss some resync ee and pages.*/
3888  drbd_process_done_ee(mdev);
3889 
3890  kfree(mdev->p_uuid);
3891  mdev->p_uuid = NULL;
3892 
3893  if (!is_susp(mdev->state))
3894  tl_clear(mdev);
3895 
3896  dev_info(DEV, "Connection closed\n");
3897 
3898  drbd_md_sync(mdev);
3899 
3900  fp = FP_DONT_CARE;
3901  if (get_ldev(mdev)) {
3902  fp = mdev->ldev->dc.fencing;
3903  put_ldev(mdev);
3904  }
3905 
3906  if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3907  drbd_try_outdate_peer_async(mdev);
3908 
3909  spin_lock_irq(&mdev->req_lock);
3910  os = mdev->state;
3911  if (os.conn >= C_UNCONNECTED) {
3912  /* Do not restart in case we are C_DISCONNECTING */
3913  ns = os;
3914  ns.conn = C_UNCONNECTED;
3915  rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3916  }
3917  spin_unlock_irq(&mdev->req_lock);
3918 
3919  if (os.conn == C_DISCONNECTING) {
3920  wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3921 
3922  crypto_free_hash(mdev->cram_hmac_tfm);
3923  mdev->cram_hmac_tfm = NULL;
3924 
3925  kfree(mdev->net_conf);
3926  mdev->net_conf = NULL;
3927  drbd_request_state(mdev, NS(conn, C_STANDALONE));
3928  }
3929 
3930  /* serialize with bitmap writeout triggered by the state change,
3931  * if any. */
3932  wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3933 
3934  /* tcp_close and release of sendpage pages can be deferred. I don't
3935  * want to use SO_LINGER, because apparently it can be deferred for
3936  * more than 20 seconds (longest time I checked).
3937  *
3938  * Actually we don't care for exactly when the network stack does its
3939  * put_page(), but release our reference on these pages right here.
3940  */
3941  i = drbd_release_ee(mdev, &mdev->net_ee);
3942  if (i)
3943  dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3944  i = atomic_read(&mdev->pp_in_use_by_net);
3945  if (i)
3946  dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3947  i = atomic_read(&mdev->pp_in_use);
3948  if (i)
3949  dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3950 
3951  D_ASSERT(list_empty(&mdev->read_ee));
3952  D_ASSERT(list_empty(&mdev->active_ee));
3953  D_ASSERT(list_empty(&mdev->sync_ee));
3954  D_ASSERT(list_empty(&mdev->done_ee));
3955 
3956  /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3957  atomic_set(&mdev->current_epoch->epoch_size, 0);
3958  D_ASSERT(list_empty(&mdev->current_epoch->list));
3959 }
3960 
3961 /*
3962  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3963  * we can agree on is stored in agreed_pro_version.
3964  *
3965  * feature flags and the reserved array should be enough room for future
3966  * enhancements of the handshake protocol, and possible plugins...
3967  *
3968  * for now, they are expected to be zero, but ignored.
3969  */
3970 static int drbd_send_handshake(struct drbd_conf *mdev)
3971 {
3972  /* ASSERT current == mdev->receiver ... */
3973  struct p_handshake *p = &mdev->data.sbuf.handshake;
3974  int ok;
3975 
3976  if (mutex_lock_interruptible(&mdev->data.mutex)) {
3977  dev_err(DEV, "interrupted during initial handshake\n");
3978  return 0; /* interrupted. not ok. */
3979  }
3980 
3981  if (mdev->data.socket == NULL) {
3982  mutex_unlock(&mdev->data.mutex);
3983  return 0;
3984  }
3985 
3986  memset(p, 0, sizeof(*p));
3987  p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3988  p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3989  ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3990  (struct p_header80 *)p, sizeof(*p), 0 );
3991  mutex_unlock(&mdev->data.mutex);
3992  return ok;
3993 }
3994 
3995 /*
3996  * return values:
3997  * 1 yes, we have a valid connection
3998  * 0 oops, did not work out, please try again
3999  * -1 peer talks different language,
4000  * no point in trying again, please go standalone.
4001  */
4002 static int drbd_do_handshake(struct drbd_conf *mdev)
4003 {
4004  /* ASSERT current == mdev->receiver ... */
4005  struct p_handshake *p = &mdev->data.rbuf.handshake;
4006  const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4007  unsigned int length;
4008  enum drbd_packets cmd;
4009  int rv;
4010 
4011  rv = drbd_send_handshake(mdev);
4012  if (!rv)
4013  return 0;
4014 
4015  rv = drbd_recv_header(mdev, &cmd, &length);
4016  if (!rv)
4017  return 0;
4018 
4019  if (cmd != P_HAND_SHAKE) {
4020  dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4021  cmdname(cmd), cmd);
4022  return -1;
4023  }
4024 
4025  if (length != expect) {
4026  dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4027  expect, length);
4028  return -1;
4029  }
4030 
4031  rv = drbd_recv(mdev, &p->head.payload, expect);
4032 
4033  if (rv != expect) {
4034  if (!signal_pending(current))
4035  dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4036  return 0;
4037  }
4038 
4039  p->protocol_min = be32_to_cpu(p->protocol_min);
4040  p->protocol_max = be32_to_cpu(p->protocol_max);
4041  if (p->protocol_max == 0)
4042  p->protocol_max = p->protocol_min;
4043 
4044  if (PRO_VERSION_MAX < p->protocol_min ||
4045  PRO_VERSION_MIN > p->protocol_max)
4046  goto incompat;
4047 
4048  mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4049 
4050  dev_info(DEV, "Handshake successful: "
4051  "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4052 
4053  return 1;
4054 
4055  incompat:
4056  dev_err(DEV, "incompatible DRBD dialects: "
4057  "I support %d-%d, peer supports %d-%d\n",
4058  PRO_VERSION_MIN, PRO_VERSION_MAX,
4059  p->protocol_min, p->protocol_max);
4060  return -1;
4061 }
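/* [Editor's sketch -- not part of drbd_receiver.c] The agreement rule in
 * drbd_do_handshake() boils down to an interval-overlap check on the two
 * supported version ranges followed by taking the smaller of the two maxima.
 * A standalone restatement of that rule: */

static int negotiate_version(int my_min, int my_max, int peer_min, int peer_max)
{
	if (peer_max == 0)                      /* peer only reported a single version */
		peer_max = peer_min;
	if (my_max < peer_min || my_min > peer_max)
		return -1;                      /* no common dialect: go standalone */
	return my_max < peer_max ? my_max : peer_max;
}

/* e.g. negotiate_version(86, 96, 90, 100) == 96; disjoint ranges yield -1 */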
4062 
4063 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4064 static int drbd_do_auth(struct drbd_conf *mdev)
4065 {
4066  dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4067  dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4068  return -1;
4069 }
4070 #else
4071 #define CHALLENGE_LEN 64
4072 
4073 /* Return value:
4074  1 - auth succeeded,
4075  0 - failed, try again (network error),
4076  -1 - auth failed, don't try again.
4077 */
4078 
4079 static int drbd_do_auth(struct drbd_conf *mdev)
4080 {
4081  char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4082  struct scatterlist sg;
4083  char *response = NULL;
4084  char *right_response = NULL;
4085  char *peers_ch = NULL;
4086  unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4087  unsigned int resp_size;
4088  struct hash_desc desc;
4089  enum drbd_packets cmd;
4090  unsigned int length;
4091  int rv;
4092 
4093  desc.tfm = mdev->cram_hmac_tfm;
4094  desc.flags = 0;
4095 
4096  rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4097  (u8 *)mdev->net_conf->shared_secret, key_len);
4098  if (rv) {
4099  dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4100  rv = -1;
4101  goto fail;
4102  }
4103 
4104  get_random_bytes(my_challenge, CHALLENGE_LEN);
4105 
4106  rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4107  if (!rv)
4108  goto fail;
4109 
4110  rv = drbd_recv_header(mdev, &cmd, &length);
4111  if (!rv)
4112  goto fail;
4113 
4114  if (cmd != P_AUTH_CHALLENGE) {
4115  dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4116  cmdname(cmd), cmd);
4117  rv = 0;
4118  goto fail;
4119  }
4120 
4121  if (length > CHALLENGE_LEN * 2) {
4122  dev_err(DEV, "expected AuthChallenge payload too big.\n");
4123  rv = -1;
4124  goto fail;
4125  }
4126 
4127  peers_ch = kmalloc(length, GFP_NOIO);
4128  if (peers_ch == NULL) {
4129  dev_err(DEV, "kmalloc of peers_ch failed\n");
4130  rv = -1;
4131  goto fail;
4132  }
4133 
4134  rv = drbd_recv(mdev, peers_ch, length);
4135 
4136  if (rv != length) {
4137  if (!signal_pending(current))
4138  dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4139  rv = 0;
4140  goto fail;
4141  }
4142 
4143  resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4144  response = kmalloc(resp_size, GFP_NOIO);
4145  if (response == NULL) {
4146  dev_err(DEV, "kmalloc of response failed\n");
4147  rv = -1;
4148  goto fail;
4149  }
4150 
4151  sg_init_table(&sg, 1);
4152  sg_set_buf(&sg, peers_ch, length);
4153 
4154  rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4155  if (rv) {
4156  dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4157  rv = -1;
4158  goto fail;
4159  }
4160 
4161  rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4162  if (!rv)
4163  goto fail;
4164 
4165  rv = drbd_recv_header(mdev, &cmd, &length);
4166  if (!rv)
4167  goto fail;
4168 
4169  if (cmd != P_AUTH_RESPONSE) {
4170  dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4171  cmdname(cmd), cmd);
4172  rv = 0;
4173  goto fail;
4174  }
4175 
4176  if (length != resp_size) {
4177  dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4178  rv = 0;
4179  goto fail;
4180  }
4181 
4182  rv = drbd_recv(mdev, response , resp_size);
4183 
4184  if (rv != resp_size) {
4185  if (!signal_pending(current))
4186  dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4187  rv = 0;
4188  goto fail;
4189  }
4190 
4191  right_response = kmalloc(resp_size, GFP_NOIO);
4192  if (right_response == NULL) {
4193  dev_err(DEV, "kmalloc of right_response failed\n");
4194  rv = -1;
4195  goto fail;
4196  }
4197 
4198  sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4199 
4200  rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4201  if (rv) {
4202  dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4203  rv = -1;
4204  goto fail;
4205  }
4206 
4207  rv = !memcmp(response, right_response, resp_size);
4208 
4209  if (rv)
4210  dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4211  resp_size, mdev->net_conf->cram_hmac_alg);
4212  else
4213  rv = -1;
4214 
4215  fail:
4216  kfree(peers_ch);
4217  kfree(response);
4218  kfree(right_response);
4219 
4220  return rv;
4221 }
4222 #endif
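/* [Editor's sketch -- not part of drbd_receiver.c] drbd_do_auth() is one side
 * of a symmetric challenge/response: each peer sends a random challenge,
 * answers the peer's challenge with HMAC(shared_secret, challenge), and checks
 * the answer it receives against the HMAC of its own challenge. A userspace
 * outline of the verification half; hmac() is a hypothetical helper standing
 * in for whatever HMAC implementation is available. */

#include <stddef.h>
#include <string.h>

#define SKETCH_CHALLENGE_LEN 64

typedef void (*hmac_fn)(const unsigned char *key, size_t key_len,
			const unsigned char *msg, size_t msg_len,
			unsigned char *digest);

static int verify_peer(hmac_fn hmac, size_t digest_len,
		       const unsigned char *secret, size_t secret_len,
		       const unsigned char my_challenge[SKETCH_CHALLENGE_LEN],
		       const unsigned char *peer_response)
{
	unsigned char expected[64];             /* assumes digest_len <= 64 */

	/* the peer proves knowledge of the secret by returning
	 * HMAC(secret, my_challenge); recompute locally and compare */
	hmac(secret, secret_len, my_challenge, SKETCH_CHALLENGE_LEN, expected);
	return memcmp(peer_response, expected, digest_len) == 0;
}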
4223 
4224 int drbdd_init(struct drbd_thread *thi)
4225 {
4226  struct drbd_conf *mdev = thi->mdev;
4227  unsigned int minor = mdev_to_minor(mdev);
4228  int h;
4229 
4230  sprintf(current->comm, "drbd%d_receiver", minor);
4231 
4232  dev_info(DEV, "receiver (re)started\n");
4233 
4234  do {
4235  h = drbd_connect(mdev);
4236  if (h == 0) {
4237  drbd_disconnect(mdev);
4238  schedule_timeout_interruptible(HZ);
4239  }
4240  if (h == -1) {
4241  dev_warn(DEV, "Discarding network configuration.\n");
4242  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4243  }
4244  } while (h == 0);
4245 
4246  if (h > 0) {
4247  if (get_net_conf(mdev)) {
4248  drbdd(mdev);
4249  put_net_conf(mdev);
4250  }
4251  }
4252 
4253  drbd_disconnect(mdev);
4254 
4255  dev_info(DEV, "receiver terminated\n");
4256  return 0;
4257 }
4258 
4259 /* ********* acknowledge sender ******** */
4260 
4261 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4262 {
4263  struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4264 
4265  int retcode = be32_to_cpu(p->retcode);
4266 
4267  if (retcode >= SS_SUCCESS) {
4268  set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4269  } else {
4270  set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4271  dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4272  drbd_set_st_err_str(retcode), retcode);
4273  }
4274  wake_up(&mdev->state_wait);
4275 
4276  return true;
4277 }
4278 
4279 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4280 {
4281  return drbd_send_ping_ack(mdev);
4282 
4283 }
4284 
4285 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4286 {
4287  /* restore idle timeout */
4288  mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4289  if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4290  wake_up(&mdev->misc_wait);
4291 
4292  return true;
4293 }
4294 
4295 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4296 {
4297  struct p_block_ack *p = (struct p_block_ack *)h;
4298  sector_t sector = be64_to_cpu(p->sector);
4299  int blksize = be32_to_cpu(p->blksize);
4300 
4301  D_ASSERT(mdev->agreed_pro_version >= 89);
4302 
4303  update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4304 
4305  if (get_ldev(mdev)) {
4306  drbd_rs_complete_io(mdev, sector);
4307  drbd_set_in_sync(mdev, sector, blksize);
4308  /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4309  mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4310  put_ldev(mdev);
4311  }
4312  dec_rs_pending(mdev);
4313  atomic_add(blksize >> 9, &mdev->rs_sect_in);
4314 
4315  return true;
4316 }
4317 
4318 /* when we receive the ACK for a write request,
4319  * verify that we actually know about it */
4320 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4321  u64 id, sector_t sector)
4322 {
4323  struct hlist_head *slot = tl_hash_slot(mdev, sector);
4324  struct hlist_node *n;
4325  struct drbd_request *req;
4326 
4327  hlist_for_each_entry(req, n, slot, collision) {
4328  if ((unsigned long)req == (unsigned long)id) {
4329  if (req->sector != sector) {
4330  dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4331  "wrong sector (%llus versus %llus)\n", req,
4332  (unsigned long long)req->sector,
4333  (unsigned long long)sector);
4334  break;
4335  }
4336  return req;
4337  }
4338  }
4339  return NULL;
4340 }
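/* [Editor's sketch -- not part of drbd_receiver.c] The block_id echoed back by
 * the peer is the kernel address of the original request, used as an opaque
 * cookie. _ack_id_to_req() therefore never dereferences it directly: it only
 * accepts the value after finding the same pointer in the sector's hash slot
 * and re-checking the sector. The idea, with invented userspace types: */

#include <stdint.h>

struct req_sketch { unsigned long long sector; struct req_sketch *next_in_slot; };

static struct req_sketch *cookie_to_req(struct req_sketch *slot_head,
					uint64_t cookie, unsigned long long sector)
{
	struct req_sketch *r;

	for (r = slot_head; r; r = r->next_in_slot)
		if ((uint64_t)(uintptr_t)r == cookie)
			return r->sector == sector ? r : NULL;  /* sanity check */
	return NULL;
}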
4341 
4342 typedef struct drbd_request *(req_validator_fn)
4343  (struct drbd_conf *mdev, u64 id, sector_t sector);
4344 
4345 static int validate_req_change_req_state(struct drbd_conf *mdev,
4346  u64 id, sector_t sector, req_validator_fn validator,
4347  const char *func, enum drbd_req_event what)
4348 {
4349  struct drbd_request *req;
4350  struct bio_and_error m;
4351 
4352  spin_lock_irq(&mdev->req_lock);
4353  req = validator(mdev, id, sector);
4354  if (unlikely(!req)) {
4355  spin_unlock_irq(&mdev->req_lock);
4356 
4357  dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4358  (void *)(unsigned long)id, (unsigned long long)sector);
4359  return false;
4360  }
4361  __req_mod(req, what, &m);
4362  spin_unlock_irq(&mdev->req_lock);
4363 
4364  if (m.bio)
4365  complete_master_bio(mdev, &m);
4366  return true;
4367 }
4368 
4369 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4370 {
4371  struct p_block_ack *p = (struct p_block_ack *)h;
4372  sector_t sector = be64_to_cpu(p->sector);
4373  int blksize = be32_to_cpu(p->blksize);
4374  enum drbd_req_event what;
4375 
4376  update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377 
4378  if (is_syncer_block_id(p->block_id)) {
4379  drbd_set_in_sync(mdev, sector, blksize);
4380  dec_rs_pending(mdev);
4381  return true;
4382  }
4383  switch (be16_to_cpu(h->command)) {
4384  case P_RS_WRITE_ACK:
4385  D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4386  what = write_acked_by_peer_and_sis;
4387  break;
4388  case P_WRITE_ACK:
4389  D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4390  what = write_acked_by_peer;
4391  break;
4392  case P_RECV_ACK:
4393  D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4394  what = recv_acked_by_peer;
4395  break;
4396  case P_DISCARD_ACK:
4397  D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4398  what = conflict_discarded_by_peer;
4399  break;
4400  default:
4401  D_ASSERT(0);
4402  return false;
4403  }
4404 
4405  return validate_req_change_req_state(mdev, p->block_id, sector,
4406  _ack_id_to_req, __func__ , what);
4407 }
4408 
4409 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4410 {
4411  struct p_block_ack *p = (struct p_block_ack *)h;
4412  sector_t sector = be64_to_cpu(p->sector);
4413  int size = be32_to_cpu(p->blksize);
4414  struct drbd_request *req;
4415  struct bio_and_error m;
4416 
4417  update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4418 
4419  if (is_syncer_block_id(p->block_id)) {
4420  dec_rs_pending(mdev);
4421  drbd_rs_failed_io(mdev, sector, size);
4422  return true;
4423  }
4424 
4425  spin_lock_irq(&mdev->req_lock);
4426  req = _ack_id_to_req(mdev, p->block_id, sector);
4427  if (!req) {
4428  spin_unlock_irq(&mdev->req_lock);
4429  if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4430  mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4431  /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4432  The master bio might already be completed, therefore the
4433  request is no longer in the collision hash.
4434  => Do not try to validate block_id as request. */
4435  /* In Protocol B we might already have got a P_RECV_ACK
4436  but then get a P_NEG_ACK afterwards. */
4437  drbd_set_out_of_sync(mdev, sector, size);
4438  return true;
4439  } else {
4440  dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4441  (void *)(unsigned long)p->block_id, (unsigned long long)sector);
4442  return false;
4443  }
4444  }
4445  __req_mod(req, neg_acked, &m);
4446  spin_unlock_irq(&mdev->req_lock);
4447 
4448  if (m.bio)
4449  complete_master_bio(mdev, &m);
4450  return true;
4451 }
4452 
4453 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4454 {
4455  struct p_block_ack *p = (struct p_block_ack *)h;
4456  sector_t sector = be64_to_cpu(p->sector);
4457 
4458  update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4459  dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4460  (unsigned long long)sector, be32_to_cpu(p->blksize));
4461 
4462  return validate_req_change_req_state(mdev, p->block_id, sector,
4463  _ar_id_to_req, __func__ , neg_acked);
4464 }
4465 
4466 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4467 {
4468  sector_t sector;
4469  int size;
4470  struct p_block_ack *p = (struct p_block_ack *)h;
4471 
4472  sector = be64_to_cpu(p->sector);
4473  size = be32_to_cpu(p->blksize);
4474 
4475  update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4476 
4477  dec_rs_pending(mdev);
4478 
4479  if (get_ldev_if_state(mdev, D_FAILED)) {
4480  drbd_rs_complete_io(mdev, sector);
4481  switch (be16_to_cpu(h->command)) {
4482  case P_NEG_RS_DREPLY:
4483  drbd_rs_failed_io(mdev, sector, size);
4484  case P_RS_CANCEL:
4485  break;
4486  default:
4487  D_ASSERT(0);
4488  put_ldev(mdev);
4489  return false;
4490  }
4491  put_ldev(mdev);
4492  }
4493 
4494  return true;
4495 }
4496 
4497 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4498 {
4499  struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4500 
4501  tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4502 
4503  if (mdev->state.conn == C_AHEAD &&
4504  atomic_read(&mdev->ap_in_flight) == 0 &&
4505  !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
4506  mdev->start_resync_timer.expires = jiffies + HZ;
4507  add_timer(&mdev->start_resync_timer);
4508  }
4509 
4510  return true;
4511 }
4512 
4513 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4514 {
4515  struct p_block_ack *p = (struct p_block_ack *)h;
4516  struct drbd_work *w;
4517  sector_t sector;
4518  int size;
4519 
4520  sector = be64_to_cpu(p->sector);
4521  size = be32_to_cpu(p->blksize);
4522 
4523  update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4524 
4525  if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4526  drbd_ov_oos_found(mdev, sector, size);
4527  else
4528  ov_oos_print(mdev);
4529 
4530  if (!get_ldev(mdev))
4531  return true;
4532 
4533  drbd_rs_complete_io(mdev, sector);
4534  dec_rs_pending(mdev);
4535 
4536  --mdev->ov_left;
4537 
4538  /* let's advance progress step marks only for every other megabyte */
4539  if ((mdev->ov_left & 0x200) == 0x200)
4540  drbd_advance_rs_marks(mdev, mdev->ov_left);
4541 
4542  if (mdev->ov_left == 0) {
4543  w = kmalloc(sizeof(*w), GFP_NOIO);
4544  if (w) {
4545  w->cb = w_ov_finished;
4546  drbd_queue_work_front(&mdev->data.work, w);
4547  } else {
4548  dev_err(DEV, "kmalloc(w) failed.");
4549  ov_oos_print(mdev);
4550  drbd_resync_finished(mdev);
4551  }
4552  }
4553  put_ldev(mdev);
4554  return true;
4555 }
4556 
4557 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4558 {
4559  return true;
4560 }
4561 
4562 struct asender_cmd {
4563  size_t pkt_size;
4564  int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4565 };
4566 
4567 static struct asender_cmd *get_asender_cmd(int cmd)
4568 {
4569  static struct asender_cmd asender_tbl[] = {
4570  /* anything missing from this table is in
4571  * the drbd_cmd_handler (drbd_default_handler) table,
4572  * see the beginning of drbdd() */
4573  [P_PING] = { sizeof(struct p_header80), got_Ping },
4574  [P_PING_ACK] = { sizeof(struct p_header80), got_PingAck },
4575  [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4576  [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4577  [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4578  [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4579  [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4580  [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4581  [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4582  [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4583  [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4584  [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4585  [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4586  [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
4587  [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply},
4588  [P_MAX_CMD] = { 0, NULL },
4589  };
4590  if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4591  return NULL;
4592  return &asender_tbl[cmd];
4593 }
4594 
4595 int drbd_asender(struct drbd_thread *thi)
4596 {
4597  struct drbd_conf *mdev = thi->mdev;
4598  struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4599  struct asender_cmd *cmd = NULL;
4600 
4601  int rv, len;
4602  void *buf = h;
4603  int received = 0;
4604  int expect = sizeof(struct p_header80);
4605  int empty;
4606  int ping_timeout_active = 0;
4607 
4608  sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4609 
4610  current->policy = SCHED_RR; /* Make this a realtime task! */
4611  current->rt_priority = 2; /* more important than all other tasks */
4612 
4613  while (get_t_state(thi) == Running) {
4614  drbd_thread_current_set_cpu(mdev);
4615  if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4616  ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4617  mdev->meta.socket->sk->sk_rcvtimeo =
4618  mdev->net_conf->ping_timeo*HZ/10;
4619  ping_timeout_active = 1;
4620  }
4621 
4622  /* conditionally cork;
4623  * it may hurt latency if we cork without much to send */
4624  if (!mdev->net_conf->no_cork &&
4625  3 < atomic_read(&mdev->unacked_cnt))
4626  drbd_tcp_cork(mdev->meta.socket);
4627  while (1) {
4628  clear_bit(SIGNAL_ASENDER, &mdev->flags);
4629  flush_signals(current);
4630  if (!drbd_process_done_ee(mdev))
4631  goto reconnect;
4632  /* to avoid race with newly queued ACKs */
4633  set_bit(SIGNAL_ASENDER, &mdev->flags);
4634  spin_lock_irq(&mdev->req_lock);
4635  empty = list_empty(&mdev->done_ee);
4636  spin_unlock_irq(&mdev->req_lock);
4637  /* new ack may have been queued right here,
4638  * but then there is also a signal pending,
4639  * and we start over... */
4640  if (empty)
4641  break;
4642  }
4643  /* but unconditionally uncork unless disabled */
4644  if (!mdev->net_conf->no_cork)
4645  drbd_tcp_uncork(mdev->meta.socket);
4646 
4647  /* short circuit, recv_msg would return EINTR anyways. */
4648  if (signal_pending(current))
4649  continue;
4650 
4651  rv = drbd_recv_short(mdev, mdev->meta.socket,
4652  buf, expect-received, 0);
4653  clear_bit(SIGNAL_ASENDER, &mdev->flags);
4654 
4655  flush_signals(current);
4656 
4657  /* Note:
4658  * -EINTR (on meta) we got a signal
4659  * -EAGAIN (on meta) rcvtimeo expired
4660  * -ECONNRESET other side closed the connection
4661  * -ERESTARTSYS (on data) we got a signal
4662  * rv < 0 other than above: unexpected error!
4663  * rv == expected: full header or command
4664  * rv < expected: "woken" by signal during receive
4665  * rv == 0 : "connection shut down by peer"
4666  */
4667  if (likely(rv > 0)) {
4668  received += rv;
4669  buf += rv;
4670  } else if (rv == 0) {
4671  dev_err(DEV, "meta connection shut down by peer.\n");
4672  goto reconnect;
4673  } else if (rv == -EAGAIN) {
4674  /* If the data socket received something meanwhile,
4675  * that is good enough: peer is still alive. */
4676  if (time_after(mdev->last_received,
4677  jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4678  continue;
4679  if (ping_timeout_active) {
4680  dev_err(DEV, "PingAck did not arrive in time.\n");
4681  goto reconnect;
4682  }
4683  set_bit(SEND_PING, &mdev->flags);
4684  continue;
4685  } else if (rv == -EINTR) {
4686  continue;
4687  } else {
4688  dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4689  goto reconnect;
4690  }
4691 
4692  if (received == expect && cmd == NULL) {
4693  if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4694  dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4695  be32_to_cpu(h->magic),
4696  be16_to_cpu(h->command),
4697  be16_to_cpu(h->length));
4698  goto reconnect;
4699  }
4700  cmd = get_asender_cmd(be16_to_cpu(h->command));
4701  len = be16_to_cpu(h->length);
4702  if (unlikely(cmd == NULL)) {
4703  dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4704  be32_to_cpu(h->magic),
4705  be16_to_cpu(h->command),
4706  be16_to_cpu(h->length));
4707  goto disconnect;
4708  }
4709  expect = cmd->pkt_size;
4710  ERR_IF(len != expect-sizeof(struct p_header80))
4711  goto reconnect;
4712  }
4713  if (received == expect) {
4714  mdev->last_received = jiffies;
4715  D_ASSERT(cmd != NULL);
4716  if (!cmd->process(mdev, h))
4717  goto reconnect;
4718 
4719  /* the idle_timeout (ping-int)
4720  * has been restored in got_PingAck() */
4721  if (cmd == get_asender_cmd(P_PING_ACK))
4722  ping_timeout_active = 0;
4723 
4724  buf = h;
4725  received = 0;
4726  expect = sizeof(struct p_header80);
4727  cmd = NULL;
4728  }
4729  }
4730 
4731  if (0) {
4732 reconnect:
4733  drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4734  drbd_md_sync(mdev);
4735  }
4736  if (0) {
4737 disconnect:
4738  drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4739  drbd_md_sync(mdev);
4740  }
4741  clear_bit(SIGNAL_ASENDER, &mdev->flags);
4742 
4743  D_ASSERT(mdev->state.conn < C_CONNECTED);
4744  dev_info(DEV, "asender terminated\n");
4745 
4746  return 0;
4747 }
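/* [Editor's sketch -- not part of drbd_receiver.c] The asender loop above
 * accumulates bytes until a complete header (and then a complete packet) has
 * arrived, treating EOF, signals and timeouts differently, as the Note in the
 * code spells out. The same pattern in self-contained form; recv_some() is a
 * hypothetical stand-in for drbd_recv_short(). */

#include <errno.h>

/* recv_some(): >0 bytes read, 0 on orderly shutdown, negative errno otherwise */
static int read_full(int (*recv_some)(void *buf, int len), void *buf, int expect)
{
	int received = 0;

	while (received < expect) {
		int rv = recv_some((char *)buf + received, expect - received);

		if (rv > 0)
			received += rv;         /* partial read: keep accumulating */
		else if (rv == 0)
			return 0;               /* peer closed the connection */
		else if (rv == -EINTR)
			continue;               /* interrupted by a signal: retry */
		else
			return rv;              /* -EAGAIN (timeout) or hard error */
	}
	return received;
}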