1/*
2   drbd_receiver.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_protocol.h"
48#include "drbd_req.h"
49#include "drbd_vli.h"
50
51#define PRO_FEATURES (FF_TRIM)
52
53struct packet_info {
54	enum drbd_packet cmd;
55	unsigned int size;
56	unsigned int vnr;
57	void *data;
58};
59
60enum finish_epoch {
61	FE_STILL_LIVE,
62	FE_DESTROYED,
63	FE_RECYCLED,
64};
65
66static int drbd_do_features(struct drbd_connection *connection);
67static int drbd_do_auth(struct drbd_connection *connection);
68static int drbd_disconnected(struct drbd_peer_device *);
69static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71static int e_end_block(struct drbd_work *, int);
72
73
74#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
75
76/*
77 * some helper functions to deal with single linked page lists,
78 * page->private being our "next" pointer.
79 */
80
81/* If at least n pages are linked at head, get n pages off.
82 * Otherwise, don't modify head, and return NULL.
83 * Locking is the responsibility of the caller.
84 */
85static struct page *page_chain_del(struct page **head, int n)
86{
87	struct page *page;
88	struct page *tmp;
89
90	BUG_ON(!n);
91	BUG_ON(!head);
92
93	page = *head;
94
95	if (!page)
96		return NULL;
97
98	while (page) {
99		tmp = page_chain_next(page);
100		if (--n == 0)
101			break; /* found sufficient pages */
102		if (tmp == NULL)
103			/* insufficient pages, don't use any of them. */
104			return NULL;
105		page = tmp;
106	}
107
108	/* add end of list marker for the returned list */
109	set_page_private(page, 0);
110	/* actual return value, and adjustment of head */
111	page = *head;
112	*head = tmp;
113	return page;
114}
115
116/* may be used outside of locks to find the tail of a (usually short)
117 * "private" page chain, before adding it back to a global chain head
118 * with page_chain_add() under a spinlock. */
119static struct page *page_chain_tail(struct page *page, int *len)
120{
121	struct page *tmp;
122	int i = 1;
123	while ((tmp = page_chain_next(page)))
124		++i, page = tmp;
125	if (len)
126		*len = i;
127	return page;
128}
129
130static int page_chain_free(struct page *page)
131{
132	struct page *tmp;
133	int i = 0;
134	page_chain_for_each_safe(page, tmp) {
135		put_page(page);
136		++i;
137	}
138	return i;
139}
140
141static void page_chain_add(struct page **head,
142		struct page *chain_first, struct page *chain_last)
143{
144#if 1
145	struct page *tmp;
146	tmp = page_chain_tail(chain_first, NULL);
147	BUG_ON(tmp != chain_last);
148#endif
149
150	/* add chain to head */
151	set_page_private(chain_last, (unsigned long)*head);
152	*head = chain_first;
153}
154
155static struct page *__drbd_alloc_pages(struct drbd_device *device,
156				       unsigned int number)
157{
158	struct page *page = NULL;
159	struct page *tmp = NULL;
160	unsigned int i = 0;
161
162	/* Yes, testing drbd_pp_vacant outside the lock is racy.
163	 * So what. It saves a spin_lock. */
164	if (drbd_pp_vacant >= number) {
165		spin_lock(&drbd_pp_lock);
166		page = page_chain_del(&drbd_pp_pool, number);
167		if (page)
168			drbd_pp_vacant -= number;
169		spin_unlock(&drbd_pp_lock);
170		if (page)
171			return page;
172	}
173
174	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175	 * "criss-cross" setup, that might cause write-out on some other DRBD,
176	 * which in turn might block on the other node at this very place.  */
177	for (i = 0; i < number; i++) {
178		tmp = alloc_page(GFP_TRY);
179		if (!tmp)
180			break;
181		set_page_private(tmp, (unsigned long)page);
182		page = tmp;
183	}
184
185	if (i == number)
186		return page;
187
188	/* Not enough pages immediately available this time.
189	 * No need to jump around here, drbd_alloc_pages will retry this
190	 * function "soon". */
191	if (page) {
192		tmp = page_chain_tail(page, NULL);
193		spin_lock(&drbd_pp_lock);
194		page_chain_add(&drbd_pp_pool, page, tmp);
195		drbd_pp_vacant += i;
196		spin_unlock(&drbd_pp_lock);
197	}
198	return NULL;
199}
200
201static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202					   struct list_head *to_be_freed)
203{
204	struct drbd_peer_request *peer_req, *tmp;
205
206	/* The EEs are always appended to the end of the list. Since
207	   they are sent in order over the wire, they have to finish
208	   in order. As soon as we see the first not finished we can
209	   stop to examine the list... */
210
211	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212		if (drbd_peer_req_has_active_page(peer_req))
213			break;
214		list_move(&peer_req->w.list, to_be_freed);
215	}
216}
217
218static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
219{
220	LIST_HEAD(reclaimed);
221	struct drbd_peer_request *peer_req, *t;
222
223	spin_lock_irq(&device->resource->req_lock);
224	reclaim_finished_net_peer_reqs(device, &reclaimed);
225	spin_unlock_irq(&device->resource->req_lock);
226
227	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228		drbd_free_net_peer_req(device, peer_req);
229}
230
231/**
232 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233 * @device:	DRBD device.
234 * @number:	number of pages requested
235 * @retry:	whether to retry, if not enough pages are available right now
236 *
237 * Tries to allocate number pages, first from our own page pool, then from
238 * the kernel.
239 * Possibly retry until DRBD frees sufficient pages somewhere else.
240 *
241 * If this allocation would exceed the max_buffers setting, we throttle
242 * allocation (schedule_timeout) to give the system some room to breathe.
243 *
244 * We do not use max-buffers as hard limit, because it could lead to
245 * congestion and further to a distributed deadlock during online-verify or
246 * (checksum based) resync, if the max-buffers, socket buffer sizes and
247 * resync-rate settings are mis-configured.
248 *
249 * Returns a page chain linked via page->private.
250 */
251struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
252			      bool retry)
253{
254	struct drbd_device *device = peer_device->device;
255	struct page *page = NULL;
256	struct net_conf *nc;
257	DEFINE_WAIT(wait);
258	unsigned int mxb;
259
260	rcu_read_lock();
261	nc = rcu_dereference(peer_device->connection->net_conf);
262	mxb = nc ? nc->max_buffers : 1000000;
263	rcu_read_unlock();
264
265	if (atomic_read(&device->pp_in_use) < mxb)
266		page = __drbd_alloc_pages(device, number);
267
268	while (page == NULL) {
269		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
270
271		drbd_kick_lo_and_reclaim_net(device);
272
273		if (atomic_read(&device->pp_in_use) < mxb) {
274			page = __drbd_alloc_pages(device, number);
275			if (page)
276				break;
277		}
278
279		if (!retry)
280			break;
281
282		if (signal_pending(current)) {
283			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
284			break;
285		}
286
287		if (schedule_timeout(HZ/10) == 0)
288			mxb = UINT_MAX;
289	}
290	finish_wait(&drbd_pp_wait, &wait);
291
292	if (page)
293		atomic_add(number, &device->pp_in_use);
294	return page;
295}
296
297/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
298 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
299 * Either links the page chain back to the global pool,
300 * or returns all pages to the system. */
301static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
302{
303	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
304	int i;
305
306	if (page == NULL)
307		return;
308
309	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310		i = page_chain_free(page);
311	else {
312		struct page *tmp;
313		tmp = page_chain_tail(page, &i);
314		spin_lock(&drbd_pp_lock);
315		page_chain_add(&drbd_pp_pool, page, tmp);
316		drbd_pp_vacant += i;
317		spin_unlock(&drbd_pp_lock);
318	}
319	i = atomic_sub_return(i, a);
320	if (i < 0)
321		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
323	wake_up(&drbd_pp_wait);
324}
325
326/*
327You need to hold the req_lock:
328 _drbd_wait_ee_list_empty()
329
330You must not have the req_lock:
331 drbd_free_peer_req()
332 drbd_alloc_peer_req()
333 drbd_free_peer_reqs()
334 drbd_ee_fix_bhs()
335 drbd_finish_peer_reqs()
336 drbd_clear_done_ee()
337 drbd_wait_ee_list_empty()
338*/
339
340struct drbd_peer_request *
341drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
343{
344	struct drbd_device *device = peer_device->device;
345	struct drbd_peer_request *peer_req;
346	struct page *page = NULL;
347	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
348
349	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
350		return NULL;
351
352	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
353	if (!peer_req) {
354		if (!(gfp_mask & __GFP_NOWARN))
355			drbd_err(device, "%s: allocation failed\n", __func__);
356		return NULL;
357	}
358
359	if (has_payload && data_size) {
360		page = drbd_alloc_pages(peer_device, nr_pages,
361					gfpflags_allow_blocking(gfp_mask));
362		if (!page)
363			goto fail;
364	}
365
366	memset(peer_req, 0, sizeof(*peer_req));
367	INIT_LIST_HEAD(&peer_req->w.list);
368	drbd_clear_interval(&peer_req->i);
369	peer_req->i.size = data_size;
370	peer_req->i.sector = sector;
371	peer_req->submit_jif = jiffies;
372	peer_req->peer_device = peer_device;
373	peer_req->pages = page;
374	/*
375	 * The block_id is opaque to the receiver.  It is not endianness
376	 * converted, and sent back to the sender unchanged.
377	 */
378	peer_req->block_id = id;
379
380	return peer_req;
381
382 fail:
383	mempool_free(peer_req, drbd_ee_mempool);
384	return NULL;
385}
386
387void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
388		       int is_net)
389{
390	might_sleep();
391	if (peer_req->flags & EE_HAS_DIGEST)
392		kfree(peer_req->digest);
393	drbd_free_pages(device, peer_req->pages, is_net);
394	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
395	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
396	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
397		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
398		drbd_al_complete_io(device, &peer_req->i);
399	}
400	mempool_free(peer_req, drbd_ee_mempool);
401}
402
403int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
404{
405	LIST_HEAD(work_list);
406	struct drbd_peer_request *peer_req, *t;
407	int count = 0;
408	int is_net = list == &device->net_ee;
409
410	spin_lock_irq(&device->resource->req_lock);
411	list_splice_init(list, &work_list);
412	spin_unlock_irq(&device->resource->req_lock);
413
414	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
415		__drbd_free_peer_req(device, peer_req, is_net);
416		count++;
417	}
418	return count;
419}
420
421/*
422 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
423 */
424static int drbd_finish_peer_reqs(struct drbd_device *device)
425{
426	LIST_HEAD(work_list);
427	LIST_HEAD(reclaimed);
428	struct drbd_peer_request *peer_req, *t;
429	int err = 0;
430
431	spin_lock_irq(&device->resource->req_lock);
432	reclaim_finished_net_peer_reqs(device, &reclaimed);
433	list_splice_init(&device->done_ee, &work_list);
434	spin_unlock_irq(&device->resource->req_lock);
435
436	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
437		drbd_free_net_peer_req(device, peer_req);
438
439	/* possible callbacks here:
440	 * e_end_block, and e_end_resync_block, e_send_superseded.
441	 * all ignore the last argument.
442	 */
443	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444		int err2;
445
446		/* list_del not necessary, next/prev members not touched */
447		err2 = peer_req->w.cb(&peer_req->w, !!err);
448		if (!err)
449			err = err2;
450		drbd_free_peer_req(device, peer_req);
451	}
452	wake_up(&device->ee_wait);
453
454	return err;
455}
456
457static void _drbd_wait_ee_list_empty(struct drbd_device *device,
458				     struct list_head *head)
459{
460	DEFINE_WAIT(wait);
461
462	/* avoids spin_lock/unlock
463	 * and calling prepare_to_wait in the fast path */
464	while (!list_empty(head)) {
465		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
466		spin_unlock_irq(&device->resource->req_lock);
467		io_schedule();
468		finish_wait(&device->ee_wait, &wait);
469		spin_lock_irq(&device->resource->req_lock);
470	}
471}
472
473static void drbd_wait_ee_list_empty(struct drbd_device *device,
474				    struct list_head *head)
475{
476	spin_lock_irq(&device->resource->req_lock);
477	_drbd_wait_ee_list_empty(device, head);
478	spin_unlock_irq(&device->resource->req_lock);
479}
480
481static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
482{
483	struct kvec iov = {
484		.iov_base = buf,
485		.iov_len = size,
486	};
487	struct msghdr msg = {
488		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
489	};
490	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
491}
492
493static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
494{
495	int rv;
496
497	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
498
499	if (rv < 0) {
500		if (rv == -ECONNRESET)
501			drbd_info(connection, "sock was reset by peer\n");
502		else if (rv != -ERESTARTSYS)
503			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
504	} else if (rv == 0) {
505		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
506			long t;
507			rcu_read_lock();
508			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
509			rcu_read_unlock();
510
511			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
512
513			if (t)
514				goto out;
515		}
516		drbd_info(connection, "sock was shut down by peer\n");
517	}
518
519	if (rv != size)
520		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
521
522out:
523	return rv;
524}
525
526static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
527{
528	int err;
529
530	err = drbd_recv(connection, buf, size);
531	if (err != size) {
532		if (err >= 0)
533			err = -EIO;
534	} else
535		err = 0;
536	return err;
537}
538
539static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
540{
541	int err;
542
543	err = drbd_recv_all(connection, buf, size);
544	if (err && !signal_pending(current))
545		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
546	return err;
547}
548
549/* quoting tcp(7):
550 *   On individual connections, the socket buffer size must be set prior to the
551 *   listen(2) or connect(2) calls in order to have it take effect.
552 * This is our wrapper to do so.
553 */
554static void drbd_setbufsize(struct socket *sock, unsigned int snd,
555		unsigned int rcv)
556{
557	/* open coded SO_SNDBUF, SO_RCVBUF */
558	if (snd) {
559		sock->sk->sk_sndbuf = snd;
560		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
561	}
562	if (rcv) {
563		sock->sk->sk_rcvbuf = rcv;
564		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
565	}
566}
567
568static struct socket *drbd_try_connect(struct drbd_connection *connection)
569{
570	const char *what;
571	struct socket *sock;
572	struct sockaddr_in6 src_in6;
573	struct sockaddr_in6 peer_in6;
574	struct net_conf *nc;
575	int err, peer_addr_len, my_addr_len;
576	int sndbuf_size, rcvbuf_size, connect_int;
577	int disconnect_on_error = 1;
578
579	rcu_read_lock();
580	nc = rcu_dereference(connection->net_conf);
581	if (!nc) {
582		rcu_read_unlock();
583		return NULL;
584	}
585	sndbuf_size = nc->sndbuf_size;
586	rcvbuf_size = nc->rcvbuf_size;
587	connect_int = nc->connect_int;
588	rcu_read_unlock();
589
590	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
591	memcpy(&src_in6, &connection->my_addr, my_addr_len);
592
593	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
594		src_in6.sin6_port = 0;
595	else
596		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
597
598	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
599	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
600
601	what = "sock_create_kern";
602	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
603			       SOCK_STREAM, IPPROTO_TCP, &sock);
604	if (err < 0) {
605		sock = NULL;
606		goto out;
607	}
608
609	sock->sk->sk_rcvtimeo =
610	sock->sk->sk_sndtimeo = connect_int * HZ;
611	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
612
613       /* explicitly bind to the configured IP as source IP
614	*  for the outgoing connections.
615	*  This is needed for multihomed hosts and to be
616	*  able to use lo: interfaces for drbd.
617	* Make sure to use 0 as port number, so linux selects
618	*  a free one dynamically.
619	*/
620	what = "bind before connect";
621	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
622	if (err < 0)
623		goto out;
624
625	/* connect may fail, peer not yet available.
626	 * stay C_WF_CONNECTION, don't go Disconnecting! */
627	disconnect_on_error = 0;
628	what = "connect";
629	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
630
631out:
632	if (err < 0) {
633		if (sock) {
634			sock_release(sock);
635			sock = NULL;
636		}
637		switch (-err) {
638			/* timeout, busy, signal pending */
639		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
640		case EINTR: case ERESTARTSYS:
641			/* peer not (yet) available, network problem */
642		case ECONNREFUSED: case ENETUNREACH:
643		case EHOSTDOWN:    case EHOSTUNREACH:
644			disconnect_on_error = 0;
645			break;
646		default:
647			drbd_err(connection, "%s failed, err = %d\n", what, err);
648		}
649		if (disconnect_on_error)
650			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
651	}
652
653	return sock;
654}
655
656struct accept_wait_data {
657	struct drbd_connection *connection;
658	struct socket *s_listen;
659	struct completion door_bell;
660	void (*original_sk_state_change)(struct sock *sk);
661
662};
663
664static void drbd_incoming_connection(struct sock *sk)
665{
666	struct accept_wait_data *ad = sk->sk_user_data;
667	void (*state_change)(struct sock *sk);
668
669	state_change = ad->original_sk_state_change;
670	if (sk->sk_state == TCP_ESTABLISHED)
671		complete(&ad->door_bell);
672	state_change(sk);
673}
674
675static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
676{
677	int err, sndbuf_size, rcvbuf_size, my_addr_len;
678	struct sockaddr_in6 my_addr;
679	struct socket *s_listen;
680	struct net_conf *nc;
681	const char *what;
682
683	rcu_read_lock();
684	nc = rcu_dereference(connection->net_conf);
685	if (!nc) {
686		rcu_read_unlock();
687		return -EIO;
688	}
689	sndbuf_size = nc->sndbuf_size;
690	rcvbuf_size = nc->rcvbuf_size;
691	rcu_read_unlock();
692
693	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
694	memcpy(&my_addr, &connection->my_addr, my_addr_len);
695
696	what = "sock_create_kern";
697	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
698			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
699	if (err) {
700		s_listen = NULL;
701		goto out;
702	}
703
704	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
705	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
706
707	what = "bind before listen";
708	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
709	if (err < 0)
710		goto out;
711
712	ad->s_listen = s_listen;
713	write_lock_bh(&s_listen->sk->sk_callback_lock);
714	ad->original_sk_state_change = s_listen->sk->sk_state_change;
715	s_listen->sk->sk_state_change = drbd_incoming_connection;
716	s_listen->sk->sk_user_data = ad;
717	write_unlock_bh(&s_listen->sk->sk_callback_lock);
718
719	what = "listen";
720	err = s_listen->ops->listen(s_listen, 5);
721	if (err < 0)
722		goto out;
723
724	return 0;
725out:
726	if (s_listen)
727		sock_release(s_listen);
728	if (err < 0) {
729		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
730			drbd_err(connection, "%s failed, err = %d\n", what, err);
731			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
732		}
733	}
734
735	return -EIO;
736}
737
738static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
739{
740	write_lock_bh(&sk->sk_callback_lock);
741	sk->sk_state_change = ad->original_sk_state_change;
742	sk->sk_user_data = NULL;
743	write_unlock_bh(&sk->sk_callback_lock);
744}
745
746static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
747{
748	int timeo, connect_int, err = 0;
749	struct socket *s_estab = NULL;
750	struct net_conf *nc;
751
752	rcu_read_lock();
753	nc = rcu_dereference(connection->net_conf);
754	if (!nc) {
755		rcu_read_unlock();
756		return NULL;
757	}
758	connect_int = nc->connect_int;
759	rcu_read_unlock();
760
761	timeo = connect_int * HZ;
762	/* 28.5% random jitter */
763	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
764
765	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
766	if (err <= 0)
767		return NULL;
768
769	err = kernel_accept(ad->s_listen, &s_estab, 0);
770	if (err < 0) {
771		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
772			drbd_err(connection, "accept failed, err = %d\n", err);
773			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
774		}
775	}
776
777	if (s_estab)
778		unregister_state_change(s_estab->sk, ad);
779
780	return s_estab;
781}
782
783static int decode_header(struct drbd_connection *, void *, struct packet_info *);
784
785static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
786			     enum drbd_packet cmd)
787{
788	if (!conn_prepare_command(connection, sock))
789		return -EIO;
790	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
791}
792
793static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
794{
795	unsigned int header_size = drbd_header_size(connection);
796	struct packet_info pi;
797	struct net_conf *nc;
798	int err;
799
800	rcu_read_lock();
801	nc = rcu_dereference(connection->net_conf);
802	if (!nc) {
803		rcu_read_unlock();
804		return -EIO;
805	}
806	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
807	rcu_read_unlock();
808
809	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
810	if (err != header_size) {
811		if (err >= 0)
812			err = -EIO;
813		return err;
814	}
815	err = decode_header(connection, connection->data.rbuf, &pi);
816	if (err)
817		return err;
818	return pi.cmd;
819}
820
821/**
822 * drbd_socket_okay() - Free the socket if its connection is not okay
823 * @sock:	pointer to the pointer to the socket.
824 */
825static bool drbd_socket_okay(struct socket **sock)
826{
827	int rr;
828	char tb[4];
829
830	if (!*sock)
831		return false;
832
833	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
834
835	if (rr > 0 || rr == -EAGAIN) {
836		return true;
837	} else {
838		sock_release(*sock);
839		*sock = NULL;
840		return false;
841	}
842}
843
844static bool connection_established(struct drbd_connection *connection,
845				   struct socket **sock1,
846				   struct socket **sock2)
847{
848	struct net_conf *nc;
849	int timeout;
850	bool ok;
851
852	if (!*sock1 || !*sock2)
853		return false;
854
855	rcu_read_lock();
856	nc = rcu_dereference(connection->net_conf);
857	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
858	rcu_read_unlock();
859	schedule_timeout_interruptible(timeout);
860
861	ok = drbd_socket_okay(sock1);
862	ok = drbd_socket_okay(sock2) && ok;
863
864	return ok;
865}
866
867/* Gets called if a connection is established, or if a new minor gets created
868   in a connection */
869int drbd_connected(struct drbd_peer_device *peer_device)
870{
871	struct drbd_device *device = peer_device->device;
872	int err;
873
874	atomic_set(&device->packet_seq, 0);
875	device->peer_seq = 0;
876
877	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
878		&peer_device->connection->cstate_mutex :
879		&device->own_state_mutex;
880
881	err = drbd_send_sync_param(peer_device);
882	if (!err)
883		err = drbd_send_sizes(peer_device, 0, 0);
884	if (!err)
885		err = drbd_send_uuids(peer_device);
886	if (!err)
887		err = drbd_send_current_state(peer_device);
888	clear_bit(USE_DEGR_WFC_T, &device->flags);
889	clear_bit(RESIZE_PENDING, &device->flags);
890	atomic_set(&device->ap_in_flight, 0);
891	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
892	return err;
893}
894
895/*
896 * return values:
897 *   1 yes, we have a valid connection
898 *   0 oops, did not work out, please try again
899 *  -1 peer talks different language,
900 *     no point in trying again, please go standalone.
901 *  -2 We do not have a network config...
902 */
903static int conn_connect(struct drbd_connection *connection)
904{
905	struct drbd_socket sock, msock;
906	struct drbd_peer_device *peer_device;
907	struct net_conf *nc;
908	int vnr, timeout, h;
909	bool discard_my_data, ok;
910	enum drbd_state_rv rv;
911	struct accept_wait_data ad = {
912		.connection = connection,
913		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
914	};
915
916	clear_bit(DISCONNECT_SENT, &connection->flags);
917	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
918		return -2;
919
920	mutex_init(&sock.mutex);
921	sock.sbuf = connection->data.sbuf;
922	sock.rbuf = connection->data.rbuf;
923	sock.socket = NULL;
924	mutex_init(&msock.mutex);
925	msock.sbuf = connection->meta.sbuf;
926	msock.rbuf = connection->meta.rbuf;
927	msock.socket = NULL;
928
929	/* Assume that the peer only understands protocol 80 until we know better.  */
930	connection->agreed_pro_version = 80;
931
932	if (prepare_listen_socket(connection, &ad))
933		return 0;
934
935	do {
936		struct socket *s;
937
938		s = drbd_try_connect(connection);
939		if (s) {
940			if (!sock.socket) {
941				sock.socket = s;
942				send_first_packet(connection, &sock, P_INITIAL_DATA);
943			} else if (!msock.socket) {
944				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
945				msock.socket = s;
946				send_first_packet(connection, &msock, P_INITIAL_META);
947			} else {
948				drbd_err(connection, "Logic error in conn_connect()\n");
949				goto out_release_sockets;
950			}
951		}
952
953		if (connection_established(connection, &sock.socket, &msock.socket))
954			break;
955
956retry:
957		s = drbd_wait_for_connect(connection, &ad);
958		if (s) {
959			int fp = receive_first_packet(connection, s);
960			drbd_socket_okay(&sock.socket);
961			drbd_socket_okay(&msock.socket);
962			switch (fp) {
963			case P_INITIAL_DATA:
964				if (sock.socket) {
965					drbd_warn(connection, "initial packet S crossed\n");
966					sock_release(sock.socket);
967					sock.socket = s;
968					goto randomize;
969				}
970				sock.socket = s;
971				break;
972			case P_INITIAL_META:
973				set_bit(RESOLVE_CONFLICTS, &connection->flags);
974				if (msock.socket) {
975					drbd_warn(connection, "initial packet M crossed\n");
976					sock_release(msock.socket);
977					msock.socket = s;
978					goto randomize;
979				}
980				msock.socket = s;
981				break;
982			default:
983				drbd_warn(connection, "Error receiving initial packet\n");
984				sock_release(s);
985randomize:
986				if (prandom_u32() & 1)
987					goto retry;
988			}
989		}
990
991		if (connection->cstate <= C_DISCONNECTING)
992			goto out_release_sockets;
993		if (signal_pending(current)) {
994			flush_signals(current);
995			smp_rmb();
996			if (get_t_state(&connection->receiver) == EXITING)
997				goto out_release_sockets;
998		}
999
1000		ok = connection_established(connection, &sock.socket, &msock.socket);
1001	} while (!ok);
1002
1003	if (ad.s_listen)
1004		sock_release(ad.s_listen);
1005
1006	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1007	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1008
1009	sock.socket->sk->sk_allocation = GFP_NOIO;
1010	msock.socket->sk->sk_allocation = GFP_NOIO;
1011
1012	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1013	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1014
1015	/* NOT YET ...
1016	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1017	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1018	 * first set it to the P_CONNECTION_FEATURES timeout,
1019	 * which we set to 4x the configured ping_timeout. */
1020	rcu_read_lock();
1021	nc = rcu_dereference(connection->net_conf);
1022
1023	sock.socket->sk->sk_sndtimeo =
1024	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1025
1026	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1027	timeout = nc->timeout * HZ / 10;
1028	discard_my_data = nc->discard_my_data;
1029	rcu_read_unlock();
1030
1031	msock.socket->sk->sk_sndtimeo = timeout;
1032
1033	/* we don't want delays.
1034	 * we use TCP_CORK where appropriate, though */
1035	drbd_tcp_nodelay(sock.socket);
1036	drbd_tcp_nodelay(msock.socket);
1037
1038	connection->data.socket = sock.socket;
1039	connection->meta.socket = msock.socket;
1040	connection->last_received = jiffies;
1041
1042	h = drbd_do_features(connection);
1043	if (h <= 0)
1044		return h;
1045
1046	if (connection->cram_hmac_tfm) {
1047		/* drbd_request_state(device, NS(conn, WFAuth)); */
1048		switch (drbd_do_auth(connection)) {
1049		case -1:
1050			drbd_err(connection, "Authentication of peer failed\n");
1051			return -1;
1052		case 0:
1053			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1054			return 0;
1055		}
1056	}
1057
1058	connection->data.socket->sk->sk_sndtimeo = timeout;
1059	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1060
1061	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1062		return -1;
1063
1064	/* Prevent a race between resync-handshake and
1065	 * being promoted to Primary.
1066	 *
1067	 * Grab and release the state mutex, so we know that any current
1068	 * drbd_set_role() is finished, and any incoming drbd_set_role
1069	 * will see the STATE_SENT flag, and wait for it to be cleared.
1070	 */
1071	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1072		mutex_lock(peer_device->device->state_mutex);
1073
1074	set_bit(STATE_SENT, &connection->flags);
1075
1076	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1077		mutex_unlock(peer_device->device->state_mutex);
1078
1079	rcu_read_lock();
1080	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1081		struct drbd_device *device = peer_device->device;
1082		kref_get(&device->kref);
1083		rcu_read_unlock();
1084
1085		if (discard_my_data)
1086			set_bit(DISCARD_MY_DATA, &device->flags);
1087		else
1088			clear_bit(DISCARD_MY_DATA, &device->flags);
1089
1090		drbd_connected(peer_device);
1091		kref_put(&device->kref, drbd_destroy_device);
1092		rcu_read_lock();
1093	}
1094	rcu_read_unlock();
1095
1096	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1097	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1098		clear_bit(STATE_SENT, &connection->flags);
1099		return 0;
1100	}
1101
1102	drbd_thread_start(&connection->asender);
1103
1104	mutex_lock(&connection->resource->conf_update);
1105	/* The discard_my_data flag is a single-shot modifier to the next
1106	 * connection attempt, the handshake of which is now well underway.
1107	 * No need for rcu style copying of the whole struct
1108	 * just to clear a single value. */
1109	connection->net_conf->discard_my_data = 0;
1110	mutex_unlock(&connection->resource->conf_update);
1111
1112	return h;
1113
1114out_release_sockets:
1115	if (ad.s_listen)
1116		sock_release(ad.s_listen);
1117	if (sock.socket)
1118		sock_release(sock.socket);
1119	if (msock.socket)
1120		sock_release(msock.socket);
1121	return -1;
1122}
1123
1124static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1125{
1126	unsigned int header_size = drbd_header_size(connection);
1127
1128	if (header_size == sizeof(struct p_header100) &&
1129	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1130		struct p_header100 *h = header;
1131		if (h->pad != 0) {
1132			drbd_err(connection, "Header padding is not zero\n");
1133			return -EINVAL;
1134		}
1135		pi->vnr = be16_to_cpu(h->volume);
1136		pi->cmd = be16_to_cpu(h->command);
1137		pi->size = be32_to_cpu(h->length);
1138	} else if (header_size == sizeof(struct p_header95) &&
1139		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1140		struct p_header95 *h = header;
1141		pi->cmd = be16_to_cpu(h->command);
1142		pi->size = be32_to_cpu(h->length);
1143		pi->vnr = 0;
1144	} else if (header_size == sizeof(struct p_header80) &&
1145		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1146		struct p_header80 *h = header;
1147		pi->cmd = be16_to_cpu(h->command);
1148		pi->size = be16_to_cpu(h->length);
1149		pi->vnr = 0;
1150	} else {
1151		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1152			 be32_to_cpu(*(__be32 *)header),
1153			 connection->agreed_pro_version);
1154		return -EINVAL;
1155	}
1156	pi->data = header + header_size;
1157	return 0;
1158}
1159
1160static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1161{
1162	void *buffer = connection->data.rbuf;
1163	int err;
1164
1165	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1166	if (err)
1167		return err;
1168
1169	err = decode_header(connection, buffer, pi);
1170	connection->last_received = jiffies;
1171
1172	return err;
1173}
1174
1175static void drbd_flush(struct drbd_connection *connection)
1176{
1177	int rv;
1178	struct drbd_peer_device *peer_device;
1179	int vnr;
1180
1181	if (connection->resource->write_ordering >= WO_bdev_flush) {
1182		rcu_read_lock();
1183		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1184			struct drbd_device *device = peer_device->device;
1185
1186			if (!get_ldev(device))
1187				continue;
1188			kref_get(&device->kref);
1189			rcu_read_unlock();
1190
1191			/* Right now, we have only this one synchronous code path
1192			 * for flushes between request epochs.
1193			 * We may want to make those asynchronous,
1194			 * or at least parallelize the flushes to the volume devices.
1195			 */
1196			device->flush_jif = jiffies;
1197			set_bit(FLUSH_PENDING, &device->flags);
1198			rv = blkdev_issue_flush(device->ldev->backing_bdev,
1199					GFP_NOIO, NULL);
1200			clear_bit(FLUSH_PENDING, &device->flags);
1201			if (rv) {
1202				drbd_info(device, "local disk flush failed with status %d\n", rv);
1203				/* would rather check on EOPNOTSUPP, but that is not reliable.
1204				 * don't try again for ANY return value != 0
1205				 * if (rv == -EOPNOTSUPP) */
1206				drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
1207			}
1208			put_ldev(device);
1209			kref_put(&device->kref, drbd_destroy_device);
1210
1211			rcu_read_lock();
1212			if (rv)
1213				break;
1214		}
1215		rcu_read_unlock();
1216	}
1217}
1218
1219/**
1220 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1221 * @device:	DRBD device.
1222 * @epoch:	Epoch object.
1223 * @ev:		Epoch event.
1224 */
1225static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1226					       struct drbd_epoch *epoch,
1227					       enum epoch_event ev)
1228{
1229	int epoch_size;
1230	struct drbd_epoch *next_epoch;
1231	enum finish_epoch rv = FE_STILL_LIVE;
1232
1233	spin_lock(&connection->epoch_lock);
1234	do {
1235		next_epoch = NULL;
1236
1237		epoch_size = atomic_read(&epoch->epoch_size);
1238
1239		switch (ev & ~EV_CLEANUP) {
1240		case EV_PUT:
1241			atomic_dec(&epoch->active);
1242			break;
1243		case EV_GOT_BARRIER_NR:
1244			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1245			break;
1246		case EV_BECAME_LAST:
1247			/* nothing to do*/
1248			break;
1249		}
1250
1251		if (epoch_size != 0 &&
1252		    atomic_read(&epoch->active) == 0 &&
1253		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1254			if (!(ev & EV_CLEANUP)) {
1255				spin_unlock(&connection->epoch_lock);
1256				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1257				spin_lock(&connection->epoch_lock);
1258			}
1259#if 0
1260			/* FIXME: dec unacked on connection, once we have
1261			 * something to count pending connection packets in. */
1262			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1263				dec_unacked(epoch->connection);
1264#endif
1265
1266			if (connection->current_epoch != epoch) {
1267				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1268				list_del(&epoch->list);
1269				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1270				connection->epochs--;
1271				kfree(epoch);
1272
1273				if (rv == FE_STILL_LIVE)
1274					rv = FE_DESTROYED;
1275			} else {
1276				epoch->flags = 0;
1277				atomic_set(&epoch->epoch_size, 0);
1278				/* atomic_set(&epoch->active, 0); is already zero */
1279				if (rv == FE_STILL_LIVE)
1280					rv = FE_RECYCLED;
1281			}
1282		}
1283
1284		if (!next_epoch)
1285			break;
1286
1287		epoch = next_epoch;
1288	} while (1);
1289
1290	spin_unlock(&connection->epoch_lock);
1291
1292	return rv;
1293}
1294
1295static enum write_ordering_e
1296max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1297{
1298	struct disk_conf *dc;
1299
1300	dc = rcu_dereference(bdev->disk_conf);
1301
1302	if (wo == WO_bdev_flush && !dc->disk_flushes)
1303		wo = WO_drain_io;
1304	if (wo == WO_drain_io && !dc->disk_drain)
1305		wo = WO_none;
1306
1307	return wo;
1308}
1309
1310/**
1311 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1312 * @connection:	DRBD connection.
1313 * @wo:		Write ordering method to try.
1314 */
1315void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1316			      enum write_ordering_e wo)
1317{
1318	struct drbd_device *device;
1319	enum write_ordering_e pwo;
1320	int vnr;
1321	static char *write_ordering_str[] = {
1322		[WO_none] = "none",
1323		[WO_drain_io] = "drain",
1324		[WO_bdev_flush] = "flush",
1325	};
1326
1327	pwo = resource->write_ordering;
1328	if (wo != WO_bdev_flush)
1329		wo = min(pwo, wo);
1330	rcu_read_lock();
1331	idr_for_each_entry(&resource->devices, device, vnr) {
1332		if (get_ldev(device)) {
1333			wo = max_allowed_wo(device->ldev, wo);
1334			if (device->ldev == bdev)
1335				bdev = NULL;
1336			put_ldev(device);
1337		}
1338	}
1339
1340	if (bdev)
1341		wo = max_allowed_wo(bdev, wo);
1342
1343	rcu_read_unlock();
1344
1345	resource->write_ordering = wo;
1346	if (pwo != resource->write_ordering || wo == WO_bdev_flush)
1347		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1348}
1349
1350/**
1351 * drbd_submit_peer_request()
1352 * @device:	DRBD device.
1353 * @peer_req:	peer request
1354 * @rw:		flag field, see bio->bi_rw
1355 *
1356 * May spread the pages to multiple bios,
1357 * depending on bio_add_page restrictions.
1358 *
1359 * Returns 0 if all bios have been submitted,
1360 * -ENOMEM if we could not allocate enough bios,
1361 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1362 *  single page to an empty bio (which should never happen and likely indicates
1363 *  that the lower level IO stack is in some way broken). This has been observed
1364 *  on certain Xen deployments.
1365 */
1366/* TODO allocate from our own bio_set. */
1367int drbd_submit_peer_request(struct drbd_device *device,
1368			     struct drbd_peer_request *peer_req,
1369			     const unsigned rw, const int fault_type)
1370{
1371	struct bio *bios = NULL;
1372	struct bio *bio;
1373	struct page *page = peer_req->pages;
1374	sector_t sector = peer_req->i.sector;
1375	unsigned data_size = peer_req->i.size;
1376	unsigned n_bios = 0;
1377	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1378	int err = -ENOMEM;
1379
1380	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1381		/* wait for all pending IO completions, before we start
1382		 * zeroing things out. */
1383		conn_wait_active_ee_empty(first_peer_device(device)->connection);
1384		/* add it to the active list now,
1385		 * so we can find it to present it in debugfs */
1386		peer_req->submit_jif = jiffies;
1387		peer_req->flags |= EE_SUBMITTED;
1388		spin_lock_irq(&device->resource->req_lock);
1389		list_add_tail(&peer_req->w.list, &device->active_ee);
1390		spin_unlock_irq(&device->resource->req_lock);
1391		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1392			sector, data_size >> 9, GFP_NOIO, false))
1393			peer_req->flags |= EE_WAS_ERROR;
1394		drbd_endio_write_sec_final(peer_req);
1395		return 0;
1396	}
1397
1398	/* Discards don't have any payload.
1399	 * But the scsi layer still expects a bio_vec it can use internally,
1400	 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1401	if (peer_req->flags & EE_IS_TRIM)
1402		nr_pages = 1;
1403
1404	/* In most cases, we will only need one bio.  But in case the lower
1405	 * level restrictions happen to be different at this offset on this
1406	 * side than those of the sending peer, we may need to submit the
1407	 * request in more than one bio.
1408	 *
1409	 * Plain bio_alloc is good enough here, this is no DRBD internally
1410	 * generated bio, but a bio allocated on behalf of the peer.
1411	 */
1412next_bio:
1413	bio = bio_alloc(GFP_NOIO, nr_pages);
1414	if (!bio) {
1415		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1416		goto fail;
1417	}
1418	/* > peer_req->i.sector, unless this is the first bio */
1419	bio->bi_iter.bi_sector = sector;
1420	bio->bi_bdev = device->ldev->backing_bdev;
1421	bio->bi_rw = rw;
1422	bio->bi_private = peer_req;
1423	bio->bi_end_io = drbd_peer_request_endio;
1424
1425	bio->bi_next = bios;
1426	bios = bio;
1427	++n_bios;
1428
1429	if (rw & REQ_DISCARD) {
1430		bio->bi_iter.bi_size = data_size;
1431		goto submit;
1432	}
1433
1434	page_chain_for_each(page) {
1435		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1436		if (!bio_add_page(bio, page, len, 0)) {
1437			/* A single page must always be possible!
1438			 * But in case it fails anyways,
1439			 * we deal with it, and complain (below). */
1440			if (bio->bi_vcnt == 0) {
1441				drbd_err(device,
1442					"bio_add_page failed for len=%u, "
1443					"bi_vcnt=0 (bi_sector=%llu)\n",
1444					len, (uint64_t)bio->bi_iter.bi_sector);
1445				err = -ENOSPC;
1446				goto fail;
1447			}
1448			goto next_bio;
1449		}
1450		data_size -= len;
1451		sector += len >> 9;
1452		--nr_pages;
1453	}
1454	D_ASSERT(device, data_size == 0);
1455submit:
1456	D_ASSERT(device, page == NULL);
1457
1458	atomic_set(&peer_req->pending_bios, n_bios);
1459	/* for debugfs: update timestamp, mark as submitted */
1460	peer_req->submit_jif = jiffies;
1461	peer_req->flags |= EE_SUBMITTED;
1462	do {
1463		bio = bios;
1464		bios = bios->bi_next;
1465		bio->bi_next = NULL;
1466
1467		drbd_generic_make_request(device, fault_type, bio);
1468	} while (bios);
1469	return 0;
1470
1471fail:
1472	while (bios) {
1473		bio = bios;
1474		bios = bios->bi_next;
1475		bio_put(bio);
1476	}
1477	return err;
1478}
1479
1480static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1481					     struct drbd_peer_request *peer_req)
1482{
1483	struct drbd_interval *i = &peer_req->i;
1484
1485	drbd_remove_interval(&device->write_requests, i);
1486	drbd_clear_interval(i);
1487
1488	/* Wake up any processes waiting for this peer request to complete.  */
1489	if (i->waiting)
1490		wake_up(&device->misc_wait);
1491}
1492
1493static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1494{
1495	struct drbd_peer_device *peer_device;
1496	int vnr;
1497
1498	rcu_read_lock();
1499	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1500		struct drbd_device *device = peer_device->device;
1501
1502		kref_get(&device->kref);
1503		rcu_read_unlock();
1504		drbd_wait_ee_list_empty(device, &device->active_ee);
1505		kref_put(&device->kref, drbd_destroy_device);
1506		rcu_read_lock();
1507	}
1508	rcu_read_unlock();
1509}
1510
1511static struct drbd_peer_device *
1512conn_peer_device(struct drbd_connection *connection, int volume_number)
1513{
1514	return idr_find(&connection->peer_devices, volume_number);
1515}
1516
1517static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1518{
1519	int rv;
1520	struct p_barrier *p = pi->data;
1521	struct drbd_epoch *epoch;
1522
1523	/* FIXME these are unacked on connection,
1524	 * not a specific (peer)device.
1525	 */
1526	connection->current_epoch->barrier_nr = p->barrier;
1527	connection->current_epoch->connection = connection;
1528	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1529
1530	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1531	 * the activity log, which means it would not be resynced in case the
1532	 * R_PRIMARY crashes now.
1533	 * Therefore we must send the barrier_ack after the barrier request was
1534	 * completed. */
1535	switch (connection->resource->write_ordering) {
1536	case WO_none:
1537		if (rv == FE_RECYCLED)
1538			return 0;
1539
1540		/* receiver context, in the writeout path of the other node.
1541		 * avoid potential distributed deadlock */
1542		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1543		if (epoch)
1544			break;
1545		else
1546			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1547			/* Fall through */
1548
1549	case WO_bdev_flush:
1550	case WO_drain_io:
1551		conn_wait_active_ee_empty(connection);
1552		drbd_flush(connection);
1553
1554		if (atomic_read(&connection->current_epoch->epoch_size)) {
1555			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1556			if (epoch)
1557				break;
1558		}
1559
1560		return 0;
1561	default:
1562		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1563			 connection->resource->write_ordering);
1564		return -EIO;
1565	}
1566
1567	epoch->flags = 0;
1568	atomic_set(&epoch->epoch_size, 0);
1569	atomic_set(&epoch->active, 0);
1570
1571	spin_lock(&connection->epoch_lock);
1572	if (atomic_read(&connection->current_epoch->epoch_size)) {
1573		list_add(&epoch->list, &connection->current_epoch->list);
1574		connection->current_epoch = epoch;
1575		connection->epochs++;
1576	} else {
1577		/* The current_epoch got recycled while we allocated this one... */
1578		kfree(epoch);
1579	}
1580	spin_unlock(&connection->epoch_lock);
1581
1582	return 0;
1583}
1584
1585/* used from receive_RSDataReply (recv_resync_read)
1586 * and from receive_Data */
1587static struct drbd_peer_request *
1588read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1589	      struct packet_info *pi) __must_hold(local)
1590{
1591	struct drbd_device *device = peer_device->device;
1592	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1593	struct drbd_peer_request *peer_req;
1594	struct page *page;
1595	int digest_size, err;
1596	unsigned int data_size = pi->size, ds;
1597	void *dig_in = peer_device->connection->int_dig_in;
1598	void *dig_vv = peer_device->connection->int_dig_vv;
1599	unsigned long *data;
1600	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1601
1602	digest_size = 0;
1603	if (!trim && peer_device->connection->peer_integrity_tfm) {
1604		digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1605		/*
1606		 * FIXME: Receive the incoming digest into the receive buffer
1607		 *	  here, together with its struct p_data?
1608		 */
1609		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1610		if (err)
1611			return NULL;
1612		data_size -= digest_size;
1613	}
1614
1615	if (trim) {
1616		D_ASSERT(peer_device, data_size == 0);
1617		data_size = be32_to_cpu(trim->size);
1618	}
1619
1620	if (!expect(IS_ALIGNED(data_size, 512)))
1621		return NULL;
1622	/* prepare for larger trim requests. */
1623	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1624		return NULL;
1625
1626	/* even though we trust out peer,
1627	 * we sometimes have to double check. */
1628	if (sector + (data_size>>9) > capacity) {
1629		drbd_err(device, "request from peer beyond end of local disk: "
1630			"capacity: %llus < sector: %llus + size: %u\n",
1631			(unsigned long long)capacity,
1632			(unsigned long long)sector, data_size);
1633		return NULL;
1634	}
1635
1636	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1637	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1638	 * which in turn might block on the other node at this very place.  */
1639	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1640	if (!peer_req)
1641		return NULL;
1642
1643	peer_req->flags |= EE_WRITE;
1644	if (trim)
1645		return peer_req;
1646
1647	ds = data_size;
1648	page = peer_req->pages;
1649	page_chain_for_each(page) {
1650		unsigned len = min_t(int, ds, PAGE_SIZE);
1651		data = kmap(page);
1652		err = drbd_recv_all_warn(peer_device->connection, data, len);
1653		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1654			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1655			data[0] = data[0] ^ (unsigned long)-1;
1656		}
1657		kunmap(page);
1658		if (err) {
1659			drbd_free_peer_req(device, peer_req);
1660			return NULL;
1661		}
1662		ds -= len;
1663	}
1664
1665	if (digest_size) {
1666		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1667		if (memcmp(dig_in, dig_vv, digest_size)) {
1668			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1669				(unsigned long long)sector, data_size);
1670			drbd_free_peer_req(device, peer_req);
1671			return NULL;
1672		}
1673	}
1674	device->recv_cnt += data_size >> 9;
1675	return peer_req;
1676}
1677
1678/* drbd_drain_block() just takes a data block
1679 * out of the socket input buffer, and discards it.
1680 */
1681static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1682{
1683	struct page *page;
1684	int err = 0;
1685	void *data;
1686
1687	if (!data_size)
1688		return 0;
1689
1690	page = drbd_alloc_pages(peer_device, 1, 1);
1691
1692	data = kmap(page);
1693	while (data_size) {
1694		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1695
1696		err = drbd_recv_all_warn(peer_device->connection, data, len);
1697		if (err)
1698			break;
1699		data_size -= len;
1700	}
1701	kunmap(page);
1702	drbd_free_pages(peer_device->device, page, 0);
1703	return err;
1704}
1705
1706static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1707			   sector_t sector, int data_size)
1708{
1709	struct bio_vec bvec;
1710	struct bvec_iter iter;
1711	struct bio *bio;
1712	int digest_size, err, expect;
1713	void *dig_in = peer_device->connection->int_dig_in;
1714	void *dig_vv = peer_device->connection->int_dig_vv;
1715
1716	digest_size = 0;
1717	if (peer_device->connection->peer_integrity_tfm) {
1718		digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1719		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1720		if (err)
1721			return err;
1722		data_size -= digest_size;
1723	}
1724
1725	/* optimistically update recv_cnt.  if receiving fails below,
1726	 * we disconnect anyways, and counters will be reset. */
1727	peer_device->device->recv_cnt += data_size>>9;
1728
1729	bio = req->master_bio;
1730	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1731
1732	bio_for_each_segment(bvec, bio, iter) {
1733		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1734		expect = min_t(int, data_size, bvec.bv_len);
1735		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1736		kunmap(bvec.bv_page);
1737		if (err)
1738			return err;
1739		data_size -= expect;
1740	}
1741
1742	if (digest_size) {
1743		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1744		if (memcmp(dig_in, dig_vv, digest_size)) {
1745			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1746			return -EINVAL;
1747		}
1748	}
1749
1750	D_ASSERT(peer_device->device, data_size == 0);
1751	return 0;
1752}
1753
1754/*
1755 * e_end_resync_block() is called in asender context via
1756 * drbd_finish_peer_reqs().
1757 */
1758static int e_end_resync_block(struct drbd_work *w, int unused)
1759{
1760	struct drbd_peer_request *peer_req =
1761		container_of(w, struct drbd_peer_request, w);
1762	struct drbd_peer_device *peer_device = peer_req->peer_device;
1763	struct drbd_device *device = peer_device->device;
1764	sector_t sector = peer_req->i.sector;
1765	int err;
1766
1767	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1768
1769	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1770		drbd_set_in_sync(device, sector, peer_req->i.size);
1771		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1772	} else {
1773		/* Record failure to sync */
1774		drbd_rs_failed_io(device, sector, peer_req->i.size);
1775
1776		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1777	}
1778	dec_unacked(device);
1779
1780	return err;
1781}
1782
1783static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1784			    struct packet_info *pi) __releases(local)
1785{
1786	struct drbd_device *device = peer_device->device;
1787	struct drbd_peer_request *peer_req;
1788
1789	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1790	if (!peer_req)
1791		goto fail;
1792
1793	dec_rs_pending(device);
1794
1795	inc_unacked(device);
1796	/* corresponding dec_unacked() in e_end_resync_block()
1797	 * respective _drbd_clear_done_ee */
1798
1799	peer_req->w.cb = e_end_resync_block;
1800	peer_req->submit_jif = jiffies;
1801
1802	spin_lock_irq(&device->resource->req_lock);
1803	list_add_tail(&peer_req->w.list, &device->sync_ee);
1804	spin_unlock_irq(&device->resource->req_lock);
1805
1806	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1807	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1808		return 0;
1809
1810	/* don't care for the reason here */
1811	drbd_err(device, "submit failed, triggering re-connect\n");
1812	spin_lock_irq(&device->resource->req_lock);
1813	list_del(&peer_req->w.list);
1814	spin_unlock_irq(&device->resource->req_lock);
1815
1816	drbd_free_peer_req(device, peer_req);
1817fail:
1818	put_ldev(device);
1819	return -EIO;
1820}
1821
1822static struct drbd_request *
1823find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1824	     sector_t sector, bool missing_ok, const char *func)
1825{
1826	struct drbd_request *req;
1827
1828	/* Request object according to our peer */
1829	req = (struct drbd_request *)(unsigned long)id;
1830	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1831		return req;
1832	if (!missing_ok) {
1833		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1834			(unsigned long)id, (unsigned long long)sector);
1835	}
1836	return NULL;
1837}
1838
1839static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1840{
1841	struct drbd_peer_device *peer_device;
1842	struct drbd_device *device;
1843	struct drbd_request *req;
1844	sector_t sector;
1845	int err;
1846	struct p_data *p = pi->data;
1847
1848	peer_device = conn_peer_device(connection, pi->vnr);
1849	if (!peer_device)
1850		return -EIO;
1851	device = peer_device->device;
1852
1853	sector = be64_to_cpu(p->sector);
1854
1855	spin_lock_irq(&device->resource->req_lock);
1856	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1857	spin_unlock_irq(&device->resource->req_lock);
1858	if (unlikely(!req))
1859		return -EIO;
1860
1861	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1862	 * special casing it there for the various failure cases.
1863	 * still no race with drbd_fail_pending_reads */
1864	err = recv_dless_read(peer_device, req, sector, pi->size);
1865	if (!err)
1866		req_mod(req, DATA_RECEIVED);
1867	/* else: nothing. handled from drbd_disconnect...
1868	 * I don't think we may complete this just yet
1869	 * in case we are "on-disconnect: freeze" */
1870
1871	return err;
1872}
1873
1874static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1875{
1876	struct drbd_peer_device *peer_device;
1877	struct drbd_device *device;
1878	sector_t sector;
1879	int err;
1880	struct p_data *p = pi->data;
1881
1882	peer_device = conn_peer_device(connection, pi->vnr);
1883	if (!peer_device)
1884		return -EIO;
1885	device = peer_device->device;
1886
1887	sector = be64_to_cpu(p->sector);
1888	D_ASSERT(device, p->block_id == ID_SYNCER);
1889
1890	if (get_ldev(device)) {
1891		/* data is submitted to disk within recv_resync_read.
1892		 * corresponding put_ldev done below on error,
1893		 * or in drbd_peer_request_endio. */
1894		err = recv_resync_read(peer_device, sector, pi);
1895	} else {
1896		if (__ratelimit(&drbd_ratelimit_state))
1897			drbd_err(device, "Can not write resync data to local disk.\n");
1898
1899		err = drbd_drain_block(peer_device, pi->size);
1900
1901		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1902	}
1903
1904	atomic_add(pi->size >> 9, &device->rs_sect_in);
1905
1906	return err;
1907}
1908
1909static void restart_conflicting_writes(struct drbd_device *device,
1910				       sector_t sector, int size)
1911{
1912	struct drbd_interval *i;
1913	struct drbd_request *req;
1914
1915	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1916		if (!i->local)
1917			continue;
1918		req = container_of(i, struct drbd_request, i);
1919		if (req->rq_state & RQ_LOCAL_PENDING ||
1920		    !(req->rq_state & RQ_POSTPONED))
1921			continue;
1922		/* as it is RQ_POSTPONED, this will cause it to
1923		 * be queued on the retry workqueue. */
1924		__req_mod(req, CONFLICT_RESOLVED, NULL);
1925	}
1926}
1927
1928/*
1929 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1930 */
1931static int e_end_block(struct drbd_work *w, int cancel)
1932{
1933	struct drbd_peer_request *peer_req =
1934		container_of(w, struct drbd_peer_request, w);
1935	struct drbd_peer_device *peer_device = peer_req->peer_device;
1936	struct drbd_device *device = peer_device->device;
1937	sector_t sector = peer_req->i.sector;
1938	int err = 0, pcmd;
1939
1940	if (peer_req->flags & EE_SEND_WRITE_ACK) {
1941		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1942			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1943				device->state.conn <= C_PAUSED_SYNC_T &&
1944				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1945				P_RS_WRITE_ACK : P_WRITE_ACK;
1946			err = drbd_send_ack(peer_device, pcmd, peer_req);
1947			if (pcmd == P_RS_WRITE_ACK)
1948				drbd_set_in_sync(device, sector, peer_req->i.size);
1949		} else {
1950			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1951			/* we expect it to be marked out of sync anyways...
1952			 * maybe assert this?  */
1953		}
1954		dec_unacked(device);
1955	}
1956
1957	/* we delete from the conflict detection hash _after_ we sent out the
1958	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1959	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1960		spin_lock_irq(&device->resource->req_lock);
1961		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1962		drbd_remove_epoch_entry_interval(device, peer_req);
1963		if (peer_req->flags & EE_RESTART_REQUESTS)
1964			restart_conflicting_writes(device, sector, peer_req->i.size);
1965		spin_unlock_irq(&device->resource->req_lock);
1966	} else
1967		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1968
1969	drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1970
1971	return err;
1972}
1973
1974static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1975{
1976	struct drbd_peer_request *peer_req =
1977		container_of(w, struct drbd_peer_request, w);
1978	struct drbd_peer_device *peer_device = peer_req->peer_device;
1979	int err;
1980
1981	err = drbd_send_ack(peer_device, ack, peer_req);
1982	dec_unacked(peer_device->device);
1983
1984	return err;
1985}
1986
1987static int e_send_superseded(struct drbd_work *w, int unused)
1988{
1989	return e_send_ack(w, P_SUPERSEDED);
1990}
1991
1992static int e_send_retry_write(struct drbd_work *w, int unused)
1993{
1994	struct drbd_peer_request *peer_req =
1995		container_of(w, struct drbd_peer_request, w);
1996	struct drbd_connection *connection = peer_req->peer_device->connection;
1997
1998	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1999			     P_RETRY_WRITE : P_SUPERSEDED);
2000}
2001
2002static bool seq_greater(u32 a, u32 b)
2003{
2004	/*
2005	 * We assume 32-bit wrap-around here.
2006	 * For 24-bit wrap-around, we would have to shift:
2007	 *  a <<= 8; b <<= 8;
2008	 */
2009	return (s32)a - (s32)b > 0;
2010}
2011
2012static u32 seq_max(u32 a, u32 b)
2013{
2014	return seq_greater(a, b) ? a : b;
2015}
2016
2017static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2018{
2019	struct drbd_device *device = peer_device->device;
2020	unsigned int newest_peer_seq;
2021
2022	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2023		spin_lock(&device->peer_seq_lock);
2024		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2025		device->peer_seq = newest_peer_seq;
2026		spin_unlock(&device->peer_seq_lock);
2027		/* wake up only if we actually changed device->peer_seq */
2028		if (peer_seq == newest_peer_seq)
2029			wake_up(&device->seq_wait);
2030	}
2031}
2032
2033static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2034{
2035	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2036}
2037
2038/* maybe change sync_ee into interval trees as well? */
2039static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2040{
2041	struct drbd_peer_request *rs_req;
2042	bool rv = 0;
2043
2044	spin_lock_irq(&device->resource->req_lock);
2045	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2046		if (overlaps(peer_req->i.sector, peer_req->i.size,
2047			     rs_req->i.sector, rs_req->i.size)) {
2048			rv = 1;
2049			break;
2050		}
2051	}
2052	spin_unlock_irq(&device->resource->req_lock);
2053
2054	return rv;
2055}
2056
2057/* Called from receive_Data.
2058 * Synchronize packets on sock with packets on msock.
2059 *
2060 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2061 * packet traveling on msock, they are still processed in the order they have
2062 * been sent.
2063 *
2064 * Note: we don't care for Ack packets overtaking P_DATA packets.
2065 *
2066 * In case packet_seq is larger than device->peer_seq number, there are
2067 * outstanding packets on the msock. We wait for them to arrive.
2068 * In case we are the logically next packet, we update device->peer_seq
2069 * ourselves. Correctly handles 32bit wrap around.
2070 *
2071 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2072 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2073 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2074 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2075 *
2076 * returns 0 if we may process the packet,
2077 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2078static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2079{
2080	struct drbd_device *device = peer_device->device;
2081	DEFINE_WAIT(wait);
2082	long timeout;
2083	int ret = 0, tp;
2084
2085	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2086		return 0;
2087
2088	spin_lock(&device->peer_seq_lock);
2089	for (;;) {
2090		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2091			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2092			break;
2093		}
2094
2095		if (signal_pending(current)) {
2096			ret = -ERESTARTSYS;
2097			break;
2098		}
2099
2100		rcu_read_lock();
2101		tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2102		rcu_read_unlock();
2103
2104		if (!tp)
2105			break;
2106
2107		/* Only need to wait if two_primaries is enabled */
2108		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2109		spin_unlock(&device->peer_seq_lock);
2110		rcu_read_lock();
2111		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2112		rcu_read_unlock();
2113		timeout = schedule_timeout(timeout);
2114		spin_lock(&device->peer_seq_lock);
2115		if (!timeout) {
2116			ret = -ETIMEDOUT;
2117			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2118			break;
2119		}
2120	}
2121	spin_unlock(&device->peer_seq_lock);
2122	finish_wait(&device->seq_wait, &wait);
2123	return ret;
2124}
2125
2126/* see also bio_flags_to_wire()
2127 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2128 * flags and back. We may replicate to other kernel versions. */
2129static unsigned long wire_flags_to_bio(u32 dpf)
2130{
2131	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2132		(dpf & DP_FUA ? REQ_FUA : 0) |
2133		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2134		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
2135}
2136
2137static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2138				    unsigned int size)
2139{
2140	struct drbd_interval *i;
2141
2142    repeat:
2143	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2144		struct drbd_request *req;
2145		struct bio_and_error m;
2146
2147		if (!i->local)
2148			continue;
2149		req = container_of(i, struct drbd_request, i);
2150		if (!(req->rq_state & RQ_POSTPONED))
2151			continue;
2152		req->rq_state &= ~RQ_POSTPONED;
2153		__req_mod(req, NEG_ACKED, &m);
2154		spin_unlock_irq(&device->resource->req_lock);
2155		if (m.bio)
2156			complete_master_bio(device, &m);
2157		spin_lock_irq(&device->resource->req_lock);
2158		goto repeat;
2159	}
2160}
2161
2162static int handle_write_conflicts(struct drbd_device *device,
2163				  struct drbd_peer_request *peer_req)
2164{
2165	struct drbd_connection *connection = peer_req->peer_device->connection;
2166	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2167	sector_t sector = peer_req->i.sector;
2168	const unsigned int size = peer_req->i.size;
2169	struct drbd_interval *i;
2170	bool equal;
2171	int err;
2172
2173	/*
2174	 * Inserting the peer request into the write_requests tree will prevent
2175	 * new conflicting local requests from being added.
2176	 */
2177	drbd_insert_interval(&device->write_requests, &peer_req->i);
2178
2179    repeat:
2180	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2181		if (i == &peer_req->i)
2182			continue;
2183		if (i->completed)
2184			continue;
2185
2186		if (!i->local) {
2187			/*
2188			 * Our peer has sent a conflicting remote request; this
2189			 * should not happen in a two-node setup.  Wait for the
2190			 * earlier peer request to complete.
2191			 */
2192			err = drbd_wait_misc(device, i);
2193			if (err)
2194				goto out;
2195			goto repeat;
2196		}
2197
2198		equal = i->sector == sector && i->size == size;
2199		if (resolve_conflicts) {
2200			/*
2201			 * If the peer request is fully contained within the
2202			 * overlapping request, it can be considered overwritten
2203			 * and thus superseded; otherwise, it will be retried
2204			 * once all overlapping requests have completed.
2205			 */
2206			bool superseded = i->sector <= sector && i->sector +
2207				       (i->size >> 9) >= sector + (size >> 9);
2208
2209			if (!equal)
2210				drbd_alert(device, "Concurrent writes detected: "
2211					       "local=%llus +%u, remote=%llus +%u, "
2212					       "assuming %s came first\n",
2213					  (unsigned long long)i->sector, i->size,
2214					  (unsigned long long)sector, size,
2215					  superseded ? "local" : "remote");
2216
2217			peer_req->w.cb = superseded ? e_send_superseded :
2218						   e_send_retry_write;
2219			list_add_tail(&peer_req->w.list, &device->done_ee);
2220			wake_asender(connection);
2221
2222			err = -ENOENT;
2223			goto out;
2224		} else {
2225			struct drbd_request *req =
2226				container_of(i, struct drbd_request, i);
2227
2228			if (!equal)
2229				drbd_alert(device, "Concurrent writes detected: "
2230					       "local=%llus +%u, remote=%llus +%u\n",
2231					  (unsigned long long)i->sector, i->size,
2232					  (unsigned long long)sector, size);
2233
2234			if (req->rq_state & RQ_LOCAL_PENDING ||
2235			    !(req->rq_state & RQ_POSTPONED)) {
2236				/*
2237				 * Wait for the node with the discard flag to
2238				 * decide if this request has been superseded
2239				 * or needs to be retried.
2240				 * Requests that have been superseded will
2241				 * disappear from the write_requests tree.
2242				 *
2243				 * In addition, wait for the conflicting
2244				 * request to finish locally before submitting
2245				 * the conflicting peer request.
2246				 */
2247				err = drbd_wait_misc(device, &req->i);
2248				if (err) {
2249					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2250					fail_postponed_requests(device, sector, size);
2251					goto out;
2252				}
2253				goto repeat;
2254			}
2255			/*
2256			 * Remember to restart the conflicting requests after
2257			 * the new peer request has completed.
2258			 */
2259			peer_req->flags |= EE_RESTART_REQUESTS;
2260		}
2261	}
2262	err = 0;
2263
2264    out:
2265	if (err)
2266		drbd_remove_epoch_entry_interval(device, peer_req);
2267	return err;
2268}
2269
2270/* mirrored write */
2271static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2272{
2273	struct drbd_peer_device *peer_device;
2274	struct drbd_device *device;
2275	struct net_conf *nc;
2276	sector_t sector;
2277	struct drbd_peer_request *peer_req;
2278	struct p_data *p = pi->data;
2279	u32 peer_seq = be32_to_cpu(p->seq_num);
2280	int rw = WRITE;
2281	u32 dp_flags;
2282	int err, tp;
2283
2284	peer_device = conn_peer_device(connection, pi->vnr);
2285	if (!peer_device)
2286		return -EIO;
2287	device = peer_device->device;
2288
2289	if (!get_ldev(device)) {
2290		int err2;
2291
2292		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2293		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2294		atomic_inc(&connection->current_epoch->epoch_size);
2295		err2 = drbd_drain_block(peer_device, pi->size);
2296		if (!err)
2297			err = err2;
2298		return err;
2299	}
2300
2301	/*
2302	 * Corresponding put_ldev done either below (on various errors), or in
2303	 * drbd_peer_request_endio, if we successfully submit the data at the
2304	 * end of this function.
2305	 */
2306
2307	sector = be64_to_cpu(p->sector);
2308	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2309	if (!peer_req) {
2310		put_ldev(device);
2311		return -EIO;
2312	}
2313
2314	peer_req->w.cb = e_end_block;
2315	peer_req->submit_jif = jiffies;
2316	peer_req->flags |= EE_APPLICATION;
2317
2318	dp_flags = be32_to_cpu(p->dp_flags);
2319	rw |= wire_flags_to_bio(dp_flags);
2320	if (pi->cmd == P_TRIM) {
2321		struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2322		peer_req->flags |= EE_IS_TRIM;
2323		if (!blk_queue_discard(q))
2324			peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2325		D_ASSERT(peer_device, peer_req->i.size > 0);
2326		D_ASSERT(peer_device, rw & REQ_DISCARD);
2327		D_ASSERT(peer_device, peer_req->pages == NULL);
2328	} else if (peer_req->pages == NULL) {
2329		D_ASSERT(device, peer_req->i.size == 0);
2330		D_ASSERT(device, dp_flags & DP_FLUSH);
2331	}
2332
2333	if (dp_flags & DP_MAY_SET_IN_SYNC)
2334		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2335
2336	spin_lock(&connection->epoch_lock);
2337	peer_req->epoch = connection->current_epoch;
2338	atomic_inc(&peer_req->epoch->epoch_size);
2339	atomic_inc(&peer_req->epoch->active);
2340	spin_unlock(&connection->epoch_lock);
2341
2342	rcu_read_lock();
2343	nc = rcu_dereference(peer_device->connection->net_conf);
2344	tp = nc->two_primaries;
2345	if (peer_device->connection->agreed_pro_version < 100) {
2346		switch (nc->wire_protocol) {
2347		case DRBD_PROT_C:
2348			dp_flags |= DP_SEND_WRITE_ACK;
2349			break;
2350		case DRBD_PROT_B:
2351			dp_flags |= DP_SEND_RECEIVE_ACK;
2352			break;
2353		}
2354	}
2355	rcu_read_unlock();
2356
2357	if (dp_flags & DP_SEND_WRITE_ACK) {
2358		peer_req->flags |= EE_SEND_WRITE_ACK;
2359		inc_unacked(device);
2360		/* corresponding dec_unacked() in e_end_block()
2361		 * respective _drbd_clear_done_ee */
2362	}
2363
2364	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2365		/* I really don't like it that the receiver thread
2366		 * sends on the msock, but anyways */
2367		drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2368	}
2369
2370	if (tp) {
2371		/* two primaries implies protocol C */
2372		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2373		peer_req->flags |= EE_IN_INTERVAL_TREE;
2374		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2375		if (err)
2376			goto out_interrupted;
2377		spin_lock_irq(&device->resource->req_lock);
2378		err = handle_write_conflicts(device, peer_req);
2379		if (err) {
2380			spin_unlock_irq(&device->resource->req_lock);
2381			if (err == -ENOENT) {
2382				put_ldev(device);
2383				return 0;
2384			}
2385			goto out_interrupted;
2386		}
2387	} else {
2388		update_peer_seq(peer_device, peer_seq);
2389		spin_lock_irq(&device->resource->req_lock);
2390	}
2391	/* if we use the zeroout fallback code, we process synchronously
2392	 * and we wait for all pending requests, respectively wait for
2393	 * active_ee to become empty in drbd_submit_peer_request();
2394	 * better not add ourselves here. */
2395	if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2396		list_add_tail(&peer_req->w.list, &device->active_ee);
2397	spin_unlock_irq(&device->resource->req_lock);
2398
2399	if (device->state.conn == C_SYNC_TARGET)
2400		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2401
2402	if (device->state.pdsk < D_INCONSISTENT) {
2403		/* In case we have the only disk of the cluster, */
2404		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2405		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2406		drbd_al_begin_io(device, &peer_req->i);
2407		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2408	}
2409
2410	err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2411	if (!err)
2412		return 0;
2413
2414	/* don't care for the reason here */
2415	drbd_err(device, "submit failed, triggering re-connect\n");
2416	spin_lock_irq(&device->resource->req_lock);
2417	list_del(&peer_req->w.list);
2418	drbd_remove_epoch_entry_interval(device, peer_req);
2419	spin_unlock_irq(&device->resource->req_lock);
2420	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2421		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2422		drbd_al_complete_io(device, &peer_req->i);
2423	}
2424
2425out_interrupted:
2426	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2427	put_ldev(device);
2428	drbd_free_peer_req(device, peer_req);
2429	return err;
2430}
2431
2432/* We may throttle resync, if the lower device seems to be busy,
2433 * and current sync rate is above c_min_rate.
2434 *
2435 * To decide whether or not the lower device is busy, we use a scheme similar
2436 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2437 * (more than 64 sectors) of activity we cannot account for with our own resync
2438 * activity, it obviously is "busy".
2439 *
2440 * The current sync rate used here uses only the most recent two step marks,
2441 * to have a short time average so we can react faster.
2442 */
2443bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2444		bool throttle_if_app_is_waiting)
2445{
2446	struct lc_element *tmp;
2447	bool throttle = drbd_rs_c_min_rate_throttle(device);
2448
2449	if (!throttle || throttle_if_app_is_waiting)
2450		return throttle;
2451
2452	spin_lock_irq(&device->al_lock);
2453	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2454	if (tmp) {
2455		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2456		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2457			throttle = false;
2458		/* Do not slow down if app IO is already waiting for this extent,
2459		 * and our progress is necessary for application IO to complete. */
2460	}
2461	spin_unlock_irq(&device->al_lock);
2462
2463	return throttle;
2464}
2465
2466bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2467{
2468	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2469	unsigned long db, dt, dbdt;
2470	unsigned int c_min_rate;
2471	int curr_events;
2472
2473	rcu_read_lock();
2474	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2475	rcu_read_unlock();
2476
2477	/* feature disabled? */
2478	if (c_min_rate == 0)
2479		return false;
2480
2481	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2482		      (int)part_stat_read(&disk->part0, sectors[1]) -
2483			atomic_read(&device->rs_sect_ev);
2484
2485	if (atomic_read(&device->ap_actlog_cnt)
2486	    || curr_events - device->rs_last_events > 64) {
2487		unsigned long rs_left;
2488		int i;
2489
2490		device->rs_last_events = curr_events;
2491
2492		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2493		 * approx. */
2494		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2495
2496		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2497			rs_left = device->ov_left;
2498		else
2499			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2500
2501		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2502		if (!dt)
2503			dt++;
2504		db = device->rs_mark_left[i] - rs_left;
2505		dbdt = Bit2KB(db/dt);
2506
2507		if (dbdt > c_min_rate)
2508			return true;
2509	}
2510	return false;
2511}
2512
2513static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2514{
2515	struct drbd_peer_device *peer_device;
2516	struct drbd_device *device;
2517	sector_t sector;
2518	sector_t capacity;
2519	struct drbd_peer_request *peer_req;
2520	struct digest_info *di = NULL;
2521	int size, verb;
2522	unsigned int fault_type;
2523	struct p_block_req *p =	pi->data;
2524
2525	peer_device = conn_peer_device(connection, pi->vnr);
2526	if (!peer_device)
2527		return -EIO;
2528	device = peer_device->device;
2529	capacity = drbd_get_capacity(device->this_bdev);
2530
2531	sector = be64_to_cpu(p->sector);
2532	size   = be32_to_cpu(p->blksize);
2533
2534	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2535		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2536				(unsigned long long)sector, size);
2537		return -EINVAL;
2538	}
2539	if (sector + (size>>9) > capacity) {
2540		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2541				(unsigned long long)sector, size);
2542		return -EINVAL;
2543	}
2544
2545	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2546		verb = 1;
2547		switch (pi->cmd) {
2548		case P_DATA_REQUEST:
2549			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2550			break;
2551		case P_RS_DATA_REQUEST:
2552		case P_CSUM_RS_REQUEST:
2553		case P_OV_REQUEST:
2554			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2555			break;
2556		case P_OV_REPLY:
2557			verb = 0;
2558			dec_rs_pending(device);
2559			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2560			break;
2561		default:
2562			BUG();
2563		}
2564		if (verb && __ratelimit(&drbd_ratelimit_state))
2565			drbd_err(device, "Can not satisfy peer's read request, "
2566			    "no local data.\n");
2567
2568		/* drain possibly payload */
2569		return drbd_drain_block(peer_device, pi->size);
2570	}
2571
2572	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2573	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2574	 * which in turn might block on the other node at this very place.  */
2575	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2576			true /* has real payload */, GFP_NOIO);
2577	if (!peer_req) {
2578		put_ldev(device);
2579		return -ENOMEM;
2580	}
2581
2582	switch (pi->cmd) {
2583	case P_DATA_REQUEST:
2584		peer_req->w.cb = w_e_end_data_req;
2585		fault_type = DRBD_FAULT_DT_RD;
2586		/* application IO, don't drbd_rs_begin_io */
2587		peer_req->flags |= EE_APPLICATION;
2588		goto submit;
2589
2590	case P_RS_DATA_REQUEST:
2591		peer_req->w.cb = w_e_end_rsdata_req;
2592		fault_type = DRBD_FAULT_RS_RD;
2593		/* used in the sector offset progress display */
2594		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2595		break;
2596
2597	case P_OV_REPLY:
2598	case P_CSUM_RS_REQUEST:
2599		fault_type = DRBD_FAULT_RS_RD;
2600		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2601		if (!di)
2602			goto out_free_e;
2603
2604		di->digest_size = pi->size;
2605		di->digest = (((char *)di)+sizeof(struct digest_info));
2606
2607		peer_req->digest = di;
2608		peer_req->flags |= EE_HAS_DIGEST;
2609
2610		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2611			goto out_free_e;
2612
2613		if (pi->cmd == P_CSUM_RS_REQUEST) {
2614			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2615			peer_req->w.cb = w_e_end_csum_rs_req;
2616			/* used in the sector offset progress display */
2617			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2618			/* remember to report stats in drbd_resync_finished */
2619			device->use_csums = true;
2620		} else if (pi->cmd == P_OV_REPLY) {
2621			/* track progress, we may need to throttle */
2622			atomic_add(size >> 9, &device->rs_sect_in);
2623			peer_req->w.cb = w_e_end_ov_reply;
2624			dec_rs_pending(device);
2625			/* drbd_rs_begin_io done when we sent this request,
2626			 * but accounting still needs to be done. */
2627			goto submit_for_resync;
2628		}
2629		break;
2630
2631	case P_OV_REQUEST:
2632		if (device->ov_start_sector == ~(sector_t)0 &&
2633		    peer_device->connection->agreed_pro_version >= 90) {
2634			unsigned long now = jiffies;
2635			int i;
2636			device->ov_start_sector = sector;
2637			device->ov_position = sector;
2638			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2639			device->rs_total = device->ov_left;
2640			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2641				device->rs_mark_left[i] = device->ov_left;
2642				device->rs_mark_time[i] = now;
2643			}
2644			drbd_info(device, "Online Verify start sector: %llu\n",
2645					(unsigned long long)sector);
2646		}
2647		peer_req->w.cb = w_e_end_ov_req;
2648		fault_type = DRBD_FAULT_RS_RD;
2649		break;
2650
2651	default:
2652		BUG();
2653	}
2654
2655	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2656	 * wrt the receiver, but it is not as straightforward as it may seem.
2657	 * Various places in the resync start and stop logic assume resync
2658	 * requests are processed in order, requeuing this on the worker thread
2659	 * introduces a bunch of new code for synchronization between threads.
2660	 *
2661	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2662	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2663	 * for application writes for the same time.  For now, just throttle
2664	 * here, where the rest of the code expects the receiver to sleep for
2665	 * a while, anyways.
2666	 */
2667
2668	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2669	 * this defers syncer requests for some time, before letting at least
2670	 * on request through.  The resync controller on the receiving side
2671	 * will adapt to the incoming rate accordingly.
2672	 *
2673	 * We cannot throttle here if remote is Primary/SyncTarget:
2674	 * we would also throttle its application reads.
2675	 * In that case, throttling is done on the SyncTarget only.
2676	 */
2677
2678	/* Even though this may be a resync request, we do add to "read_ee";
2679	 * "sync_ee" is only used for resync WRITEs.
2680	 * Add to list early, so debugfs can find this request
2681	 * even if we have to sleep below. */
2682	spin_lock_irq(&device->resource->req_lock);
2683	list_add_tail(&peer_req->w.list, &device->read_ee);
2684	spin_unlock_irq(&device->resource->req_lock);
2685
2686	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2687	if (device->state.peer != R_PRIMARY
2688	&& drbd_rs_should_slow_down(device, sector, false))
2689		schedule_timeout_uninterruptible(HZ/10);
2690	update_receiver_timing_details(connection, drbd_rs_begin_io);
2691	if (drbd_rs_begin_io(device, sector))
2692		goto out_free_e;
2693
2694submit_for_resync:
2695	atomic_add(size >> 9, &device->rs_sect_ev);
2696
2697submit:
2698	update_receiver_timing_details(connection, drbd_submit_peer_request);
2699	inc_unacked(device);
2700	if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2701		return 0;
2702
2703	/* don't care for the reason here */
2704	drbd_err(device, "submit failed, triggering re-connect\n");
2705
2706out_free_e:
2707	spin_lock_irq(&device->resource->req_lock);
2708	list_del(&peer_req->w.list);
2709	spin_unlock_irq(&device->resource->req_lock);
2710	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2711
2712	put_ldev(device);
2713	drbd_free_peer_req(device, peer_req);
2714	return -EIO;
2715}
2716
2717/**
2718 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2719 */
2720static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2721{
2722	struct drbd_device *device = peer_device->device;
2723	int self, peer, rv = -100;
2724	unsigned long ch_self, ch_peer;
2725	enum drbd_after_sb_p after_sb_0p;
2726
2727	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2728	peer = device->p_uuid[UI_BITMAP] & 1;
2729
2730	ch_peer = device->p_uuid[UI_SIZE];
2731	ch_self = device->comm_bm_set;
2732
2733	rcu_read_lock();
2734	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2735	rcu_read_unlock();
2736	switch (after_sb_0p) {
2737	case ASB_CONSENSUS:
2738	case ASB_DISCARD_SECONDARY:
2739	case ASB_CALL_HELPER:
2740	case ASB_VIOLENTLY:
2741		drbd_err(device, "Configuration error.\n");
2742		break;
2743	case ASB_DISCONNECT:
2744		break;
2745	case ASB_DISCARD_YOUNGER_PRI:
2746		if (self == 0 && peer == 1) {
2747			rv = -1;
2748			break;
2749		}
2750		if (self == 1 && peer == 0) {
2751			rv =  1;
2752			break;
2753		}
2754		/* Else fall through to one of the other strategies... */
2755	case ASB_DISCARD_OLDER_PRI:
2756		if (self == 0 && peer == 1) {
2757			rv = 1;
2758			break;
2759		}
2760		if (self == 1 && peer == 0) {
2761			rv = -1;
2762			break;
2763		}
2764		/* Else fall through to one of the other strategies... */
2765		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2766		     "Using discard-least-changes instead\n");
2767	case ASB_DISCARD_ZERO_CHG:
2768		if (ch_peer == 0 && ch_self == 0) {
2769			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2770				? -1 : 1;
2771			break;
2772		} else {
2773			if (ch_peer == 0) { rv =  1; break; }
2774			if (ch_self == 0) { rv = -1; break; }
2775		}
2776		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2777			break;
2778	case ASB_DISCARD_LEAST_CHG:
2779		if	(ch_self < ch_peer)
2780			rv = -1;
2781		else if (ch_self > ch_peer)
2782			rv =  1;
2783		else /* ( ch_self == ch_peer ) */
2784		     /* Well, then use something else. */
2785			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2786				? -1 : 1;
2787		break;
2788	case ASB_DISCARD_LOCAL:
2789		rv = -1;
2790		break;
2791	case ASB_DISCARD_REMOTE:
2792		rv =  1;
2793	}
2794
2795	return rv;
2796}
2797
2798/**
2799 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2800 */
2801static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2802{
2803	struct drbd_device *device = peer_device->device;
2804	int hg, rv = -100;
2805	enum drbd_after_sb_p after_sb_1p;
2806
2807	rcu_read_lock();
2808	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2809	rcu_read_unlock();
2810	switch (after_sb_1p) {
2811	case ASB_DISCARD_YOUNGER_PRI:
2812	case ASB_DISCARD_OLDER_PRI:
2813	case ASB_DISCARD_LEAST_CHG:
2814	case ASB_DISCARD_LOCAL:
2815	case ASB_DISCARD_REMOTE:
2816	case ASB_DISCARD_ZERO_CHG:
2817		drbd_err(device, "Configuration error.\n");
2818		break;
2819	case ASB_DISCONNECT:
2820		break;
2821	case ASB_CONSENSUS:
2822		hg = drbd_asb_recover_0p(peer_device);
2823		if (hg == -1 && device->state.role == R_SECONDARY)
2824			rv = hg;
2825		if (hg == 1  && device->state.role == R_PRIMARY)
2826			rv = hg;
2827		break;
2828	case ASB_VIOLENTLY:
2829		rv = drbd_asb_recover_0p(peer_device);
2830		break;
2831	case ASB_DISCARD_SECONDARY:
2832		return device->state.role == R_PRIMARY ? 1 : -1;
2833	case ASB_CALL_HELPER:
2834		hg = drbd_asb_recover_0p(peer_device);
2835		if (hg == -1 && device->state.role == R_PRIMARY) {
2836			enum drbd_state_rv rv2;
2837
2838			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2839			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2840			  * we do not need to wait for the after state change work either. */
2841			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2842			if (rv2 != SS_SUCCESS) {
2843				drbd_khelper(device, "pri-lost-after-sb");
2844			} else {
2845				drbd_warn(device, "Successfully gave up primary role.\n");
2846				rv = hg;
2847			}
2848		} else
2849			rv = hg;
2850	}
2851
2852	return rv;
2853}
2854
2855/**
2856 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2857 */
2858static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2859{
2860	struct drbd_device *device = peer_device->device;
2861	int hg, rv = -100;
2862	enum drbd_after_sb_p after_sb_2p;
2863
2864	rcu_read_lock();
2865	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2866	rcu_read_unlock();
2867	switch (after_sb_2p) {
2868	case ASB_DISCARD_YOUNGER_PRI:
2869	case ASB_DISCARD_OLDER_PRI:
2870	case ASB_DISCARD_LEAST_CHG:
2871	case ASB_DISCARD_LOCAL:
2872	case ASB_DISCARD_REMOTE:
2873	case ASB_CONSENSUS:
2874	case ASB_DISCARD_SECONDARY:
2875	case ASB_DISCARD_ZERO_CHG:
2876		drbd_err(device, "Configuration error.\n");
2877		break;
2878	case ASB_VIOLENTLY:
2879		rv = drbd_asb_recover_0p(peer_device);
2880		break;
2881	case ASB_DISCONNECT:
2882		break;
2883	case ASB_CALL_HELPER:
2884		hg = drbd_asb_recover_0p(peer_device);
2885		if (hg == -1) {
2886			enum drbd_state_rv rv2;
2887
2888			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2889			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2890			  * we do not need to wait for the after state change work either. */
2891			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2892			if (rv2 != SS_SUCCESS) {
2893				drbd_khelper(device, "pri-lost-after-sb");
2894			} else {
2895				drbd_warn(device, "Successfully gave up primary role.\n");
2896				rv = hg;
2897			}
2898		} else
2899			rv = hg;
2900	}
2901
2902	return rv;
2903}
2904
2905static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2906			   u64 bits, u64 flags)
2907{
2908	if (!uuid) {
2909		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2910		return;
2911	}
2912	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2913	     text,
2914	     (unsigned long long)uuid[UI_CURRENT],
2915	     (unsigned long long)uuid[UI_BITMAP],
2916	     (unsigned long long)uuid[UI_HISTORY_START],
2917	     (unsigned long long)uuid[UI_HISTORY_END],
2918	     (unsigned long long)bits,
2919	     (unsigned long long)flags);
2920}
2921
2922/*
2923  100	after split brain try auto recover
2924    2	C_SYNC_SOURCE set BitMap
2925    1	C_SYNC_SOURCE use BitMap
2926    0	no Sync
2927   -1	C_SYNC_TARGET use BitMap
2928   -2	C_SYNC_TARGET set BitMap
2929 -100	after split brain, disconnect
2930-1000	unrelated data
2931-1091   requires proto 91
2932-1096   requires proto 96
2933 */
2934static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2935{
2936	struct drbd_peer_device *const peer_device = first_peer_device(device);
2937	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2938	u64 self, peer;
2939	int i, j;
2940
2941	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2942	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2943
2944	*rule_nr = 10;
2945	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2946		return 0;
2947
2948	*rule_nr = 20;
2949	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2950	     peer != UUID_JUST_CREATED)
2951		return -2;
2952
2953	*rule_nr = 30;
2954	if (self != UUID_JUST_CREATED &&
2955	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2956		return 2;
2957
2958	if (self == peer) {
2959		int rct, dc; /* roles at crash time */
2960
2961		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2962
2963			if (connection->agreed_pro_version < 91)
2964				return -1091;
2965
2966			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2967			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2968				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2969				drbd_uuid_move_history(device);
2970				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2971				device->ldev->md.uuid[UI_BITMAP] = 0;
2972
2973				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2974					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2975				*rule_nr = 34;
2976			} else {
2977				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2978				*rule_nr = 36;
2979			}
2980
2981			return 1;
2982		}
2983
2984		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2985
2986			if (connection->agreed_pro_version < 91)
2987				return -1091;
2988
2989			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2990			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2991				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2992
2993				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2994				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2995				device->p_uuid[UI_BITMAP] = 0UL;
2996
2997				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2998				*rule_nr = 35;
2999			} else {
3000				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3001				*rule_nr = 37;
3002			}
3003
3004			return -1;
3005		}
3006
3007		/* Common power [off|failure] */
3008		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3009			(device->p_uuid[UI_FLAGS] & 2);
3010		/* lowest bit is set when we were primary,
3011		 * next bit (weight 2) is set when peer was primary */
3012		*rule_nr = 40;
3013
3014		switch (rct) {
3015		case 0: /* !self_pri && !peer_pri */ return 0;
3016		case 1: /*  self_pri && !peer_pri */ return 1;
3017		case 2: /* !self_pri &&  peer_pri */ return -1;
3018		case 3: /*  self_pri &&  peer_pri */
3019			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3020			return dc ? -1 : 1;
3021		}
3022	}
3023
3024	*rule_nr = 50;
3025	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3026	if (self == peer)
3027		return -1;
3028
3029	*rule_nr = 51;
3030	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3031	if (self == peer) {
3032		if (connection->agreed_pro_version < 96 ?
3033		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3034		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3035		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3036			/* The last P_SYNC_UUID did not get though. Undo the last start of
3037			   resync as sync source modifications of the peer's UUIDs. */
3038
3039			if (connection->agreed_pro_version < 91)
3040				return -1091;
3041
3042			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3043			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3044
3045			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3046			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3047
3048			return -1;
3049		}
3050	}
3051
3052	*rule_nr = 60;
3053	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3054	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3055		peer = device->p_uuid[i] & ~((u64)1);
3056		if (self == peer)
3057			return -2;
3058	}
3059
3060	*rule_nr = 70;
3061	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3062	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3063	if (self == peer)
3064		return 1;
3065
3066	*rule_nr = 71;
3067	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3068	if (self == peer) {
3069		if (connection->agreed_pro_version < 96 ?
3070		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3071		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3072		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3073			/* The last P_SYNC_UUID did not get though. Undo the last start of
3074			   resync as sync source modifications of our UUIDs. */
3075
3076			if (connection->agreed_pro_version < 91)
3077				return -1091;
3078
3079			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3080			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3081
3082			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3083			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3084				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3085
3086			return 1;
3087		}
3088	}
3089
3090
3091	*rule_nr = 80;
3092	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3093	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3094		self = device->ldev->md.uuid[i] & ~((u64)1);
3095		if (self == peer)
3096			return 2;
3097	}
3098
3099	*rule_nr = 90;
3100	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3101	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3102	if (self == peer && self != ((u64)0))
3103		return 100;
3104
3105	*rule_nr = 100;
3106	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3107		self = device->ldev->md.uuid[i] & ~((u64)1);
3108		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3109			peer = device->p_uuid[j] & ~((u64)1);
3110			if (self == peer)
3111				return -100;
3112		}
3113	}
3114
3115	return -1000;
3116}
3117
3118/* drbd_sync_handshake() returns the new conn state on success, or
3119   CONN_MASK (-1) on failure.
3120 */
3121static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3122					   enum drbd_role peer_role,
3123					   enum drbd_disk_state peer_disk) __must_hold(local)
3124{
3125	struct drbd_device *device = peer_device->device;
3126	enum drbd_conns rv = C_MASK;
3127	enum drbd_disk_state mydisk;
3128	struct net_conf *nc;
3129	int hg, rule_nr, rr_conflict, tentative;
3130
3131	mydisk = device->state.disk;
3132	if (mydisk == D_NEGOTIATING)
3133		mydisk = device->new_state_tmp.disk;
3134
3135	drbd_info(device, "drbd_sync_handshake:\n");
3136
3137	spin_lock_irq(&device->ldev->md.uuid_lock);
3138	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3139	drbd_uuid_dump(device, "peer", device->p_uuid,
3140		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3141
3142	hg = drbd_uuid_compare(device, &rule_nr);
3143	spin_unlock_irq(&device->ldev->md.uuid_lock);
3144
3145	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3146
3147	if (hg == -1000) {
3148		drbd_alert(device, "Unrelated data, aborting!\n");
3149		return C_MASK;
3150	}
3151	if (hg < -1000) {
3152		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3153		return C_MASK;
3154	}
3155
3156	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3157	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3158		int f = (hg == -100) || abs(hg) == 2;
3159		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3160		if (f)
3161			hg = hg*2;
3162		drbd_info(device, "Becoming sync %s due to disk states.\n",
3163		     hg > 0 ? "source" : "target");
3164	}
3165
3166	if (abs(hg) == 100)
3167		drbd_khelper(device, "initial-split-brain");
3168
3169	rcu_read_lock();
3170	nc = rcu_dereference(peer_device->connection->net_conf);
3171
3172	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3173		int pcount = (device->state.role == R_PRIMARY)
3174			   + (peer_role == R_PRIMARY);
3175		int forced = (hg == -100);
3176
3177		switch (pcount) {
3178		case 0:
3179			hg = drbd_asb_recover_0p(peer_device);
3180			break;
3181		case 1:
3182			hg = drbd_asb_recover_1p(peer_device);
3183			break;
3184		case 2:
3185			hg = drbd_asb_recover_2p(peer_device);
3186			break;
3187		}
3188		if (abs(hg) < 100) {
3189			drbd_warn(device, "Split-Brain detected, %d primaries, "
3190			     "automatically solved. Sync from %s node\n",
3191			     pcount, (hg < 0) ? "peer" : "this");
3192			if (forced) {
3193				drbd_warn(device, "Doing a full sync, since"
3194				     " UUIDs where ambiguous.\n");
3195				hg = hg*2;
3196			}
3197		}
3198	}
3199
3200	if (hg == -100) {
3201		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3202			hg = -1;
3203		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3204			hg = 1;
3205
3206		if (abs(hg) < 100)
3207			drbd_warn(device, "Split-Brain detected, manually solved. "
3208			     "Sync from %s node\n",
3209			     (hg < 0) ? "peer" : "this");
3210	}
3211	rr_conflict = nc->rr_conflict;
3212	tentative = nc->tentative;
3213	rcu_read_unlock();
3214
3215	if (hg == -100) {
3216		/* FIXME this log message is not correct if we end up here
3217		 * after an attempted attach on a diskless node.
3218		 * We just refuse to attach -- well, we drop the "connection"
3219		 * to that disk, in a way... */
3220		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3221		drbd_khelper(device, "split-brain");
3222		return C_MASK;
3223	}
3224
3225	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3226		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3227		return C_MASK;
3228	}
3229
3230	if (hg < 0 && /* by intention we do not use mydisk here. */
3231	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3232		switch (rr_conflict) {
3233		case ASB_CALL_HELPER:
3234			drbd_khelper(device, "pri-lost");
3235			/* fall through */
3236		case ASB_DISCONNECT:
3237			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3238			return C_MASK;
3239		case ASB_VIOLENTLY:
3240			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3241			     "assumption\n");
3242		}
3243	}
3244
3245	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3246		if (hg == 0)
3247			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3248		else
3249			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3250				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3251				 abs(hg) >= 2 ? "full" : "bit-map based");
3252		return C_MASK;
3253	}
3254
3255	if (abs(hg) >= 2) {
3256		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3257		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3258					BM_LOCKED_SET_ALLOWED))
3259			return C_MASK;
3260	}
3261
3262	if (hg > 0) { /* become sync source. */
3263		rv = C_WF_BITMAP_S;
3264	} else if (hg < 0) { /* become sync target */
3265		rv = C_WF_BITMAP_T;
3266	} else {
3267		rv = C_CONNECTED;
3268		if (drbd_bm_total_weight(device)) {
3269			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3270			     drbd_bm_total_weight(device));
3271		}
3272	}
3273
3274	return rv;
3275}
3276
3277static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3278{
3279	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3280	if (peer == ASB_DISCARD_REMOTE)
3281		return ASB_DISCARD_LOCAL;
3282
3283	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3284	if (peer == ASB_DISCARD_LOCAL)
3285		return ASB_DISCARD_REMOTE;
3286
3287	/* everything else is valid if they are equal on both sides. */
3288	return peer;
3289}
3290
3291static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3292{
3293	struct p_protocol *p = pi->data;
3294	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3295	int p_proto, p_discard_my_data, p_two_primaries, cf;
3296	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3297	char integrity_alg[SHARED_SECRET_MAX] = "";
3298	struct crypto_hash *peer_integrity_tfm = NULL;
3299	void *int_dig_in = NULL, *int_dig_vv = NULL;
3300
3301	p_proto		= be32_to_cpu(p->protocol);
3302	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3303	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3304	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3305	p_two_primaries = be32_to_cpu(p->two_primaries);
3306	cf		= be32_to_cpu(p->conn_flags);
3307	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3308
3309	if (connection->agreed_pro_version >= 87) {
3310		int err;
3311
3312		if (pi->size > sizeof(integrity_alg))
3313			return -EIO;
3314		err = drbd_recv_all(connection, integrity_alg, pi->size);
3315		if (err)
3316			return err;
3317		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3318	}
3319
3320	if (pi->cmd != P_PROTOCOL_UPDATE) {
3321		clear_bit(CONN_DRY_RUN, &connection->flags);
3322
3323		if (cf & CF_DRY_RUN)
3324			set_bit(CONN_DRY_RUN, &connection->flags);
3325
3326		rcu_read_lock();
3327		nc = rcu_dereference(connection->net_conf);
3328
3329		if (p_proto != nc->wire_protocol) {
3330			drbd_err(connection, "incompatible %s settings\n", "protocol");
3331			goto disconnect_rcu_unlock;
3332		}
3333
3334		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3335			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3336			goto disconnect_rcu_unlock;
3337		}
3338
3339		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3340			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3341			goto disconnect_rcu_unlock;
3342		}
3343
3344		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3345			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3346			goto disconnect_rcu_unlock;
3347		}
3348
3349		if (p_discard_my_data && nc->discard_my_data) {
3350			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3351			goto disconnect_rcu_unlock;
3352		}
3353
3354		if (p_two_primaries != nc->two_primaries) {
3355			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3356			goto disconnect_rcu_unlock;
3357		}
3358
3359		if (strcmp(integrity_alg, nc->integrity_alg)) {
3360			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3361			goto disconnect_rcu_unlock;
3362		}
3363
3364		rcu_read_unlock();
3365	}
3366
3367	if (integrity_alg[0]) {
3368		int hash_size;
3369
3370		/*
3371		 * We can only change the peer data integrity algorithm
3372		 * here.  Changing our own data integrity algorithm
3373		 * requires that we send a P_PROTOCOL_UPDATE packet at
3374		 * the same time; otherwise, the peer has no way to
3375		 * tell between which packets the algorithm should
3376		 * change.
3377		 */
3378
3379		peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3380		if (!peer_integrity_tfm) {
3381			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3382				 integrity_alg);
3383			goto disconnect;
3384		}
3385
3386		hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3387		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3388		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3389		if (!(int_dig_in && int_dig_vv)) {
3390			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3391			goto disconnect;
3392		}
3393	}
3394
3395	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3396	if (!new_net_conf) {
3397		drbd_err(connection, "Allocation of new net_conf failed\n");
3398		goto disconnect;
3399	}
3400
3401	mutex_lock(&connection->data.mutex);
3402	mutex_lock(&connection->resource->conf_update);
3403	old_net_conf = connection->net_conf;
3404	*new_net_conf = *old_net_conf;
3405
3406	new_net_conf->wire_protocol = p_proto;
3407	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3408	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3409	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3410	new_net_conf->two_primaries = p_two_primaries;
3411
3412	rcu_assign_pointer(connection->net_conf, new_net_conf);
3413	mutex_unlock(&connection->resource->conf_update);
3414	mutex_unlock(&connection->data.mutex);
3415
3416	crypto_free_hash(connection->peer_integrity_tfm);
3417	kfree(connection->int_dig_in);
3418	kfree(connection->int_dig_vv);
3419	connection->peer_integrity_tfm = peer_integrity_tfm;
3420	connection->int_dig_in = int_dig_in;
3421	connection->int_dig_vv = int_dig_vv;
3422
3423	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3424		drbd_info(connection, "peer data-integrity-alg: %s\n",
3425			  integrity_alg[0] ? integrity_alg : "(none)");
3426
3427	synchronize_rcu();
3428	kfree(old_net_conf);
3429	return 0;
3430
3431disconnect_rcu_unlock:
3432	rcu_read_unlock();
3433disconnect:
3434	crypto_free_hash(peer_integrity_tfm);
3435	kfree(int_dig_in);
3436	kfree(int_dig_vv);
3437	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3438	return -EIO;
3439}
3440
3441/* helper function
3442 * input: alg name, feature name
3443 * return: NULL (alg name was "")
3444 *         ERR_PTR(error) if something goes wrong
3445 *         or the crypto hash ptr, if it worked out ok. */
3446static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3447		const char *alg, const char *name)
3448{
3449	struct crypto_hash *tfm;
3450
3451	if (!alg[0])
3452		return NULL;
3453
3454	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3455	if (IS_ERR(tfm)) {
3456		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3457			alg, name, PTR_ERR(tfm));
3458		return tfm;
3459	}
3460	return tfm;
3461}
3462
3463static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3464{
3465	void *buffer = connection->data.rbuf;
3466	int size = pi->size;
3467
3468	while (size) {
3469		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3470		s = drbd_recv(connection, buffer, s);
3471		if (s <= 0) {
3472			if (s < 0)
3473				return s;
3474			break;
3475		}
3476		size -= s;
3477	}
3478	if (size)
3479		return -EIO;
3480	return 0;
3481}
3482
3483/*
3484 * config_unknown_volume  -  device configuration command for unknown volume
3485 *
3486 * When a device is added to an existing connection, the node on which the
3487 * device is added first will send configuration commands to its peer but the
3488 * peer will not know about the device yet.  It will warn and ignore these
3489 * commands.  Once the device is added on the second node, the second node will
3490 * send the same device configuration commands, but in the other direction.
3491 *
3492 * (We can also end up here if drbd is misconfigured.)
3493 */
3494static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3495{
3496	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3497		  cmdname(pi->cmd), pi->vnr);
3498	return ignore_remaining_packet(connection, pi);
3499}
3500
3501static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3502{
3503	struct drbd_peer_device *peer_device;
3504	struct drbd_device *device;
3505	struct p_rs_param_95 *p;
3506	unsigned int header_size, data_size, exp_max_sz;
3507	struct crypto_hash *verify_tfm = NULL;
3508	struct crypto_hash *csums_tfm = NULL;
3509	struct net_conf *old_net_conf, *new_net_conf = NULL;
3510	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3511	const int apv = connection->agreed_pro_version;
3512	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3513	int fifo_size = 0;
3514	int err;
3515
3516	peer_device = conn_peer_device(connection, pi->vnr);
3517	if (!peer_device)
3518		return config_unknown_volume(connection, pi);
3519	device = peer_device->device;
3520
3521	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3522		    : apv == 88 ? sizeof(struct p_rs_param)
3523					+ SHARED_SECRET_MAX
3524		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3525		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3526
3527	if (pi->size > exp_max_sz) {
3528		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3529		    pi->size, exp_max_sz);
3530		return -EIO;
3531	}
3532
3533	if (apv <= 88) {
3534		header_size = sizeof(struct p_rs_param);
3535		data_size = pi->size - header_size;
3536	} else if (apv <= 94) {
3537		header_size = sizeof(struct p_rs_param_89);
3538		data_size = pi->size - header_size;
3539		D_ASSERT(device, data_size == 0);
3540	} else {
3541		header_size = sizeof(struct p_rs_param_95);
3542		data_size = pi->size - header_size;
3543		D_ASSERT(device, data_size == 0);
3544	}
3545
3546	/* initialize verify_alg and csums_alg */
3547	p = pi->data;
3548	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3549
3550	err = drbd_recv_all(peer_device->connection, p, header_size);
3551	if (err)
3552		return err;
3553
3554	mutex_lock(&connection->resource->conf_update);
3555	old_net_conf = peer_device->connection->net_conf;
3556	if (get_ldev(device)) {
3557		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3558		if (!new_disk_conf) {
3559			put_ldev(device);
3560			mutex_unlock(&connection->resource->conf_update);
3561			drbd_err(device, "Allocation of new disk_conf failed\n");
3562			return -ENOMEM;
3563		}
3564
3565		old_disk_conf = device->ldev->disk_conf;
3566		*new_disk_conf = *old_disk_conf;
3567
3568		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3569	}
3570
3571	if (apv >= 88) {
3572		if (apv == 88) {
3573			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3574				drbd_err(device, "verify-alg of wrong size, "
3575					"peer wants %u, accepting only up to %u byte\n",
3576					data_size, SHARED_SECRET_MAX);
3577				err = -EIO;
3578				goto reconnect;
3579			}
3580
3581			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3582			if (err)
3583				goto reconnect;
3584			/* we expect NUL terminated string */
3585			/* but just in case someone tries to be evil */
3586			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3587			p->verify_alg[data_size-1] = 0;
3588
3589		} else /* apv >= 89 */ {
3590			/* we still expect NUL terminated strings */
3591			/* but just in case someone tries to be evil */
3592			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3593			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3594			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3595			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3596		}
3597
3598		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3599			if (device->state.conn == C_WF_REPORT_PARAMS) {
3600				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3601				    old_net_conf->verify_alg, p->verify_alg);
3602				goto disconnect;
3603			}
3604			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3605					p->verify_alg, "verify-alg");
3606			if (IS_ERR(verify_tfm)) {
3607				verify_tfm = NULL;
3608				goto disconnect;
3609			}
3610		}
3611
3612		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3613			if (device->state.conn == C_WF_REPORT_PARAMS) {
3614				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3615				    old_net_conf->csums_alg, p->csums_alg);
3616				goto disconnect;
3617			}
3618			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3619					p->csums_alg, "csums-alg");
3620			if (IS_ERR(csums_tfm)) {
3621				csums_tfm = NULL;
3622				goto disconnect;
3623			}
3624		}
3625
3626		if (apv > 94 && new_disk_conf) {
3627			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3628			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3629			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3630			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3631
3632			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3633			if (fifo_size != device->rs_plan_s->size) {
3634				new_plan = fifo_alloc(fifo_size);
3635				if (!new_plan) {
3636					drbd_err(device, "kmalloc of fifo_buffer failed");
3637					put_ldev(device);
3638					goto disconnect;
3639				}
3640			}
3641		}
3642
3643		if (verify_tfm || csums_tfm) {
3644			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3645			if (!new_net_conf) {
3646				drbd_err(device, "Allocation of new net_conf failed\n");
3647				goto disconnect;
3648			}
3649
3650			*new_net_conf = *old_net_conf;
3651
3652			if (verify_tfm) {
3653				strcpy(new_net_conf->verify_alg, p->verify_alg);
3654				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3655				crypto_free_hash(peer_device->connection->verify_tfm);
3656				peer_device->connection->verify_tfm = verify_tfm;
3657				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3658			}
3659			if (csums_tfm) {
3660				strcpy(new_net_conf->csums_alg, p->csums_alg);
3661				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3662				crypto_free_hash(peer_device->connection->csums_tfm);
3663				peer_device->connection->csums_tfm = csums_tfm;
3664				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3665			}
3666			rcu_assign_pointer(connection->net_conf, new_net_conf);
3667		}
3668	}
3669
3670	if (new_disk_conf) {
3671		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3672		put_ldev(device);
3673	}
3674
3675	if (new_plan) {
3676		old_plan = device->rs_plan_s;
3677		rcu_assign_pointer(device->rs_plan_s, new_plan);
3678	}
3679
3680	mutex_unlock(&connection->resource->conf_update);
3681	synchronize_rcu();
3682	if (new_net_conf)
3683		kfree(old_net_conf);
3684	kfree(old_disk_conf);
3685	kfree(old_plan);
3686
3687	return 0;
3688
3689reconnect:
3690	if (new_disk_conf) {
3691		put_ldev(device);
3692		kfree(new_disk_conf);
3693	}
3694	mutex_unlock(&connection->resource->conf_update);
3695	return -EIO;
3696
3697disconnect:
3698	kfree(new_plan);
3699	if (new_disk_conf) {
3700		put_ldev(device);
3701		kfree(new_disk_conf);
3702	}
3703	mutex_unlock(&connection->resource->conf_update);
3704	/* just for completeness: actually not needed,
3705	 * as this is not reached if csums_tfm was ok. */
3706	crypto_free_hash(csums_tfm);
3707	/* but free the verify_tfm again, if csums_tfm did not work out */
3708	crypto_free_hash(verify_tfm);
3709	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3710	return -EIO;
3711}
3712
3713/* warn if the arguments differ by more than 12.5% */
3714static void warn_if_differ_considerably(struct drbd_device *device,
3715	const char *s, sector_t a, sector_t b)
3716{
3717	sector_t d;
3718	if (a == 0 || b == 0)
3719		return;
3720	d = (a > b) ? (a - b) : (b - a);
3721	if (d > (a>>3) || d > (b>>3))
3722		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3723		     (unsigned long long)a, (unsigned long long)b);
3724}
3725
3726static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3727{
3728	struct drbd_peer_device *peer_device;
3729	struct drbd_device *device;
3730	struct p_sizes *p = pi->data;
3731	enum determine_dev_size dd = DS_UNCHANGED;
3732	sector_t p_size, p_usize, p_csize, my_usize;
3733	int ldsc = 0; /* local disk size changed */
3734	enum dds_flags ddsf;
3735
3736	peer_device = conn_peer_device(connection, pi->vnr);
3737	if (!peer_device)
3738		return config_unknown_volume(connection, pi);
3739	device = peer_device->device;
3740
3741	p_size = be64_to_cpu(p->d_size);
3742	p_usize = be64_to_cpu(p->u_size);
3743	p_csize = be64_to_cpu(p->c_size);
3744
3745	/* just store the peer's disk size for now.
3746	 * we still need to figure out whether we accept that. */
3747	device->p_size = p_size;
3748
3749	if (get_ldev(device)) {
3750		rcu_read_lock();
3751		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3752		rcu_read_unlock();
3753
3754		warn_if_differ_considerably(device, "lower level device sizes",
3755			   p_size, drbd_get_max_capacity(device->ldev));
3756		warn_if_differ_considerably(device, "user requested size",
3757					    p_usize, my_usize);
3758
3759		/* if this is the first connect, or an otherwise expected
3760		 * param exchange, choose the minimum */
3761		if (device->state.conn == C_WF_REPORT_PARAMS)
3762			p_usize = min_not_zero(my_usize, p_usize);
3763
3764		/* Never shrink a device with usable data during connect.
3765		   But allow online shrinking if we are connected. */
3766		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3767		    drbd_get_capacity(device->this_bdev) &&
3768		    device->state.disk >= D_OUTDATED &&
3769		    device->state.conn < C_CONNECTED) {
3770			drbd_err(device, "The peer's disk size is too small!\n");
3771			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3772			put_ldev(device);
3773			return -EIO;
3774		}
3775
3776		if (my_usize != p_usize) {
3777			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3778
3779			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3780			if (!new_disk_conf) {
3781				drbd_err(device, "Allocation of new disk_conf failed\n");
3782				put_ldev(device);
3783				return -ENOMEM;
3784			}
3785
3786			mutex_lock(&connection->resource->conf_update);
3787			old_disk_conf = device->ldev->disk_conf;
3788			*new_disk_conf = *old_disk_conf;
3789			new_disk_conf->disk_size = p_usize;
3790
3791			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3792			mutex_unlock(&connection->resource->conf_update);
3793			synchronize_rcu();
3794			kfree(old_disk_conf);
3795
3796			drbd_info(device, "Peer sets u_size to %lu sectors\n",
3797				 (unsigned long)my_usize);
3798		}
3799
3800		put_ldev(device);
3801	}
3802
3803	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3804	/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3805	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3806	   drbd_reconsider_max_bio_size(), we can be sure that after
3807	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3808
3809	ddsf = be16_to_cpu(p->dds_flags);
3810	if (get_ldev(device)) {
3811		drbd_reconsider_max_bio_size(device, device->ldev);
3812		dd = drbd_determine_dev_size(device, ddsf, NULL);
3813		put_ldev(device);
3814		if (dd == DS_ERROR)
3815			return -EIO;
3816		drbd_md_sync(device);
3817	} else {
3818		/*
3819		 * I am diskless, need to accept the peer's *current* size.
3820		 * I must NOT accept the peers backing disk size,
3821		 * it may have been larger than mine all along...
3822		 *
3823		 * At this point, the peer knows more about my disk, or at
3824		 * least about what we last agreed upon, than myself.
3825		 * So if his c_size is less than his d_size, the most likely
3826		 * reason is that *my* d_size was smaller last time we checked.
3827		 *
3828		 * However, if he sends a zero current size,
3829		 * take his (user-capped or) backing disk size anyways.
3830		 */
3831		drbd_reconsider_max_bio_size(device, NULL);
3832		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3833	}
3834
3835	if (get_ldev(device)) {
3836		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3837			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3838			ldsc = 1;
3839		}
3840
3841		put_ldev(device);
3842	}
3843
3844	if (device->state.conn > C_WF_REPORT_PARAMS) {
3845		if (be64_to_cpu(p->c_size) !=
3846		    drbd_get_capacity(device->this_bdev) || ldsc) {
3847			/* we have different sizes, probably peer
3848			 * needs to know my new size... */
3849			drbd_send_sizes(peer_device, 0, ddsf);
3850		}
3851		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3852		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3853			if (device->state.pdsk >= D_INCONSISTENT &&
3854			    device->state.disk >= D_INCONSISTENT) {
3855				if (ddsf & DDSF_NO_RESYNC)
3856					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3857				else
3858					resync_after_online_grow(device);
3859			} else
3860				set_bit(RESYNC_AFTER_NEG, &device->flags);
3861		}
3862	}
3863
3864	return 0;
3865}
3866
3867static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3868{
3869	struct drbd_peer_device *peer_device;
3870	struct drbd_device *device;
3871	struct p_uuids *p = pi->data;
3872	u64 *p_uuid;
3873	int i, updated_uuids = 0;
3874
3875	peer_device = conn_peer_device(connection, pi->vnr);
3876	if (!peer_device)
3877		return config_unknown_volume(connection, pi);
3878	device = peer_device->device;
3879
3880	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3881	if (!p_uuid) {
3882		drbd_err(device, "kmalloc of p_uuid failed\n");
3883		return false;
3884	}
3885
3886	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3887		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3888
3889	kfree(device->p_uuid);
3890	device->p_uuid = p_uuid;
3891
3892	if (device->state.conn < C_CONNECTED &&
3893	    device->state.disk < D_INCONSISTENT &&
3894	    device->state.role == R_PRIMARY &&
3895	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3896		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3897		    (unsigned long long)device->ed_uuid);
3898		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3899		return -EIO;
3900	}
3901
3902	if (get_ldev(device)) {
3903		int skip_initial_sync =
3904			device->state.conn == C_CONNECTED &&
3905			peer_device->connection->agreed_pro_version >= 90 &&
3906			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3907			(p_uuid[UI_FLAGS] & 8);
3908		if (skip_initial_sync) {
3909			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3910			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3911					"clear_n_write from receive_uuids",
3912					BM_LOCKED_TEST_ALLOWED);
3913			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3914			_drbd_uuid_set(device, UI_BITMAP, 0);
3915			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3916					CS_VERBOSE, NULL);
3917			drbd_md_sync(device);
3918			updated_uuids = 1;
3919		}
3920		put_ldev(device);
3921	} else if (device->state.disk < D_INCONSISTENT &&
3922		   device->state.role == R_PRIMARY) {
3923		/* I am a diskless primary, the peer just created a new current UUID
3924		   for me. */
3925		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3926	}
3927
3928	/* Before we test for the disk state, we should wait until an eventually
3929	   ongoing cluster wide state change is finished. That is important if
3930	   we are primary and are detaching from our disk. We need to see the
3931	   new disk state... */
3932	mutex_lock(device->state_mutex);
3933	mutex_unlock(device->state_mutex);
3934	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3935		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3936
3937	if (updated_uuids)
3938		drbd_print_uuids(device, "receiver updated UUIDs to");
3939
3940	return 0;
3941}
3942
3943/**
3944 * convert_state() - Converts the peer's view of the cluster state to our point of view
3945 * @ps:		The state as seen by the peer.
3946 */
3947static union drbd_state convert_state(union drbd_state ps)
3948{
3949	union drbd_state ms;
3950
3951	static enum drbd_conns c_tab[] = {
3952		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3953		[C_CONNECTED] = C_CONNECTED,
3954
3955		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3956		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3957		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3958		[C_VERIFY_S]       = C_VERIFY_T,
3959		[C_MASK]   = C_MASK,
3960	};
3961
3962	ms.i = ps.i;
3963
3964	ms.conn = c_tab[ps.conn];
3965	ms.peer = ps.role;
3966	ms.role = ps.peer;
3967	ms.pdsk = ps.disk;
3968	ms.disk = ps.pdsk;
3969	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3970
3971	return ms;
3972}
3973
3974static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3975{
3976	struct drbd_peer_device *peer_device;
3977	struct drbd_device *device;
3978	struct p_req_state *p = pi->data;
3979	union drbd_state mask, val;
3980	enum drbd_state_rv rv;
3981
3982	peer_device = conn_peer_device(connection, pi->vnr);
3983	if (!peer_device)
3984		return -EIO;
3985	device = peer_device->device;
3986
3987	mask.i = be32_to_cpu(p->mask);
3988	val.i = be32_to_cpu(p->val);
3989
3990	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3991	    mutex_is_locked(device->state_mutex)) {
3992		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3993		return 0;
3994	}
3995
3996	mask = convert_state(mask);
3997	val = convert_state(val);
3998
3999	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4000	drbd_send_sr_reply(peer_device, rv);
4001
4002	drbd_md_sync(device);
4003
4004	return 0;
4005}
4006
4007static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4008{
4009	struct p_req_state *p = pi->data;
4010	union drbd_state mask, val;
4011	enum drbd_state_rv rv;
4012
4013	mask.i = be32_to_cpu(p->mask);
4014	val.i = be32_to_cpu(p->val);
4015
4016	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4017	    mutex_is_locked(&connection->cstate_mutex)) {
4018		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4019		return 0;
4020	}
4021
4022	mask = convert_state(mask);
4023	val = convert_state(val);
4024
4025	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4026	conn_send_sr_reply(connection, rv);
4027
4028	return 0;
4029}
4030
4031static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4032{
4033	struct drbd_peer_device *peer_device;
4034	struct drbd_device *device;
4035	struct p_state *p = pi->data;
4036	union drbd_state os, ns, peer_state;
4037	enum drbd_disk_state real_peer_disk;
4038	enum chg_state_flags cs_flags;
4039	int rv;
4040
4041	peer_device = conn_peer_device(connection, pi->vnr);
4042	if (!peer_device)
4043		return config_unknown_volume(connection, pi);
4044	device = peer_device->device;
4045
4046	peer_state.i = be32_to_cpu(p->state);
4047
4048	real_peer_disk = peer_state.disk;
4049	if (peer_state.disk == D_NEGOTIATING) {
4050		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4051		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4052	}
4053
4054	spin_lock_irq(&device->resource->req_lock);
4055 retry:
4056	os = ns = drbd_read_state(device);
4057	spin_unlock_irq(&device->resource->req_lock);
4058
4059	/* If some other part of the code (asender thread, timeout)
4060	 * already decided to close the connection again,
4061	 * we must not "re-establish" it here. */
4062	if (os.conn <= C_TEAR_DOWN)
4063		return -ECONNRESET;
4064
4065	/* If this is the "end of sync" confirmation, usually the peer disk
4066	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4067	 * set) resync started in PausedSyncT, or if the timing of pause-/
4068	 * unpause-sync events has been "just right", the peer disk may
4069	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4070	 */
4071	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4072	    real_peer_disk == D_UP_TO_DATE &&
4073	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4074		/* If we are (becoming) SyncSource, but peer is still in sync
4075		 * preparation, ignore its uptodate-ness to avoid flapping, it
4076		 * will change to inconsistent once the peer reaches active
4077		 * syncing states.
4078		 * It may have changed syncer-paused flags, however, so we
4079		 * cannot ignore this completely. */
4080		if (peer_state.conn > C_CONNECTED &&
4081		    peer_state.conn < C_SYNC_SOURCE)
4082			real_peer_disk = D_INCONSISTENT;
4083
4084		/* if peer_state changes to connected at the same time,
4085		 * it explicitly notifies us that it finished resync.
4086		 * Maybe we should finish it up, too? */
4087		else if (os.conn >= C_SYNC_SOURCE &&
4088			 peer_state.conn == C_CONNECTED) {
4089			if (drbd_bm_total_weight(device) <= device->rs_failed)
4090				drbd_resync_finished(device);
4091			return 0;
4092		}
4093	}
4094
4095	/* explicit verify finished notification, stop sector reached. */
4096	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4097	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4098		ov_out_of_sync_print(device);
4099		drbd_resync_finished(device);
4100		return 0;
4101	}
4102
4103	/* peer says his disk is inconsistent, while we think it is uptodate,
4104	 * and this happens while the peer still thinks we have a sync going on,
4105	 * but we think we are already done with the sync.
4106	 * We ignore this to avoid flapping pdsk.
4107	 * This should not happen, if the peer is a recent version of drbd. */
4108	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4109	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4110		real_peer_disk = D_UP_TO_DATE;
4111
4112	if (ns.conn == C_WF_REPORT_PARAMS)
4113		ns.conn = C_CONNECTED;
4114
4115	if (peer_state.conn == C_AHEAD)
4116		ns.conn = C_BEHIND;
4117
4118	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4119	    get_ldev_if_state(device, D_NEGOTIATING)) {
4120		int cr; /* consider resync */
4121
4122		/* if we established a new connection */
4123		cr  = (os.conn < C_CONNECTED);
4124		/* if we had an established connection
4125		 * and one of the nodes newly attaches a disk */
4126		cr |= (os.conn == C_CONNECTED &&
4127		       (peer_state.disk == D_NEGOTIATING ||
4128			os.disk == D_NEGOTIATING));
4129		/* if we have both been inconsistent, and the peer has been
4130		 * forced to be UpToDate with --overwrite-data */
4131		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4132		/* if we had been plain connected, and the admin requested to
4133		 * start a sync by "invalidate" or "invalidate-remote" */
4134		cr |= (os.conn == C_CONNECTED &&
4135				(peer_state.conn >= C_STARTING_SYNC_S &&
4136				 peer_state.conn <= C_WF_BITMAP_T));
4137
4138		if (cr)
4139			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4140
4141		put_ldev(device);
4142		if (ns.conn == C_MASK) {
4143			ns.conn = C_CONNECTED;
4144			if (device->state.disk == D_NEGOTIATING) {
4145				drbd_force_state(device, NS(disk, D_FAILED));
4146			} else if (peer_state.disk == D_NEGOTIATING) {
4147				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4148				peer_state.disk = D_DISKLESS;
4149				real_peer_disk = D_DISKLESS;
4150			} else {
4151				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4152					return -EIO;
4153				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4154				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4155				return -EIO;
4156			}
4157		}
4158	}
4159
4160	spin_lock_irq(&device->resource->req_lock);
4161	if (os.i != drbd_read_state(device).i)
4162		goto retry;
4163	clear_bit(CONSIDER_RESYNC, &device->flags);
4164	ns.peer = peer_state.role;
4165	ns.pdsk = real_peer_disk;
4166	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4167	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4168		ns.disk = device->new_state_tmp.disk;
4169	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4170	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4171	    test_bit(NEW_CUR_UUID, &device->flags)) {
4172		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4173		   for temporal network outages! */
4174		spin_unlock_irq(&device->resource->req_lock);
4175		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4176		tl_clear(peer_device->connection);
4177		drbd_uuid_new_current(device);
4178		clear_bit(NEW_CUR_UUID, &device->flags);
4179		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4180		return -EIO;
4181	}
4182	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4183	ns = drbd_read_state(device);
4184	spin_unlock_irq(&device->resource->req_lock);
4185
4186	if (rv < SS_SUCCESS) {
4187		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4188		return -EIO;
4189	}
4190
4191	if (os.conn > C_WF_REPORT_PARAMS) {
4192		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4193		    peer_state.disk != D_NEGOTIATING ) {
4194			/* we want resync, peer has not yet decided to sync... */
4195			/* Nowadays only used when forcing a node into primary role and
4196			   setting its disk to UpToDate with that */
4197			drbd_send_uuids(peer_device);
4198			drbd_send_current_state(peer_device);
4199		}
4200	}
4201
4202	clear_bit(DISCARD_MY_DATA, &device->flags);
4203
4204	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4205
4206	return 0;
4207}
4208
4209static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4210{
4211	struct drbd_peer_device *peer_device;
4212	struct drbd_device *device;
4213	struct p_rs_uuid *p = pi->data;
4214
4215	peer_device = conn_peer_device(connection, pi->vnr);
4216	if (!peer_device)
4217		return -EIO;
4218	device = peer_device->device;
4219
4220	wait_event(device->misc_wait,
4221		   device->state.conn == C_WF_SYNC_UUID ||
4222		   device->state.conn == C_BEHIND ||
4223		   device->state.conn < C_CONNECTED ||
4224		   device->state.disk < D_NEGOTIATING);
4225
4226	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4227
4228	/* Here the _drbd_uuid_ functions are right, current should
4229	   _not_ be rotated into the history */
4230	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4231		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4232		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4233
4234		drbd_print_uuids(device, "updated sync uuid");
4235		drbd_start_resync(device, C_SYNC_TARGET);
4236
4237		put_ldev(device);
4238	} else
4239		drbd_err(device, "Ignoring SyncUUID packet!\n");
4240
4241	return 0;
4242}
4243
4244/**
4245 * receive_bitmap_plain
4246 *
4247 * Return 0 when done, 1 when another iteration is needed, and a negative error
4248 * code upon failure.
4249 */
4250static int
4251receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4252		     unsigned long *p, struct bm_xfer_ctx *c)
4253{
4254	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4255				 drbd_header_size(peer_device->connection);
4256	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4257				       c->bm_words - c->word_offset);
4258	unsigned int want = num_words * sizeof(*p);
4259	int err;
4260
4261	if (want != size) {
4262		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4263		return -EIO;
4264	}
4265	if (want == 0)
4266		return 0;
4267	err = drbd_recv_all(peer_device->connection, p, want);
4268	if (err)
4269		return err;
4270
4271	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4272
4273	c->word_offset += num_words;
4274	c->bit_offset = c->word_offset * BITS_PER_LONG;
4275	if (c->bit_offset > c->bm_bits)
4276		c->bit_offset = c->bm_bits;
4277
4278	return 1;
4279}
4280
4281static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4282{
4283	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4284}
4285
4286static int dcbp_get_start(struct p_compressed_bm *p)
4287{
4288	return (p->encoding & 0x80) != 0;
4289}
4290
4291static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4292{
4293	return (p->encoding >> 4) & 0x7;
4294}
4295
4296/**
4297 * recv_bm_rle_bits
4298 *
4299 * Return 0 when done, 1 when another iteration is needed, and a negative error
4300 * code upon failure.
4301 */
4302static int
4303recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4304		struct p_compressed_bm *p,
4305		 struct bm_xfer_ctx *c,
4306		 unsigned int len)
4307{
4308	struct bitstream bs;
4309	u64 look_ahead;
4310	u64 rl;
4311	u64 tmp;
4312	unsigned long s = c->bit_offset;
4313	unsigned long e;
4314	int toggle = dcbp_get_start(p);
4315	int have;
4316	int bits;
4317
4318	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4319
4320	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4321	if (bits < 0)
4322		return -EIO;
4323
4324	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4325		bits = vli_decode_bits(&rl, look_ahead);
4326		if (bits <= 0)
4327			return -EIO;
4328
4329		if (toggle) {
4330			e = s + rl -1;
4331			if (e >= c->bm_bits) {
4332				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4333				return -EIO;
4334			}
4335			_drbd_bm_set_bits(peer_device->device, s, e);
4336		}
4337
4338		if (have < bits) {
4339			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4340				have, bits, look_ahead,
4341				(unsigned int)(bs.cur.b - p->code),
4342				(unsigned int)bs.buf_len);
4343			return -EIO;
4344		}
4345		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4346		if (likely(bits < 64))
4347			look_ahead >>= bits;
4348		else
4349			look_ahead = 0;
4350		have -= bits;
4351
4352		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4353		if (bits < 0)
4354			return -EIO;
4355		look_ahead |= tmp << have;
4356		have += bits;
4357	}
4358
4359	c->bit_offset = s;
4360	bm_xfer_ctx_bit_to_word_offset(c);
4361
4362	return (s != c->bm_bits);
4363}
4364
4365/**
4366 * decode_bitmap_c
4367 *
4368 * Return 0 when done, 1 when another iteration is needed, and a negative error
4369 * code upon failure.
4370 */
4371static int
4372decode_bitmap_c(struct drbd_peer_device *peer_device,
4373		struct p_compressed_bm *p,
4374		struct bm_xfer_ctx *c,
4375		unsigned int len)
4376{
4377	if (dcbp_get_code(p) == RLE_VLI_Bits)
4378		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4379
4380	/* other variants had been implemented for evaluation,
4381	 * but have been dropped as this one turned out to be "best"
4382	 * during all our tests. */
4383
4384	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4385	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4386	return -EIO;
4387}
4388
4389void INFO_bm_xfer_stats(struct drbd_device *device,
4390		const char *direction, struct bm_xfer_ctx *c)
4391{
4392	/* what would it take to transfer it "plaintext" */
4393	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4394	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4395	unsigned int plain =
4396		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4397		c->bm_words * sizeof(unsigned long);
4398	unsigned int total = c->bytes[0] + c->bytes[1];
4399	unsigned int r;
4400
4401	/* total can not be zero. but just in case: */
4402	if (total == 0)
4403		return;
4404
4405	/* don't report if not compressed */
4406	if (total >= plain)
4407		return;
4408
4409	/* total < plain. check for overflow, still */
4410	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4411		                    : (1000 * total / plain);
4412
4413	if (r > 1000)
4414		r = 1000;
4415
4416	r = 1000 - r;
4417	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4418	     "total %u; compression: %u.%u%%\n",
4419			direction,
4420			c->bytes[1], c->packets[1],
4421			c->bytes[0], c->packets[0],
4422			total, r/10, r % 10);
4423}
4424
4425/* Since we are processing the bitfield from lower addresses to higher,
4426   it does not matter if the process it in 32 bit chunks or 64 bit
4427   chunks as long as it is little endian. (Understand it as byte stream,
4428   beginning with the lowest byte...) If we would use big endian
4429   we would need to process it from the highest address to the lowest,
4430   in order to be agnostic to the 32 vs 64 bits issue.
4431
4432   returns 0 on failure, 1 if we successfully received it. */
4433static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4434{
4435	struct drbd_peer_device *peer_device;
4436	struct drbd_device *device;
4437	struct bm_xfer_ctx c;
4438	int err;
4439
4440	peer_device = conn_peer_device(connection, pi->vnr);
4441	if (!peer_device)
4442		return -EIO;
4443	device = peer_device->device;
4444
4445	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4446	/* you are supposed to send additional out-of-sync information
4447	 * if you actually set bits during this phase */
4448
4449	c = (struct bm_xfer_ctx) {
4450		.bm_bits = drbd_bm_bits(device),
4451		.bm_words = drbd_bm_words(device),
4452	};
4453
4454	for(;;) {
4455		if (pi->cmd == P_BITMAP)
4456			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4457		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4458			/* MAYBE: sanity check that we speak proto >= 90,
4459			 * and the feature is enabled! */
4460			struct p_compressed_bm *p = pi->data;
4461
4462			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4463				drbd_err(device, "ReportCBitmap packet too large\n");
4464				err = -EIO;
4465				goto out;
4466			}
4467			if (pi->size <= sizeof(*p)) {
4468				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4469				err = -EIO;
4470				goto out;
4471			}
4472			err = drbd_recv_all(peer_device->connection, p, pi->size);
4473			if (err)
4474			       goto out;
4475			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4476		} else {
4477			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4478			err = -EIO;
4479			goto out;
4480		}
4481
4482		c.packets[pi->cmd == P_BITMAP]++;
4483		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4484
4485		if (err <= 0) {
4486			if (err < 0)
4487				goto out;
4488			break;
4489		}
4490		err = drbd_recv_header(peer_device->connection, pi);
4491		if (err)
4492			goto out;
4493	}
4494
4495	INFO_bm_xfer_stats(device, "receive", &c);
4496
4497	if (device->state.conn == C_WF_BITMAP_T) {
4498		enum drbd_state_rv rv;
4499
4500		err = drbd_send_bitmap(device);
4501		if (err)
4502			goto out;
4503		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4504		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4505		D_ASSERT(device, rv == SS_SUCCESS);
4506	} else if (device->state.conn != C_WF_BITMAP_S) {
4507		/* admin may have requested C_DISCONNECTING,
4508		 * other threads may have noticed network errors */
4509		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4510		    drbd_conn_str(device->state.conn));
4511	}
4512	err = 0;
4513
4514 out:
4515	drbd_bm_unlock(device);
4516	if (!err && device->state.conn == C_WF_BITMAP_S)
4517		drbd_start_resync(device, C_SYNC_SOURCE);
4518	return err;
4519}
4520
4521static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4522{
4523	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4524		 pi->cmd, pi->size);
4525
4526	return ignore_remaining_packet(connection, pi);
4527}
4528
4529static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4530{
4531	/* Make sure we've acked all the TCP data associated
4532	 * with the data requests being unplugged */
4533	drbd_tcp_quickack(connection->data.socket);
4534
4535	return 0;
4536}
4537
4538static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4539{
4540	struct drbd_peer_device *peer_device;
4541	struct drbd_device *device;
4542	struct p_block_desc *p = pi->data;
4543
4544	peer_device = conn_peer_device(connection, pi->vnr);
4545	if (!peer_device)
4546		return -EIO;
4547	device = peer_device->device;
4548
4549	switch (device->state.conn) {
4550	case C_WF_SYNC_UUID:
4551	case C_WF_BITMAP_T:
4552	case C_BEHIND:
4553			break;
4554	default:
4555		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4556				drbd_conn_str(device->state.conn));
4557	}
4558
4559	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4560
4561	return 0;
4562}
4563
4564struct data_cmd {
4565	int expect_payload;
4566	size_t pkt_size;
4567	int (*fn)(struct drbd_connection *, struct packet_info *);
4568};
4569
4570static struct data_cmd drbd_cmd_handler[] = {
4571	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4572	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4573	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4574	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4575	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4576	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4577	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4578	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4579	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4580	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4581	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4582	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4583	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4584	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4585	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4586	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4587	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4588	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4589	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4590	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4591	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4592	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4593	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4594	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4595	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4596};
4597
4598static void drbdd(struct drbd_connection *connection)
4599{
4600	struct packet_info pi;
4601	size_t shs; /* sub header size */
4602	int err;
4603
4604	while (get_t_state(&connection->receiver) == RUNNING) {
4605		struct data_cmd *cmd;
4606
4607		drbd_thread_current_set_cpu(&connection->receiver);
4608		update_receiver_timing_details(connection, drbd_recv_header);
4609		if (drbd_recv_header(connection, &pi))
4610			goto err_out;
4611
4612		cmd = &drbd_cmd_handler[pi.cmd];
4613		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4614			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4615				 cmdname(pi.cmd), pi.cmd);
4616			goto err_out;
4617		}
4618
4619		shs = cmd->pkt_size;
4620		if (pi.size > shs && !cmd->expect_payload) {
4621			drbd_err(connection, "No payload expected %s l:%d\n",
4622				 cmdname(pi.cmd), pi.size);
4623			goto err_out;
4624		}
4625
4626		if (shs) {
4627			update_receiver_timing_details(connection, drbd_recv_all_warn);
4628			err = drbd_recv_all_warn(connection, pi.data, shs);
4629			if (err)
4630				goto err_out;
4631			pi.size -= shs;
4632		}
4633
4634		update_receiver_timing_details(connection, cmd->fn);
4635		err = cmd->fn(connection, &pi);
4636		if (err) {
4637			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4638				 cmdname(pi.cmd), err, pi.size);
4639			goto err_out;
4640		}
4641	}
4642	return;
4643
4644    err_out:
4645	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4646}
4647
4648static void conn_disconnect(struct drbd_connection *connection)
4649{
4650	struct drbd_peer_device *peer_device;
4651	enum drbd_conns oc;
4652	int vnr;
4653
4654	if (connection->cstate == C_STANDALONE)
4655		return;
4656
4657	/* We are about to start the cleanup after connection loss.
4658	 * Make sure drbd_make_request knows about that.
4659	 * Usually we should be in some network failure state already,
4660	 * but just in case we are not, we fix it up here.
4661	 */
4662	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4663
4664	/* asender does not clean up anything. it must not interfere, either */
4665	drbd_thread_stop(&connection->asender);
4666	drbd_free_sock(connection);
4667
4668	rcu_read_lock();
4669	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4670		struct drbd_device *device = peer_device->device;
4671		kref_get(&device->kref);
4672		rcu_read_unlock();
4673		drbd_disconnected(peer_device);
4674		kref_put(&device->kref, drbd_destroy_device);
4675		rcu_read_lock();
4676	}
4677	rcu_read_unlock();
4678
4679	if (!list_empty(&connection->current_epoch->list))
4680		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4681	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4682	atomic_set(&connection->current_epoch->epoch_size, 0);
4683	connection->send.seen_any_write_yet = false;
4684
4685	drbd_info(connection, "Connection closed\n");
4686
4687	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4688		conn_try_outdate_peer_async(connection);
4689
4690	spin_lock_irq(&connection->resource->req_lock);
4691	oc = connection->cstate;
4692	if (oc >= C_UNCONNECTED)
4693		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4694
4695	spin_unlock_irq(&connection->resource->req_lock);
4696
4697	if (oc == C_DISCONNECTING)
4698		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4699}
4700
4701static int drbd_disconnected(struct drbd_peer_device *peer_device)
4702{
4703	struct drbd_device *device = peer_device->device;
4704	unsigned int i;
4705
4706	/* wait for current activity to cease. */
4707	spin_lock_irq(&device->resource->req_lock);
4708	_drbd_wait_ee_list_empty(device, &device->active_ee);
4709	_drbd_wait_ee_list_empty(device, &device->sync_ee);
4710	_drbd_wait_ee_list_empty(device, &device->read_ee);
4711	spin_unlock_irq(&device->resource->req_lock);
4712
4713	/* We do not have data structures that would allow us to
4714	 * get the rs_pending_cnt down to 0 again.
4715	 *  * On C_SYNC_TARGET we do not have any data structures describing
4716	 *    the pending RSDataRequest's we have sent.
4717	 *  * On C_SYNC_SOURCE there is no data structure that tracks
4718	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4719	 *  And no, it is not the sum of the reference counts in the
4720	 *  resync_LRU. The resync_LRU tracks the whole operation including
4721	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4722	 *  on the fly. */
4723	drbd_rs_cancel_all(device);
4724	device->rs_total = 0;
4725	device->rs_failed = 0;
4726	atomic_set(&device->rs_pending_cnt, 0);
4727	wake_up(&device->misc_wait);
4728
4729	del_timer_sync(&device->resync_timer);
4730	resync_timer_fn((unsigned long)device);
4731
4732	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4733	 * w_make_resync_request etc. which may still be on the worker queue
4734	 * to be "canceled" */
4735	drbd_flush_workqueue(&peer_device->connection->sender_work);
4736
4737	drbd_finish_peer_reqs(device);
4738
4739	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4740	   might have issued a work again. The one before drbd_finish_peer_reqs() is
4741	   necessary to reclain net_ee in drbd_finish_peer_reqs(). */
4742	drbd_flush_workqueue(&peer_device->connection->sender_work);
4743
4744	/* need to do it again, drbd_finish_peer_reqs() may have populated it
4745	 * again via drbd_try_clear_on_disk_bm(). */
4746	drbd_rs_cancel_all(device);
4747
4748	kfree(device->p_uuid);
4749	device->p_uuid = NULL;
4750
4751	if (!drbd_suspended(device))
4752		tl_clear(peer_device->connection);
4753
4754	drbd_md_sync(device);
4755
4756	/* serialize with bitmap writeout triggered by the state change,
4757	 * if any. */
4758	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4759
4760	/* tcp_close and release of sendpage pages can be deferred.  I don't
4761	 * want to use SO_LINGER, because apparently it can be deferred for
4762	 * more than 20 seconds (longest time I checked).
4763	 *
4764	 * Actually we don't care for exactly when the network stack does its
4765	 * put_page(), but release our reference on these pages right here.
4766	 */
4767	i = drbd_free_peer_reqs(device, &device->net_ee);
4768	if (i)
4769		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4770	i = atomic_read(&device->pp_in_use_by_net);
4771	if (i)
4772		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4773	i = atomic_read(&device->pp_in_use);
4774	if (i)
4775		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4776
4777	D_ASSERT(device, list_empty(&device->read_ee));
4778	D_ASSERT(device, list_empty(&device->active_ee));
4779	D_ASSERT(device, list_empty(&device->sync_ee));
4780	D_ASSERT(device, list_empty(&device->done_ee));
4781
4782	return 0;
4783}
4784
4785/*
4786 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4787 * we can agree on is stored in agreed_pro_version.
4788 *
4789 * feature flags and the reserved array should be enough room for future
4790 * enhancements of the handshake protocol, and possible plugins...
4791 *
4792 * for now, they are expected to be zero, but ignored.
4793 */
4794static int drbd_send_features(struct drbd_connection *connection)
4795{
4796	struct drbd_socket *sock;
4797	struct p_connection_features *p;
4798
4799	sock = &connection->data;
4800	p = conn_prepare_command(connection, sock);
4801	if (!p)
4802		return -EIO;
4803	memset(p, 0, sizeof(*p));
4804	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4805	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4806	p->feature_flags = cpu_to_be32(PRO_FEATURES);
4807	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4808}
4809
4810/*
4811 * return values:
4812 *   1 yes, we have a valid connection
4813 *   0 oops, did not work out, please try again
4814 *  -1 peer talks different language,
4815 *     no point in trying again, please go standalone.
4816 */
4817static int drbd_do_features(struct drbd_connection *connection)
4818{
4819	/* ASSERT current == connection->receiver ... */
4820	struct p_connection_features *p;
4821	const int expect = sizeof(struct p_connection_features);
4822	struct packet_info pi;
4823	int err;
4824
4825	err = drbd_send_features(connection);
4826	if (err)
4827		return 0;
4828
4829	err = drbd_recv_header(connection, &pi);
4830	if (err)
4831		return 0;
4832
4833	if (pi.cmd != P_CONNECTION_FEATURES) {
4834		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4835			 cmdname(pi.cmd), pi.cmd);
4836		return -1;
4837	}
4838
4839	if (pi.size != expect) {
4840		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4841		     expect, pi.size);
4842		return -1;
4843	}
4844
4845	p = pi.data;
4846	err = drbd_recv_all_warn(connection, p, expect);
4847	if (err)
4848		return 0;
4849
4850	p->protocol_min = be32_to_cpu(p->protocol_min);
4851	p->protocol_max = be32_to_cpu(p->protocol_max);
4852	if (p->protocol_max == 0)
4853		p->protocol_max = p->protocol_min;
4854
4855	if (PRO_VERSION_MAX < p->protocol_min ||
4856	    PRO_VERSION_MIN > p->protocol_max)
4857		goto incompat;
4858
4859	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4860	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4861
4862	drbd_info(connection, "Handshake successful: "
4863	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
4864
4865	drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4866		  connection->agreed_features & FF_TRIM ? " " : " not ");
4867
4868	return 1;
4869
4870 incompat:
4871	drbd_err(connection, "incompatible DRBD dialects: "
4872	    "I support %d-%d, peer supports %d-%d\n",
4873	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4874	    p->protocol_min, p->protocol_max);
4875	return -1;
4876}
4877
4878#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4879static int drbd_do_auth(struct drbd_connection *connection)
4880{
4881	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4882	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4883	return -1;
4884}
4885#else
4886#define CHALLENGE_LEN 64
4887
4888/* Return value:
4889	1 - auth succeeded,
4890	0 - failed, try again (network error),
4891	-1 - auth failed, don't try again.
4892*/
4893
4894static int drbd_do_auth(struct drbd_connection *connection)
4895{
4896	struct drbd_socket *sock;
4897	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4898	struct scatterlist sg;
4899	char *response = NULL;
4900	char *right_response = NULL;
4901	char *peers_ch = NULL;
4902	unsigned int key_len;
4903	char secret[SHARED_SECRET_MAX]; /* 64 byte */
4904	unsigned int resp_size;
4905	struct hash_desc desc;
4906	struct packet_info pi;
4907	struct net_conf *nc;
4908	int err, rv;
4909
4910	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4911
4912	rcu_read_lock();
4913	nc = rcu_dereference(connection->net_conf);
4914	key_len = strlen(nc->shared_secret);
4915	memcpy(secret, nc->shared_secret, key_len);
4916	rcu_read_unlock();
4917
4918	desc.tfm = connection->cram_hmac_tfm;
4919	desc.flags = 0;
4920
4921	rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4922	if (rv) {
4923		drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4924		rv = -1;
4925		goto fail;
4926	}
4927
4928	get_random_bytes(my_challenge, CHALLENGE_LEN);
4929
4930	sock = &connection->data;
4931	if (!conn_prepare_command(connection, sock)) {
4932		rv = 0;
4933		goto fail;
4934	}
4935	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4936				my_challenge, CHALLENGE_LEN);
4937	if (!rv)
4938		goto fail;
4939
4940	err = drbd_recv_header(connection, &pi);
4941	if (err) {
4942		rv = 0;
4943		goto fail;
4944	}
4945
4946	if (pi.cmd != P_AUTH_CHALLENGE) {
4947		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4948			 cmdname(pi.cmd), pi.cmd);
4949		rv = 0;
4950		goto fail;
4951	}
4952
4953	if (pi.size > CHALLENGE_LEN * 2) {
4954		drbd_err(connection, "expected AuthChallenge payload too big.\n");
4955		rv = -1;
4956		goto fail;
4957	}
4958
4959	if (pi.size < CHALLENGE_LEN) {
4960		drbd_err(connection, "AuthChallenge payload too small.\n");
4961		rv = -1;
4962		goto fail;
4963	}
4964
4965	peers_ch = kmalloc(pi.size, GFP_NOIO);
4966	if (peers_ch == NULL) {
4967		drbd_err(connection, "kmalloc of peers_ch failed\n");
4968		rv = -1;
4969		goto fail;
4970	}
4971
4972	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4973	if (err) {
4974		rv = 0;
4975		goto fail;
4976	}
4977
4978	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4979		drbd_err(connection, "Peer presented the same challenge!\n");
4980		rv = -1;
4981		goto fail;
4982	}
4983
4984	resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4985	response = kmalloc(resp_size, GFP_NOIO);
4986	if (response == NULL) {
4987		drbd_err(connection, "kmalloc of response failed\n");
4988		rv = -1;
4989		goto fail;
4990	}
4991
4992	sg_init_table(&sg, 1);
4993	sg_set_buf(&sg, peers_ch, pi.size);
4994
4995	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4996	if (rv) {
4997		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4998		rv = -1;
4999		goto fail;
5000	}
5001
5002	if (!conn_prepare_command(connection, sock)) {
5003		rv = 0;
5004		goto fail;
5005	}
5006	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5007				response, resp_size);
5008	if (!rv)
5009		goto fail;
5010
5011	err = drbd_recv_header(connection, &pi);
5012	if (err) {
5013		rv = 0;
5014		goto fail;
5015	}
5016
5017	if (pi.cmd != P_AUTH_RESPONSE) {
5018		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5019			 cmdname(pi.cmd), pi.cmd);
5020		rv = 0;
5021		goto fail;
5022	}
5023
5024	if (pi.size != resp_size) {
5025		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5026		rv = 0;
5027		goto fail;
5028	}
5029
5030	err = drbd_recv_all_warn(connection, response , resp_size);
5031	if (err) {
5032		rv = 0;
5033		goto fail;
5034	}
5035
5036	right_response = kmalloc(resp_size, GFP_NOIO);
5037	if (right_response == NULL) {
5038		drbd_err(connection, "kmalloc of right_response failed\n");
5039		rv = -1;
5040		goto fail;
5041	}
5042
5043	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
5044
5045	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
5046	if (rv) {
5047		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5048		rv = -1;
5049		goto fail;
5050	}
5051
5052	rv = !memcmp(response, right_response, resp_size);
5053
5054	if (rv)
5055		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5056		     resp_size);
5057	else
5058		rv = -1;
5059
5060 fail:
5061	kfree(peers_ch);
5062	kfree(response);
5063	kfree(right_response);
5064
5065	return rv;
5066}
5067#endif
5068
5069int drbd_receiver(struct drbd_thread *thi)
5070{
5071	struct drbd_connection *connection = thi->connection;
5072	int h;
5073
5074	drbd_info(connection, "receiver (re)started\n");
5075
5076	do {
5077		h = conn_connect(connection);
5078		if (h == 0) {
5079			conn_disconnect(connection);
5080			schedule_timeout_interruptible(HZ);
5081		}
5082		if (h == -1) {
5083			drbd_warn(connection, "Discarding network configuration.\n");
5084			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5085		}
5086	} while (h == 0);
5087
5088	if (h > 0)
5089		drbdd(connection);
5090
5091	conn_disconnect(connection);
5092
5093	drbd_info(connection, "receiver terminated\n");
5094	return 0;
5095}
5096
5097/* ********* acknowledge sender ******** */
5098
5099static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5100{
5101	struct p_req_state_reply *p = pi->data;
5102	int retcode = be32_to_cpu(p->retcode);
5103
5104	if (retcode >= SS_SUCCESS) {
5105		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5106	} else {
5107		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5108		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5109			 drbd_set_st_err_str(retcode), retcode);
5110	}
5111	wake_up(&connection->ping_wait);
5112
5113	return 0;
5114}
5115
5116static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5117{
5118	struct drbd_peer_device *peer_device;
5119	struct drbd_device *device;
5120	struct p_req_state_reply *p = pi->data;
5121	int retcode = be32_to_cpu(p->retcode);
5122
5123	peer_device = conn_peer_device(connection, pi->vnr);
5124	if (!peer_device)
5125		return -EIO;
5126	device = peer_device->device;
5127
5128	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5129		D_ASSERT(device, connection->agreed_pro_version < 100);
5130		return got_conn_RqSReply(connection, pi);
5131	}
5132
5133	if (retcode >= SS_SUCCESS) {
5134		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5135	} else {
5136		set_bit(CL_ST_CHG_FAIL, &device->flags);
5137		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5138			drbd_set_st_err_str(retcode), retcode);
5139	}
5140	wake_up(&device->state_wait);
5141
5142	return 0;
5143}
5144
5145static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5146{
5147	return drbd_send_ping_ack(connection);
5148
5149}
5150
5151static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5152{
5153	/* restore idle timeout */
5154	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5155	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5156		wake_up(&connection->ping_wait);
5157
5158	return 0;
5159}
5160
5161static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5162{
5163	struct drbd_peer_device *peer_device;
5164	struct drbd_device *device;
5165	struct p_block_ack *p = pi->data;
5166	sector_t sector = be64_to_cpu(p->sector);
5167	int blksize = be32_to_cpu(p->blksize);
5168
5169	peer_device = conn_peer_device(connection, pi->vnr);
5170	if (!peer_device)
5171		return -EIO;
5172	device = peer_device->device;
5173
5174	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5175
5176	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5177
5178	if (get_ldev(device)) {
5179		drbd_rs_complete_io(device, sector);
5180		drbd_set_in_sync(device, sector, blksize);
5181		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5182		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5183		put_ldev(device);
5184	}
5185	dec_rs_pending(device);
5186	atomic_add(blksize >> 9, &device->rs_sect_in);
5187
5188	return 0;
5189}
5190
5191static int
5192validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5193			      struct rb_root *root, const char *func,
5194			      enum drbd_req_event what, bool missing_ok)
5195{
5196	struct drbd_request *req;
5197	struct bio_and_error m;
5198
5199	spin_lock_irq(&device->resource->req_lock);
5200	req = find_request(device, root, id, sector, missing_ok, func);
5201	if (unlikely(!req)) {
5202		spin_unlock_irq(&device->resource->req_lock);
5203		return -EIO;
5204	}
5205	__req_mod(req, what, &m);
5206	spin_unlock_irq(&device->resource->req_lock);
5207
5208	if (m.bio)
5209		complete_master_bio(device, &m);
5210	return 0;
5211}
5212
5213static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5214{
5215	struct drbd_peer_device *peer_device;
5216	struct drbd_device *device;
5217	struct p_block_ack *p = pi->data;
5218	sector_t sector = be64_to_cpu(p->sector);
5219	int blksize = be32_to_cpu(p->blksize);
5220	enum drbd_req_event what;
5221
5222	peer_device = conn_peer_device(connection, pi->vnr);
5223	if (!peer_device)
5224		return -EIO;
5225	device = peer_device->device;
5226
5227	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5228
5229	if (p->block_id == ID_SYNCER) {
5230		drbd_set_in_sync(device, sector, blksize);
5231		dec_rs_pending(device);
5232		return 0;
5233	}
5234	switch (pi->cmd) {
5235	case P_RS_WRITE_ACK:
5236		what = WRITE_ACKED_BY_PEER_AND_SIS;
5237		break;
5238	case P_WRITE_ACK:
5239		what = WRITE_ACKED_BY_PEER;
5240		break;
5241	case P_RECV_ACK:
5242		what = RECV_ACKED_BY_PEER;
5243		break;
5244	case P_SUPERSEDED:
5245		what = CONFLICT_RESOLVED;
5246		break;
5247	case P_RETRY_WRITE:
5248		what = POSTPONE_WRITE;
5249		break;
5250	default:
5251		BUG();
5252	}
5253
5254	return validate_req_change_req_state(device, p->block_id, sector,
5255					     &device->write_requests, __func__,
5256					     what, false);
5257}
5258
5259static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5260{
5261	struct drbd_peer_device *peer_device;
5262	struct drbd_device *device;
5263	struct p_block_ack *p = pi->data;
5264	sector_t sector = be64_to_cpu(p->sector);
5265	int size = be32_to_cpu(p->blksize);
5266	int err;
5267
5268	peer_device = conn_peer_device(connection, pi->vnr);
5269	if (!peer_device)
5270		return -EIO;
5271	device = peer_device->device;
5272
5273	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5274
5275	if (p->block_id == ID_SYNCER) {
5276		dec_rs_pending(device);
5277		drbd_rs_failed_io(device, sector, size);
5278		return 0;
5279	}
5280
5281	err = validate_req_change_req_state(device, p->block_id, sector,
5282					    &device->write_requests, __func__,
5283					    NEG_ACKED, true);
5284	if (err) {
5285		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5286		   The master bio might already be completed, therefore the
5287		   request is no longer in the collision hash. */
5288		/* In Protocol B we might already have got a P_RECV_ACK
5289		   but then get a P_NEG_ACK afterwards. */
5290		drbd_set_out_of_sync(device, sector, size);
5291	}
5292	return 0;
5293}
5294
5295static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5296{
5297	struct drbd_peer_device *peer_device;
5298	struct drbd_device *device;
5299	struct p_block_ack *p = pi->data;
5300	sector_t sector = be64_to_cpu(p->sector);
5301
5302	peer_device = conn_peer_device(connection, pi->vnr);
5303	if (!peer_device)
5304		return -EIO;
5305	device = peer_device->device;
5306
5307	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5308
5309	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5310	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5311
5312	return validate_req_change_req_state(device, p->block_id, sector,
5313					     &device->read_requests, __func__,
5314					     NEG_ACKED, false);
5315}
5316
5317static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5318{
5319	struct drbd_peer_device *peer_device;
5320	struct drbd_device *device;
5321	sector_t sector;
5322	int size;
5323	struct p_block_ack *p = pi->data;
5324
5325	peer_device = conn_peer_device(connection, pi->vnr);
5326	if (!peer_device)
5327		return -EIO;
5328	device = peer_device->device;
5329
5330	sector = be64_to_cpu(p->sector);
5331	size = be32_to_cpu(p->blksize);
5332
5333	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5334
5335	dec_rs_pending(device);
5336
5337	if (get_ldev_if_state(device, D_FAILED)) {
5338		drbd_rs_complete_io(device, sector);
5339		switch (pi->cmd) {
5340		case P_NEG_RS_DREPLY:
5341			drbd_rs_failed_io(device, sector, size);
5342		case P_RS_CANCEL:
5343			break;
5344		default:
5345			BUG();
5346		}
5347		put_ldev(device);
5348	}
5349
5350	return 0;
5351}
5352
5353static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5354{
5355	struct p_barrier_ack *p = pi->data;
5356	struct drbd_peer_device *peer_device;
5357	int vnr;
5358
5359	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5360
5361	rcu_read_lock();
5362	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5363		struct drbd_device *device = peer_device->device;
5364
5365		if (device->state.conn == C_AHEAD &&
5366		    atomic_read(&device->ap_in_flight) == 0 &&
5367		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5368			device->start_resync_timer.expires = jiffies + HZ;
5369			add_timer(&device->start_resync_timer);
5370		}
5371	}
5372	rcu_read_unlock();
5373
5374	return 0;
5375}
5376
5377static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5378{
5379	struct drbd_peer_device *peer_device;
5380	struct drbd_device *device;
5381	struct p_block_ack *p = pi->data;
5382	struct drbd_device_work *dw;
5383	sector_t sector;
5384	int size;
5385
5386	peer_device = conn_peer_device(connection, pi->vnr);
5387	if (!peer_device)
5388		return -EIO;
5389	device = peer_device->device;
5390
5391	sector = be64_to_cpu(p->sector);
5392	size = be32_to_cpu(p->blksize);
5393
5394	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5395
5396	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5397		drbd_ov_out_of_sync_found(device, sector, size);
5398	else
5399		ov_out_of_sync_print(device);
5400
5401	if (!get_ldev(device))
5402		return 0;
5403
5404	drbd_rs_complete_io(device, sector);
5405	dec_rs_pending(device);
5406
5407	--device->ov_left;
5408
5409	/* let's advance progress step marks only for every other megabyte */
5410	if ((device->ov_left & 0x200) == 0x200)
5411		drbd_advance_rs_marks(device, device->ov_left);
5412
5413	if (device->ov_left == 0) {
5414		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5415		if (dw) {
5416			dw->w.cb = w_ov_finished;
5417			dw->device = device;
5418			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5419		} else {
5420			drbd_err(device, "kmalloc(dw) failed.");
5421			ov_out_of_sync_print(device);
5422			drbd_resync_finished(device);
5423		}
5424	}
5425	put_ldev(device);
5426	return 0;
5427}
5428
5429static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5430{
5431	return 0;
5432}
5433
5434static int connection_finish_peer_reqs(struct drbd_connection *connection)
5435{
5436	struct drbd_peer_device *peer_device;
5437	int vnr, not_empty = 0;
5438
5439	do {
5440		clear_bit(SIGNAL_ASENDER, &connection->flags);
5441		flush_signals(current);
5442
5443		rcu_read_lock();
5444		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5445			struct drbd_device *device = peer_device->device;
5446			kref_get(&device->kref);
5447			rcu_read_unlock();
5448			if (drbd_finish_peer_reqs(device)) {
5449				kref_put(&device->kref, drbd_destroy_device);
5450				return 1;
5451			}
5452			kref_put(&device->kref, drbd_destroy_device);
5453			rcu_read_lock();
5454		}
5455		set_bit(SIGNAL_ASENDER, &connection->flags);
5456
5457		spin_lock_irq(&connection->resource->req_lock);
5458		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5459			struct drbd_device *device = peer_device->device;
5460			not_empty = !list_empty(&device->done_ee);
5461			if (not_empty)
5462				break;
5463		}
5464		spin_unlock_irq(&connection->resource->req_lock);
5465		rcu_read_unlock();
5466	} while (not_empty);
5467
5468	return 0;
5469}
5470
5471struct asender_cmd {
5472	size_t pkt_size;
5473	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5474};
5475
5476static struct asender_cmd asender_tbl[] = {
5477	[P_PING]	    = { 0, got_Ping },
5478	[P_PING_ACK]	    = { 0, got_PingAck },
5479	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5480	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5481	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5482	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5483	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5484	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5485	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5486	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5487	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5488	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5489	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5490	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5491	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5492	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5493	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5494};
5495
5496int drbd_asender(struct drbd_thread *thi)
5497{
5498	struct drbd_connection *connection = thi->connection;
5499	struct asender_cmd *cmd = NULL;
5500	struct packet_info pi;
5501	int rv;
5502	void *buf    = connection->meta.rbuf;
5503	int received = 0;
5504	unsigned int header_size = drbd_header_size(connection);
5505	int expect   = header_size;
5506	bool ping_timeout_active = false;
5507	struct net_conf *nc;
5508	int ping_timeo, tcp_cork, ping_int;
5509	struct sched_param param = { .sched_priority = 2 };
5510
5511	rv = sched_setscheduler(current, SCHED_RR, &param);
5512	if (rv < 0)
5513		drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5514
5515	while (get_t_state(thi) == RUNNING) {
5516		drbd_thread_current_set_cpu(thi);
5517
5518		rcu_read_lock();
5519		nc = rcu_dereference(connection->net_conf);
5520		ping_timeo = nc->ping_timeo;
5521		tcp_cork = nc->tcp_cork;
5522		ping_int = nc->ping_int;
5523		rcu_read_unlock();
5524
5525		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5526			if (drbd_send_ping(connection)) {
5527				drbd_err(connection, "drbd_send_ping has failed\n");
5528				goto reconnect;
5529			}
5530			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5531			ping_timeout_active = true;
5532		}
5533
5534		/* TODO: conditionally cork; it may hurt latency if we cork without
5535		   much to send */
5536		if (tcp_cork)
5537			drbd_tcp_cork(connection->meta.socket);
5538		if (connection_finish_peer_reqs(connection)) {
5539			drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5540			goto reconnect;
5541		}
5542		/* but unconditionally uncork unless disabled */
5543		if (tcp_cork)
5544			drbd_tcp_uncork(connection->meta.socket);
5545
5546		/* short circuit, recv_msg would return EINTR anyways. */
5547		if (signal_pending(current))
5548			continue;
5549
5550		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5551		clear_bit(SIGNAL_ASENDER, &connection->flags);
5552
5553		flush_signals(current);
5554
5555		/* Note:
5556		 * -EINTR	 (on meta) we got a signal
5557		 * -EAGAIN	 (on meta) rcvtimeo expired
5558		 * -ECONNRESET	 other side closed the connection
5559		 * -ERESTARTSYS  (on data) we got a signal
5560		 * rv <  0	 other than above: unexpected error!
5561		 * rv == expected: full header or command
5562		 * rv <  expected: "woken" by signal during receive
5563		 * rv == 0	 : "connection shut down by peer"
5564		 */
5565received_more:
5566		if (likely(rv > 0)) {
5567			received += rv;
5568			buf	 += rv;
5569		} else if (rv == 0) {
5570			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5571				long t;
5572				rcu_read_lock();
5573				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5574				rcu_read_unlock();
5575
5576				t = wait_event_timeout(connection->ping_wait,
5577						       connection->cstate < C_WF_REPORT_PARAMS,
5578						       t);
5579				if (t)
5580					break;
5581			}
5582			drbd_err(connection, "meta connection shut down by peer.\n");
5583			goto reconnect;
5584		} else if (rv == -EAGAIN) {
5585			/* If the data socket received something meanwhile,
5586			 * that is good enough: peer is still alive. */
5587			if (time_after(connection->last_received,
5588				jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5589				continue;
5590			if (ping_timeout_active) {
5591				drbd_err(connection, "PingAck did not arrive in time.\n");
5592				goto reconnect;
5593			}
5594			set_bit(SEND_PING, &connection->flags);
5595			continue;
5596		} else if (rv == -EINTR) {
5597			continue;
5598		} else {
5599			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5600			goto reconnect;
5601		}
5602
5603		if (received == expect && cmd == NULL) {
5604			if (decode_header(connection, connection->meta.rbuf, &pi))
5605				goto reconnect;
5606			cmd = &asender_tbl[pi.cmd];
5607			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5608				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5609					 cmdname(pi.cmd), pi.cmd);
5610				goto disconnect;
5611			}
5612			expect = header_size + cmd->pkt_size;
5613			if (pi.size != expect - header_size) {
5614				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5615					pi.cmd, pi.size);
5616				goto reconnect;
5617			}
5618		}
5619		if (received == expect) {
5620			bool err;
5621
5622			err = cmd->fn(connection, &pi);
5623			if (err) {
5624				drbd_err(connection, "%pf failed\n", cmd->fn);
5625				goto reconnect;
5626			}
5627
5628			connection->last_received = jiffies;
5629
5630			if (cmd == &asender_tbl[P_PING_ACK]) {
5631				/* restore idle timeout */
5632				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5633				ping_timeout_active = false;
5634			}
5635
5636			buf	 = connection->meta.rbuf;
5637			received = 0;
5638			expect	 = header_size;
5639			cmd	 = NULL;
5640		}
5641		if (test_bit(SEND_PING, &connection->flags))
5642			continue;
5643		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT);
5644		if (rv > 0)
5645			goto received_more;
5646	}
5647
5648	if (0) {
5649reconnect:
5650		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5651		conn_md_sync(connection);
5652	}
5653	if (0) {
5654disconnect:
5655		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5656	}
5657	clear_bit(SIGNAL_ASENDER, &connection->flags);
5658
5659	drbd_info(connection, "asender terminated\n");
5660
5661	return 0;
5662}
5663