1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24*/
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_protocol.h"
40#include "drbd_req.h"
41
42static int make_ov_request(struct drbd_device *, int);
43static int make_resync_request(struct drbd_device *, int);
44
45/* endio handlers:
46 *   drbd_md_endio (defined here)
47 *   drbd_request_endio (defined here)
48 *   drbd_peer_request_endio (defined here)
49 *   drbd_bm_endio (defined in drbd_bitmap.c)
50 *
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
55 *
56 */
57
58
59/* About the global_state_lock
60   Each state transition on a device holds a read lock. In case we have
61   to evaluate the resync-after dependencies, we grab a write lock, because
62   we need stable states on all devices for that.  */
63rwlock_t global_state_lock;
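
/*
 * Illustrative sketch of the convention described above, mirroring what
 * suspend_other_sg()/resume_next_sg() further down in this file do: code
 * that needs a stable view across all devices, such as the resync-after
 * logic, takes the write side, while individual state transitions take
 * the read side:
 *
 *	write_lock_irq(&global_state_lock);
 *	_drbd_pause_after(device);
 *	write_unlock_irq(&global_state_lock);
 */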
64
65/* used for synchronous meta data and bitmap IO
66 * submitted by drbd_md_sync_page_io()
67 */
68void drbd_md_endio(struct bio *bio, int error)
69{
70	struct drbd_device *device;
71
72	device = bio->bi_private;
73	device->md_io.error = error;
74
75	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
76	 * to timeout on the lower level device, and eventually detach from it.
77	 * If this io completion runs after that timeout expired, this
78	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
79	 * During normal operation, this only puts that extra reference
80	 * down to 1 again.
81	 * Make sure we first drop the reference, and only then signal
82	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83	 * next drbd_md_sync_page_io(), that we trigger the
84	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
85	 */
86	drbd_md_put_buffer(device);
87	device->md_io.done = 1;
88	wake_up(&device->misc_wait);
89	bio_put(bio);
90	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
91		put_ldev(device);
92}
93
94/* reads on behalf of the partner,
95 * "submitted" by the receiver
96 */
97static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
98{
99	unsigned long flags = 0;
100	struct drbd_peer_device *peer_device = peer_req->peer_device;
101	struct drbd_device *device = peer_device->device;
102
103	spin_lock_irqsave(&device->resource->req_lock, flags);
104	device->read_cnt += peer_req->i.size >> 9;
105	list_del(&peer_req->w.list);
106	if (list_empty(&device->read_ee))
107		wake_up(&device->ee_wait);
108	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109		__drbd_chk_io_error(device, DRBD_READ_ERROR);
110	spin_unlock_irqrestore(&device->resource->req_lock, flags);
111
112	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
113	put_ldev(device);
114}
115
116/* writes on behalf of the partner, or resync writes,
117 * "submitted" by the receiver, final stage.  */
118void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119{
120	unsigned long flags = 0;
121	struct drbd_peer_device *peer_device = peer_req->peer_device;
122	struct drbd_device *device = peer_device->device;
123	struct drbd_interval i;
124	int do_wake;
125	u64 block_id;
126	int do_al_complete_io;
127
128	/* after we moved peer_req to done_ee,
129	 * we may no longer access it,
130	 * it may be freed/reused already!
131	 * (as soon as we release the req_lock) */
132	i = peer_req->i;
133	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
134	block_id = peer_req->block_id;
135	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
136
137	spin_lock_irqsave(&device->resource->req_lock, flags);
138	device->writ_cnt += peer_req->i.size >> 9;
139	list_move_tail(&peer_req->w.list, &device->done_ee);
140
141	/*
142	 * Do not remove from the write_requests tree here: we did not send the
143	 * Ack yet and did not wake possibly waiting conflicting requests.
144	 * Removal from the tree happens in "drbd_process_done_ee", within the
145	 * appropriate dw.cb (e_end_block/e_end_resync_block), or in
146	 * _drbd_clear_done_ee.
147	 */
148
149	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
150
151	/* FIXME do we want to detach for failed REQ_DISCARD?
152	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
153	if (peer_req->flags & EE_WAS_ERROR)
154		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
155	spin_unlock_irqrestore(&device->resource->req_lock, flags);
156
157	if (block_id == ID_SYNCER)
158		drbd_rs_complete_io(device, i.sector);
159
160	if (do_wake)
161		wake_up(&device->ee_wait);
162
163	if (do_al_complete_io)
164		drbd_al_complete_io(device, &i);
165
166	wake_asender(peer_device->connection);
167	put_ldev(device);
168}
169
170/* writes on behalf of the partner, or resync writes,
171 * "submitted" by the receiver.
172 */
173void drbd_peer_request_endio(struct bio *bio, int error)
174{
175	struct drbd_peer_request *peer_req = bio->bi_private;
176	struct drbd_device *device = peer_req->peer_device->device;
177	int uptodate = bio_flagged(bio, BIO_UPTODATE);
178	int is_write = bio_data_dir(bio) == WRITE;
179	int is_discard = !!(bio->bi_rw & REQ_DISCARD);
180
181	if (error && __ratelimit(&drbd_ratelimit_state))
182		drbd_warn(device, "%s: error=%d s=%llus\n",
183				is_write ? (is_discard ? "discard" : "write")
184					: "read", error,
185				(unsigned long long)peer_req->i.sector);
186	if (!error && !uptodate) {
187		if (__ratelimit(&drbd_ratelimit_state))
188			drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
189					is_write ? "write" : "read",
190					(unsigned long long)peer_req->i.sector);
191		/* strange behavior of some lower level drivers...
192		 * fail the request by clearing the uptodate flag,
193		 * but do not return any error?! */
194		error = -EIO;
195	}
196
197	if (error)
198		set_bit(__EE_WAS_ERROR, &peer_req->flags);
199
200	bio_put(bio); /* no need for the bio anymore */
201	if (atomic_dec_and_test(&peer_req->pending_bios)) {
202		if (is_write)
203			drbd_endio_write_sec_final(peer_req);
204		else
205			drbd_endio_read_sec_final(peer_req);
206	}
207}
208
209/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
210 */
211void drbd_request_endio(struct bio *bio, int error)
212{
213	unsigned long flags;
214	struct drbd_request *req = bio->bi_private;
215	struct drbd_device *device = req->device;
216	struct bio_and_error m;
217	enum drbd_req_event what;
218	int uptodate = bio_flagged(bio, BIO_UPTODATE);
219
220	if (!error && !uptodate) {
221		drbd_warn(device, "p %s: setting error to -EIO\n",
222			 bio_data_dir(bio) == WRITE ? "write" : "read");
223		/* strange behavior of some lower level drivers...
224		 * fail the request by clearing the uptodate flag,
225		 * but do not return any error?! */
226		error = -EIO;
227	}
228
229
230	/* If this request was aborted locally before,
231	 * but now was completed "successfully",
232	 * chances are that this caused arbitrary data corruption.
233	 *
234	 * "aborting" requests, or force-detaching the disk, is intended for
235	 * completely blocked/hung local backing devices which no longer
236	 * complete requests at all, not even with error completions.  In this
237	 * situation, usually a hard-reset and failover is the only way out.
238	 *
239	 * By "aborting", basically faking a local error-completion,
240	 * we allow for a more graceful switchover by cleanly migrating services.
241	 * Still the affected node has to be rebooted "soon".
242	 *
243	 * By completing these requests, we allow the upper layers to re-use
244	 * the associated data pages.
245	 *
246	 * If later the local backing device "recovers", and now DMAs some data
247	 * from disk into the original request pages, in the best case it will
248	 * just put random data into unused pages; but typically it will corrupt
249	 * meanwhile completely unrelated data, causing all sorts of damage.
250	 *
251	 * Which means delayed successful completion,
252	 * especially for READ requests,
253	 * is a reason to panic().
254	 *
255	 * We assume that a delayed *error* completion is OK,
256	 * though we still will complain noisily about it.
257	 */
258	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
259		if (__ratelimit(&drbd_ratelimit_state))
260			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
261
262		if (!error)
263			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
264	}
265
266	/* to avoid recursion in __req_mod */
267	if (unlikely(error)) {
268		if (bio->bi_rw & REQ_DISCARD)
269			what = (error == -EOPNOTSUPP)
270				? DISCARD_COMPLETED_NOTSUPP
271				: DISCARD_COMPLETED_WITH_ERROR;
272		else
273			what = (bio_data_dir(bio) == WRITE)
274			? WRITE_COMPLETED_WITH_ERROR
275			: (bio_rw(bio) == READ)
276			  ? READ_COMPLETED_WITH_ERROR
277			  : READ_AHEAD_COMPLETED_WITH_ERROR;
278	} else
279		what = COMPLETED_OK;
280
281	bio_put(req->private_bio);
282	req->private_bio = ERR_PTR(error);
283
284	/* not req_mod(), we need irqsave here! */
285	spin_lock_irqsave(&device->resource->req_lock, flags);
286	__req_mod(req, what, &m);
287	spin_unlock_irqrestore(&device->resource->req_lock, flags);
288	put_ldev(device);
289
290	if (m.bio)
291		complete_master_bio(device, &m);
292}
293
294void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
295{
296	struct hash_desc desc;
297	struct scatterlist sg;
298	struct page *page = peer_req->pages;
299	struct page *tmp;
300	unsigned len;
301
302	desc.tfm = tfm;
303	desc.flags = 0;
304
305	sg_init_table(&sg, 1);
306	crypto_hash_init(&desc);
307
308	while ((tmp = page_chain_next(page))) {
309		/* all but the last page will be fully used */
310		sg_set_page(&sg, page, PAGE_SIZE, 0);
311		crypto_hash_update(&desc, &sg, sg.length);
312		page = tmp;
313	}
314	/* and now the last, possibly only partially used page */
315	len = peer_req->i.size & (PAGE_SIZE - 1);
316	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
317	crypto_hash_update(&desc, &sg, sg.length);
318	crypto_hash_final(&desc, digest);
319}
320
321void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
322{
323	struct hash_desc desc;
324	struct scatterlist sg;
325	struct bio_vec bvec;
326	struct bvec_iter iter;
327
328	desc.tfm = tfm;
329	desc.flags = 0;
330
331	sg_init_table(&sg, 1);
332	crypto_hash_init(&desc);
333
334	bio_for_each_segment(bvec, bio, iter) {
335		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
336		crypto_hash_update(&desc, &sg, sg.length);
337	}
338	crypto_hash_final(&desc, digest);
339}
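
/*
 * A minimal usage sketch for the two checksum helpers above, mirroring what
 * w_e_send_csum() below does; "tfm" stands for whichever transform is
 * configured (csums_tfm or verify_tfm):
 *
 *	digest_size = crypto_hash_digestsize(tfm);
 *	digest = kmalloc(digest_size, GFP_NOIO);
 *	if (digest) {
 *		drbd_csum_ee(tfm, peer_req, digest);
 *		(or drbd_csum_bio(tfm, bio, digest) for a bio)
 *		... send or compare the digest ...
 *		kfree(digest);
 *	}
 */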
340
341/* MAYBE merge common code with w_e_end_ov_req */
342static int w_e_send_csum(struct drbd_work *w, int cancel)
343{
344	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
345	struct drbd_peer_device *peer_device = peer_req->peer_device;
346	struct drbd_device *device = peer_device->device;
347	int digest_size;
348	void *digest;
349	int err = 0;
350
351	if (unlikely(cancel))
352		goto out;
353
354	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
355		goto out;
356
357	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
358	digest = kmalloc(digest_size, GFP_NOIO);
359	if (digest) {
360		sector_t sector = peer_req->i.sector;
361		unsigned int size = peer_req->i.size;
362		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
363		/* Free peer_req and pages before send.
364		 * In case we block on congestion, we could otherwise run into
365		 * some distributed deadlock, if the other side blocks on
366		 * congestion as well, because our receiver blocks in
367		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
368		drbd_free_peer_req(device, peer_req);
369		peer_req = NULL;
370		inc_rs_pending(device);
371		err = drbd_send_drequest_csum(peer_device, sector, size,
372					      digest, digest_size,
373					      P_CSUM_RS_REQUEST);
374		kfree(digest);
375	} else {
376		drbd_err(device, "kmalloc() of digest failed.\n");
377		err = -ENOMEM;
378	}
379
380out:
381	if (peer_req)
382		drbd_free_peer_req(device, peer_req);
383
384	if (unlikely(err))
385		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
386	return err;
387}
388
389#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
390
391static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
392{
393	struct drbd_device *device = peer_device->device;
394	struct drbd_peer_request *peer_req;
395
396	if (!get_ldev(device))
397		return -EIO;
398
399	/* GFP_TRY, because if there is no memory available right now, this may
400	 * be rescheduled for later. It is "only" background resync, after all. */
401	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
402				       size, true /* has real payload */, GFP_TRY);
403	if (!peer_req)
404		goto defer;
405
406	peer_req->w.cb = w_e_send_csum;
407	spin_lock_irq(&device->resource->req_lock);
408	list_add_tail(&peer_req->w.list, &device->read_ee);
409	spin_unlock_irq(&device->resource->req_lock);
410
411	atomic_add(size >> 9, &device->rs_sect_ev);
412	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
413		return 0;
414
415	/* If it failed because of ENOMEM, retry should help.  If it failed
416	 * because bio_add_page failed (probably broken lower level driver),
417	 * retry may or may not help.
418	 * If it does not, you may need to force disconnect. */
419	spin_lock_irq(&device->resource->req_lock);
420	list_del(&peer_req->w.list);
421	spin_unlock_irq(&device->resource->req_lock);
422
423	drbd_free_peer_req(device, peer_req);
424defer:
425	put_ldev(device);
426	return -EAGAIN;
427}
428
429int w_resync_timer(struct drbd_work *w, int cancel)
430{
431	struct drbd_device *device =
432		container_of(w, struct drbd_device, resync_work);
433
434	switch (device->state.conn) {
435	case C_VERIFY_S:
436		make_ov_request(device, cancel);
437		break;
438	case C_SYNC_TARGET:
439		make_resync_request(device, cancel);
440		break;
441	}
442
443	return 0;
444}
445
446void resync_timer_fn(unsigned long data)
447{
448	struct drbd_device *device = (struct drbd_device *) data;
449
450	drbd_queue_work_if_unqueued(
451		&first_peer_device(device)->connection->sender_work,
452		&device->resync_work);
453}
454
455static void fifo_set(struct fifo_buffer *fb, int value)
456{
457	int i;
458
459	for (i = 0; i < fb->size; i++)
460		fb->values[i] = value;
461}
462
463static int fifo_push(struct fifo_buffer *fb, int value)
464{
465	int ov;
466
467	ov = fb->values[fb->head_index];
468	fb->values[fb->head_index++] = value;
469
470	if (fb->head_index >= fb->size)
471		fb->head_index = 0;
472
473	return ov;
474}
475
476static void fifo_add_val(struct fifo_buffer *fb, int value)
477{
478	int i;
479
480	for (i = 0; i < fb->size; i++)
481		fb->values[i] += value;
482}
483
484struct fifo_buffer *fifo_alloc(int fifo_size)
485{
486	struct fifo_buffer *fb;
487
488	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
489	if (!fb)
490		return NULL;
491
492	fb->head_index = 0;
493	fb->size = fifo_size;
494	fb->total = 0;
495
496	return fb;
497}
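
/*
 * Illustrative example of the plan FIFO semantics above (numbers made up):
 * after fifo_alloc(3) the buffer is [0 0 0], head_index 0, total 0.
 *
 *	fifo_add_val(fb, 2);		buffer becomes [2 2 2]
 *	fb->total += 2 * 3;		total = 6
 *	curr = fifo_push(fb, 0);	returns 2, buffer becomes [0 2 2]
 *	fb->total -= curr;		total = 4
 *
 * This is exactly the pattern drbd_rs_controller() below uses to spread a
 * correction over the next "steps" invocations.
 */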
498
499static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
500{
501	struct disk_conf *dc;
502	unsigned int want;     /* The number of sectors we want in-flight */
503	int req_sect; /* Number of sectors to request in this turn */
504	int correction; /* Number of sectors more we need in-flight */
505	int cps; /* correction per invocation of drbd_rs_controller() */
506	int steps; /* Number of time steps to plan ahead */
507	int curr_corr;
508	int max_sect;
509	struct fifo_buffer *plan;
510
511	dc = rcu_dereference(device->ldev->disk_conf);
512	plan = rcu_dereference(device->rs_plan_s);
513
514	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
515
516	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
517		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
518	} else { /* normal path */
519		want = dc->c_fill_target ? dc->c_fill_target :
520			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
521	}
522
523	correction = want - device->rs_in_flight - plan->total;
524
525	/* Plan ahead */
526	cps = correction / steps;
527	fifo_add_val(plan, cps);
528	plan->total += cps * steps;
529
530	/* What we do in this step */
531	curr_corr = fifo_push(plan, 0);
532	plan->total -= curr_corr;
533
534	req_sect = sect_in + curr_corr;
535	if (req_sect < 0)
536		req_sect = 0;
537
538	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
539	if (req_sect > max_sect)
540		req_sect = max_sect;
541
542	/*
543	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
544		 sect_in, device->rs_in_flight, want, correction,
545		 steps, cps, device->rs_planed, curr_corr, req_sect);
546	*/
547
548	return req_sect;
549}
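
/*
 * Worked example of one controller step, with made-up numbers and assuming
 * SLEEP_TIME is 100ms (HZ/10): c_fill_target = 0, c_delay_target = 10
 * (i.e. 1 second), steps = 10, and sect_in = 800 sectors came in during the
 * last interval, with 6000 sectors still in flight and an empty plan.
 *
 *	want       = 800 * 10 * HZ / (SLEEP_TIME * 10) = 8000 sectors in flight
 *	correction = 8000 - 6000 - 0 = 2000
 *	cps        = 2000 / 10 = 200, added to every slot of the plan
 *	curr_corr  = fifo_push(plan, 0) = 200 (the slot falling due now)
 *	req_sect   = 800 + 200 = 1000 sectors requested in this turn,
 *	             capped at (c_max_rate * 2 * SLEEP_TIME) / HZ.
 */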
550
551static int drbd_rs_number_requests(struct drbd_device *device)
552{
553	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
554	int number, mxb;
555
556	sect_in = atomic_xchg(&device->rs_sect_in, 0);
557	device->rs_in_flight -= sect_in;
558
559	rcu_read_lock();
560	mxb = drbd_get_max_buffers(device) / 2;
561	if (rcu_dereference(device->rs_plan_s)->size) {
562		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
563		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
564	} else {
565		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
566		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
567	}
568	rcu_read_unlock();
569
570	/* Don't have more than "max-buffers"/2 in-flight.
571	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
572	 * potentially causing a distributed deadlock on congestion during
573	 * online-verify or (checksum-based) resync, if max-buffers,
574	 * socket buffer sizes and resync rate settings are mis-configured. */
575
576	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
577	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
578	 * "number of pages" (typically also 4k),
579	 * but "rs_in_flight" is in "sectors" (512 Byte). */
580	if (mxb - device->rs_in_flight/8 < number)
581		number = mxb - device->rs_in_flight/8;
582
583	return number;
584}
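
/*
 * Unit sanity check for the clamping above, with made-up numbers:
 * "number" counts BM_BLOCK_SIZE (4 KiB) resync requests, mxb counts pages
 * (also 4 KiB), and rs_in_flight counts 512-byte sectors, hence the /8.
 * With max-buffers = 2048 (mxb = 1024) and rs_in_flight = 2048 sectors
 * (256 four-KiB blocks already on the wire), at most 1024 - 256 = 768
 * further requests are generated in this turn.
 */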
585
586static int make_resync_request(struct drbd_device *const device, int cancel)
587{
588	struct drbd_peer_device *const peer_device = first_peer_device(device);
589	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
590	unsigned long bit;
591	sector_t sector;
592	const sector_t capacity = drbd_get_capacity(device->this_bdev);
593	int max_bio_size;
594	int number, rollback_i, size;
595	int align, requeue = 0;
596	int i = 0;
597
598	if (unlikely(cancel))
599		return 0;
600
601	if (device->rs_total == 0) {
602		/* empty resync? */
603		drbd_resync_finished(device);
604		return 0;
605	}
606
607	if (!get_ldev(device)) {
608		/* Since we only need to access device->resync,
609		   get_ldev_if_state(device, D_FAILED) would be sufficient, but
610		   continuing resync with a broken disk makes no sense at
611		   all */
612		drbd_err(device, "Disk broke down during resync!\n");
613		return 0;
614	}
615
616	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
617	number = drbd_rs_number_requests(device);
618	if (number <= 0)
619		goto requeue;
620
621	for (i = 0; i < number; i++) {
622		/* Stop generating RS requests when half of the send buffer is filled,
623		 * but notify TCP that we'd like to have more space. */
624		mutex_lock(&connection->data.mutex);
625		if (connection->data.socket) {
626			struct sock *sk = connection->data.socket->sk;
627			int queued = sk->sk_wmem_queued;
628			int sndbuf = sk->sk_sndbuf;
629			if (queued > sndbuf / 2) {
630				requeue = 1;
631				if (sk->sk_socket)
632					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
633			}
634		} else
635			requeue = 1;
636		mutex_unlock(&connection->data.mutex);
637		if (requeue)
638			goto requeue;
639
640next_sector:
641		size = BM_BLOCK_SIZE;
642		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
643
644		if (bit == DRBD_END_OF_BITMAP) {
645			device->bm_resync_fo = drbd_bm_bits(device);
646			put_ldev(device);
647			return 0;
648		}
649
650		sector = BM_BIT_TO_SECT(bit);
651
652		if (drbd_try_rs_begin_io(device, sector)) {
653			device->bm_resync_fo = bit;
654			goto requeue;
655		}
656		device->bm_resync_fo = bit + 1;
657
658		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
659			drbd_rs_complete_io(device, sector);
660			goto next_sector;
661		}
662
663#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
664		/* try to find some adjacent bits.
665		 * we stop if we already have the maximum req size.
666		 *
667		 * Additionally always align bigger requests, in order to
668		 * be prepared for all stripe sizes of software RAIDs.
669		 */
670		align = 1;
671		rollback_i = i;
672		while (i < number) {
673			if (size + BM_BLOCK_SIZE > max_bio_size)
674				break;
675
676			/* Be always aligned */
677			if (sector & ((1<<(align+3))-1))
678				break;
679
680			/* do not cross extent boundaries */
681			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
682				break;
683			/* now, is it actually dirty, after all?
684			 * caution, drbd_bm_test_bit is tri-state for some
685			 * obscure reason; ( b == 0 ) would get the out-of-band
686			 * only accidentally right because of the "oddly sized"
687			 * adjustment below */
688			if (drbd_bm_test_bit(device, bit+1) != 1)
689				break;
690			bit++;
691			size += BM_BLOCK_SIZE;
692			if ((BM_BLOCK_SIZE << align) <= size)
693				align++;
694			i++;
695		}
696		/* if we merged some,
697		 * reset the offset to start the next drbd_bm_find_next from */
698		if (size > BM_BLOCK_SIZE)
699			device->bm_resync_fo = bit + 1;
700#endif
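
		/*
		 * Worked example of the merge loop above (hypothetical layout):
		 * each bitmap bit covers 4 KiB = 8 sectors, and the alignment
		 * check always tests the unchanged start sector.  A request
		 * starting at sector 16 (8 KiB into the device) passes the
		 * align=1 check (16 & 15 == 0), merges one extra bit to 8 KiB,
		 * then stops once align becomes 2 because 16 & 31 != 0.  A
		 * request starting at a sufficiently aligned sector keeps
		 * growing until it hits max_bio_size, a resync extent
		 * boundary, or a clean bit.
		 */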
701
702		/* adjust very last sectors, in case we are oddly sized */
703		if (sector + (size>>9) > capacity)
704			size = (capacity-sector)<<9;
705
706		if (device->use_csums) {
707			switch (read_for_csum(peer_device, sector, size)) {
708			case -EIO: /* Disk failure */
709				put_ldev(device);
710				return -EIO;
711			case -EAGAIN: /* allocation failed, or ldev busy */
712				drbd_rs_complete_io(device, sector);
713				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
714				i = rollback_i;
715				goto requeue;
716			case 0:
717				/* everything ok */
718				break;
719			default:
720				BUG();
721			}
722		} else {
723			int err;
724
725			inc_rs_pending(device);
726			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
727						 sector, size, ID_SYNCER);
728			if (err) {
729				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
730				dec_rs_pending(device);
731				put_ldev(device);
732				return err;
733			}
734		}
735	}
736
737	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
738		/* last syncer _request_ was sent,
739		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
740		 * next sync group will resume), as soon as we receive the last
741		 * resync data block, and the last bit is cleared.
742		 * until then resync "work" is "inactive" ...
743		 */
744		put_ldev(device);
745		return 0;
746	}
747
748 requeue:
749	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
750	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
751	put_ldev(device);
752	return 0;
753}
754
755static int make_ov_request(struct drbd_device *device, int cancel)
756{
757	int number, i, size;
758	sector_t sector;
759	const sector_t capacity = drbd_get_capacity(device->this_bdev);
760	bool stop_sector_reached = false;
761
762	if (unlikely(cancel))
763		return 1;
764
765	number = drbd_rs_number_requests(device);
766
767	sector = device->ov_position;
768	for (i = 0; i < number; i++) {
769		if (sector >= capacity)
770			return 1;
771
772		/* We check for "finished" only in the reply path:
773		 * w_e_end_ov_reply().
774		 * We need to send at least one request out. */
775		stop_sector_reached = i > 0
776			&& verify_can_do_stop_sector(device)
777			&& sector >= device->ov_stop_sector;
778		if (stop_sector_reached)
779			break;
780
781		size = BM_BLOCK_SIZE;
782
783		if (drbd_try_rs_begin_io(device, sector)) {
784			device->ov_position = sector;
785			goto requeue;
786		}
787
788		if (sector + (size>>9) > capacity)
789			size = (capacity-sector)<<9;
790
791		inc_rs_pending(device);
792		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
793			dec_rs_pending(device);
794			return 0;
795		}
796		sector += BM_SECT_PER_BIT;
797	}
798	device->ov_position = sector;
799
800 requeue:
801	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
802	if (i == 0 || !stop_sector_reached)
803		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
804	return 1;
805}
806
807int w_ov_finished(struct drbd_work *w, int cancel)
808{
809	struct drbd_device_work *dw =
810		container_of(w, struct drbd_device_work, w);
811	struct drbd_device *device = dw->device;
812	kfree(dw);
813	ov_out_of_sync_print(device);
814	drbd_resync_finished(device);
815
816	return 0;
817}
818
819static int w_resync_finished(struct drbd_work *w, int cancel)
820{
821	struct drbd_device_work *dw =
822		container_of(w, struct drbd_device_work, w);
823	struct drbd_device *device = dw->device;
824	kfree(dw);
825
826	drbd_resync_finished(device);
827
828	return 0;
829}
830
831static void ping_peer(struct drbd_device *device)
832{
833	struct drbd_connection *connection = first_peer_device(device)->connection;
834
835	clear_bit(GOT_PING_ACK, &connection->flags);
836	request_ping(connection);
837	wait_event(connection->ping_wait,
838		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
839}
840
841int drbd_resync_finished(struct drbd_device *device)
842{
843	unsigned long db, dt, dbdt;
844	unsigned long n_oos;
845	union drbd_state os, ns;
846	struct drbd_device_work *dw;
847	char *khelper_cmd = NULL;
848	int verify_done = 0;
849
850	/* Remove all elements from the resync LRU. Since future actions
851	 * might set bits in the (main) bitmap, the entries in the
852	 * resync LRU would otherwise be wrong. */
853	if (drbd_rs_del_all(device)) {
854		/* In case this is not possible right now, most probably because
855		 * there are P_RS_DATA_REPLY packets lingering on the worker's
856		 * queue (or even the read operations for those packets
857		 * are not finished by now), retry in 100ms. */
858
859		schedule_timeout_interruptible(HZ / 10);
860		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
861		if (dw) {
862			dw->w.cb = w_resync_finished;
863			dw->device = device;
864			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
865					&dw->w);
866			return 1;
867		}
868		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
869	}
870
871	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
872	if (dt <= 0)
873		dt = 1;
874
875	db = device->rs_total;
876	/* adjust for verify start and stop sectors, and the reached position, respectively */
877	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
878		db -= device->ov_left;
879
880	dbdt = Bit2KB(db/dt);
881	device->rs_paused /= HZ;
882
883	if (!get_ldev(device))
884		goto out;
885
886	ping_peer(device);
887
888	spin_lock_irq(&device->resource->req_lock);
889	os = drbd_read_state(device);
890
891	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
892
893	/* This protects us against multiple calls (that can happen in the presence
894	   of application IO), and against connectivity loss just before we arrive here. */
895	if (os.conn <= C_CONNECTED)
896		goto out_unlock;
897
898	ns = os;
899	ns.conn = C_CONNECTED;
900
901	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
902	     verify_done ? "Online verify" : "Resync",
903	     dt + device->rs_paused, device->rs_paused, dbdt);
904
905	n_oos = drbd_bm_total_weight(device);
906
907	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
908		if (n_oos) {
909			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
910			      n_oos, Bit2KB(1));
911			khelper_cmd = "out-of-sync";
912		}
913	} else {
914		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
915
916		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
917			khelper_cmd = "after-resync-target";
918
919		if (device->use_csums && device->rs_total) {
920			const unsigned long s = device->rs_same_csum;
921			const unsigned long t = device->rs_total;
922			const int ratio =
923				(t == 0)     ? 0 :
924			(t < 100000) ? ((s*100)/t) : (s/(t/100));
925			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
926			     "transferred %luK total %luK\n",
927			     ratio,
928			     Bit2KB(device->rs_same_csum),
929			     Bit2KB(device->rs_total - device->rs_same_csum),
930			     Bit2KB(device->rs_total));
931		}
932	}
933
934	if (device->rs_failed) {
935		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
936
937		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
938			ns.disk = D_INCONSISTENT;
939			ns.pdsk = D_UP_TO_DATE;
940		} else {
941			ns.disk = D_UP_TO_DATE;
942			ns.pdsk = D_INCONSISTENT;
943		}
944	} else {
945		ns.disk = D_UP_TO_DATE;
946		ns.pdsk = D_UP_TO_DATE;
947
948		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
949			if (device->p_uuid) {
950				int i;
951				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
952					_drbd_uuid_set(device, i, device->p_uuid[i]);
953				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
954				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
955			} else {
956				drbd_err(device, "device->p_uuid is NULL! BUG\n");
957			}
958		}
959
960		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
961			/* for verify runs, we don't update uuids here,
962			 * so there would be nothing to report. */
963			drbd_uuid_set_bm(device, 0UL);
964			drbd_print_uuids(device, "updated UUIDs");
965			if (device->p_uuid) {
966				/* Now the two UUID sets are equal, update what we
967				 * know of the peer. */
968				int i;
969				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
970					device->p_uuid[i] = device->ldev->md.uuid[i];
971			}
972		}
973	}
974
975	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
976out_unlock:
977	spin_unlock_irq(&device->resource->req_lock);
978	put_ldev(device);
979out:
980	device->rs_total  = 0;
981	device->rs_failed = 0;
982	device->rs_paused = 0;
983
984	/* reset start sector, if we reached end of device */
985	if (verify_done && device->ov_left == 0)
986		device->ov_start_sector = 0;
987
988	drbd_md_sync(device);
989
990	if (khelper_cmd)
991		drbd_khelper(device, khelper_cmd);
992
993	return 1;
994}
995
996/* helper */
997static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
998{
999	if (drbd_peer_req_has_active_page(peer_req)) {
1000		/* This might happen if sendpage() has not finished */
1001		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1002		atomic_add(i, &device->pp_in_use_by_net);
1003		atomic_sub(i, &device->pp_in_use);
1004		spin_lock_irq(&device->resource->req_lock);
1005		list_add_tail(&peer_req->w.list, &device->net_ee);
1006		spin_unlock_irq(&device->resource->req_lock);
1007		wake_up(&drbd_pp_wait);
1008	} else
1009		drbd_free_peer_req(device, peer_req);
1010}
1011
1012/**
1013 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1015 * @w:		work object.
1016 * @cancel:	The connection will be closed anyways
1017 */
1018int w_e_end_data_req(struct drbd_work *w, int cancel)
1019{
1020	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1021	struct drbd_peer_device *peer_device = peer_req->peer_device;
1022	struct drbd_device *device = peer_device->device;
1023	int err;
1024
1025	if (unlikely(cancel)) {
1026		drbd_free_peer_req(device, peer_req);
1027		dec_unacked(device);
1028		return 0;
1029	}
1030
1031	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1032		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1033	} else {
1034		if (__ratelimit(&drbd_ratelimit_state))
1035			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1036			    (unsigned long long)peer_req->i.sector);
1037
1038		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1039	}
1040
1041	dec_unacked(device);
1042
1043	move_to_net_ee_or_free(device, peer_req);
1044
1045	if (unlikely(err))
1046		drbd_err(device, "drbd_send_block() failed\n");
1047	return err;
1048}
1049
1050/**
1051 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1052 * @w:		work object.
1053 * @cancel:	The connection will be closed anyways
1054 */
1055int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1056{
1057	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1058	struct drbd_peer_device *peer_device = peer_req->peer_device;
1059	struct drbd_device *device = peer_device->device;
1060	int err;
1061
1062	if (unlikely(cancel)) {
1063		drbd_free_peer_req(device, peer_req);
1064		dec_unacked(device);
1065		return 0;
1066	}
1067
1068	if (get_ldev_if_state(device, D_FAILED)) {
1069		drbd_rs_complete_io(device, peer_req->i.sector);
1070		put_ldev(device);
1071	}
1072
1073	if (device->state.conn == C_AHEAD) {
1074		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1075	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1076		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1077			inc_rs_pending(device);
1078			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1079		} else {
1080			if (__ratelimit(&drbd_ratelimit_state))
1081				drbd_err(device, "Not sending RSDataReply, "
1082				    "partner DISKLESS!\n");
1083			err = 0;
1084		}
1085	} else {
1086		if (__ratelimit(&drbd_ratelimit_state))
1087			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1088			    (unsigned long long)peer_req->i.sector);
1089
1090		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1091
1092		/* update resync data with failure */
1093		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1094	}
1095
1096	dec_unacked(device);
1097
1098	move_to_net_ee_or_free(device, peer_req);
1099
1100	if (unlikely(err))
1101		drbd_err(device, "drbd_send_block() failed\n");
1102	return err;
1103}
1104
1105int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1106{
1107	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1108	struct drbd_peer_device *peer_device = peer_req->peer_device;
1109	struct drbd_device *device = peer_device->device;
1110	struct digest_info *di;
1111	int digest_size;
1112	void *digest = NULL;
1113	int err, eq = 0;
1114
1115	if (unlikely(cancel)) {
1116		drbd_free_peer_req(device, peer_req);
1117		dec_unacked(device);
1118		return 0;
1119	}
1120
1121	if (get_ldev(device)) {
1122		drbd_rs_complete_io(device, peer_req->i.sector);
1123		put_ldev(device);
1124	}
1125
1126	di = peer_req->digest;
1127
1128	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1129		/* quick hack to try to avoid a race against reconfiguration.
1130		 * a real fix would be much more involved,
1131		 * introducing more locking mechanisms */
1132		if (peer_device->connection->csums_tfm) {
1133			digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1134			D_ASSERT(device, digest_size == di->digest_size);
1135			digest = kmalloc(digest_size, GFP_NOIO);
1136		}
1137		if (digest) {
1138			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1139			eq = !memcmp(digest, di->digest, digest_size);
1140			kfree(digest);
1141		}
1142
1143		if (eq) {
1144			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1145			/* rs_same_csums unit is BM_BLOCK_SIZE */
1146			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1147			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1148		} else {
1149			inc_rs_pending(device);
1150			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1151			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1152			kfree(di);
1153			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1154		}
1155	} else {
1156		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1157		if (__ratelimit(&drbd_ratelimit_state))
1158			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1159	}
1160
1161	dec_unacked(device);
1162	move_to_net_ee_or_free(device, peer_req);
1163
1164	if (unlikely(err))
1165		drbd_err(device, "drbd_send_block/ack() failed\n");
1166	return err;
1167}
1168
1169int w_e_end_ov_req(struct drbd_work *w, int cancel)
1170{
1171	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1172	struct drbd_peer_device *peer_device = peer_req->peer_device;
1173	struct drbd_device *device = peer_device->device;
1174	sector_t sector = peer_req->i.sector;
1175	unsigned int size = peer_req->i.size;
1176	int digest_size;
1177	void *digest;
1178	int err = 0;
1179
1180	if (unlikely(cancel))
1181		goto out;
1182
1183	digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1184	digest = kmalloc(digest_size, GFP_NOIO);
1185	if (!digest) {
1186		err = 1;	/* terminate the connection in case the allocation failed */
1187		goto out;
1188	}
1189
1190	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1191		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1192	else
1193		memset(digest, 0, digest_size);
1194
1195	/* Free peer_req and pages before send.
1196	 * In case we block on congestion, we could otherwise run into
1197	 * some distributed deadlock, if the other side blocks on
1198	 * congestion as well, because our receiver blocks in
1199	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1200	drbd_free_peer_req(device, peer_req);
1201	peer_req = NULL;
1202	inc_rs_pending(device);
1203	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1204	if (err)
1205		dec_rs_pending(device);
1206	kfree(digest);
1207
1208out:
1209	if (peer_req)
1210		drbd_free_peer_req(device, peer_req);
1211	dec_unacked(device);
1212	return err;
1213}
1214
1215void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1216{
1217	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1218		device->ov_last_oos_size += size>>9;
1219	} else {
1220		device->ov_last_oos_start = sector;
1221		device->ov_last_oos_size = size>>9;
1222	}
1223	drbd_set_out_of_sync(device, sector, size);
1224}
1225
1226int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1227{
1228	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1229	struct drbd_peer_device *peer_device = peer_req->peer_device;
1230	struct drbd_device *device = peer_device->device;
1231	struct digest_info *di;
1232	void *digest;
1233	sector_t sector = peer_req->i.sector;
1234	unsigned int size = peer_req->i.size;
1235	int digest_size;
1236	int err, eq = 0;
1237	bool stop_sector_reached = false;
1238
1239	if (unlikely(cancel)) {
1240		drbd_free_peer_req(device, peer_req);
1241		dec_unacked(device);
1242		return 0;
1243	}
1244
1245	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1246	 * the resync lru has been cleaned up already */
1247	if (get_ldev(device)) {
1248		drbd_rs_complete_io(device, peer_req->i.sector);
1249		put_ldev(device);
1250	}
1251
1252	di = peer_req->digest;
1253
1254	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1255		digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1256		digest = kmalloc(digest_size, GFP_NOIO);
1257		if (digest) {
1258			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1259
1260			D_ASSERT(device, digest_size == di->digest_size);
1261			eq = !memcmp(digest, di->digest, digest_size);
1262			kfree(digest);
1263		}
1264	}
1265
1266	/* Free peer_req and pages before send.
1267	 * In case we block on congestion, we could otherwise run into
1268	 * some distributed deadlock, if the other side blocks on
1269	 * congestion as well, because our receiver blocks in
1270	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1271	drbd_free_peer_req(device, peer_req);
1272	if (!eq)
1273		drbd_ov_out_of_sync_found(device, sector, size);
1274	else
1275		ov_out_of_sync_print(device);
1276
1277	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1278			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1279
1280	dec_unacked(device);
1281
1282	--device->ov_left;
1283
1284	/* let's advance progress step marks only for every other megabyte (ov_left counts 4k blocks, 0x200 == 2 MiB) */
1285	if ((device->ov_left & 0x200) == 0x200)
1286		drbd_advance_rs_marks(device, device->ov_left);
1287
1288	stop_sector_reached = verify_can_do_stop_sector(device) &&
1289		(sector + (size>>9)) >= device->ov_stop_sector;
1290
1291	if (device->ov_left == 0 || stop_sector_reached) {
1292		ov_out_of_sync_print(device);
1293		drbd_resync_finished(device);
1294	}
1295
1296	return err;
1297}
1298
1299/* FIXME
1300 * We need to track the number of pending barrier acks,
1301 * and to be able to wait for them.
1302 * See also comment in drbd_adm_attach before drbd_suspend_io.
1303 */
1304static int drbd_send_barrier(struct drbd_connection *connection)
1305{
1306	struct p_barrier *p;
1307	struct drbd_socket *sock;
1308
1309	sock = &connection->data;
1310	p = conn_prepare_command(connection, sock);
1311	if (!p)
1312		return -EIO;
1313	p->barrier = connection->send.current_epoch_nr;
1314	p->pad = 0;
1315	connection->send.current_epoch_writes = 0;
1316
1317	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1318}
1319
1320int w_send_write_hint(struct drbd_work *w, int cancel)
1321{
1322	struct drbd_device *device =
1323		container_of(w, struct drbd_device, unplug_work);
1324	struct drbd_socket *sock;
1325
1326	if (cancel)
1327		return 0;
1328	sock = &first_peer_device(device)->connection->data;
1329	if (!drbd_prepare_command(first_peer_device(device), sock))
1330		return -EIO;
1331	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1332}
1333
1334static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1335{
1336	if (!connection->send.seen_any_write_yet) {
1337		connection->send.seen_any_write_yet = true;
1338		connection->send.current_epoch_nr = epoch;
1339		connection->send.current_epoch_writes = 0;
1340	}
1341}
1342
1343static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1344{
1345	/* re-init if first write on this connection */
1346	if (!connection->send.seen_any_write_yet)
1347		return;
1348	if (connection->send.current_epoch_nr != epoch) {
1349		if (connection->send.current_epoch_writes)
1350			drbd_send_barrier(connection);
1351		connection->send.current_epoch_nr = epoch;
1352	}
1353}
1354
1355int w_send_out_of_sync(struct drbd_work *w, int cancel)
1356{
1357	struct drbd_request *req = container_of(w, struct drbd_request, w);
1358	struct drbd_device *device = req->device;
1359	struct drbd_peer_device *const peer_device = first_peer_device(device);
1360	struct drbd_connection *const connection = peer_device->connection;
1361	int err;
1362
1363	if (unlikely(cancel)) {
1364		req_mod(req, SEND_CANCELED);
1365		return 0;
1366	}
1367	req->pre_send_jif = jiffies;
1368
1369	/* this time, no connection->send.current_epoch_writes++;
1370	 * If it was sent, it was the closing barrier for the last
1371	 * replicated epoch, before we went into AHEAD mode.
1372	 * No more barriers will be sent, until we leave AHEAD mode again. */
1373	maybe_send_barrier(connection, req->epoch);
1374
1375	err = drbd_send_out_of_sync(peer_device, req);
1376	req_mod(req, OOS_HANDED_TO_NETWORK);
1377
1378	return err;
1379}
1380
1381/**
1382 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1383 * @w:		work object.
1384 * @cancel:	The connection will be closed anyways
1385 */
1386int w_send_dblock(struct drbd_work *w, int cancel)
1387{
1388	struct drbd_request *req = container_of(w, struct drbd_request, w);
1389	struct drbd_device *device = req->device;
1390	struct drbd_peer_device *const peer_device = first_peer_device(device);
1391	struct drbd_connection *connection = peer_device->connection;
1392	int err;
1393
1394	if (unlikely(cancel)) {
1395		req_mod(req, SEND_CANCELED);
1396		return 0;
1397	}
1398	req->pre_send_jif = jiffies;
1399
1400	re_init_if_first_write(connection, req->epoch);
1401	maybe_send_barrier(connection, req->epoch);
1402	connection->send.current_epoch_writes++;
1403
1404	err = drbd_send_dblock(peer_device, req);
1405	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1406
1407	return err;
1408}
1409
1410/**
1411 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1412 * @w:		work object.
1413 * @cancel:	The connection will be closed anyways
1414 */
1415int w_send_read_req(struct drbd_work *w, int cancel)
1416{
1417	struct drbd_request *req = container_of(w, struct drbd_request, w);
1418	struct drbd_device *device = req->device;
1419	struct drbd_peer_device *const peer_device = first_peer_device(device);
1420	struct drbd_connection *connection = peer_device->connection;
1421	int err;
1422
1423	if (unlikely(cancel)) {
1424		req_mod(req, SEND_CANCELED);
1425		return 0;
1426	}
1427	req->pre_send_jif = jiffies;
1428
1429	/* Even read requests may close a write epoch,
1430	 * if there was any yet. */
1431	maybe_send_barrier(connection, req->epoch);
1432
1433	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1434				 (unsigned long)req);
1435
1436	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1437
1438	return err;
1439}
1440
1441int w_restart_disk_io(struct drbd_work *w, int cancel)
1442{
1443	struct drbd_request *req = container_of(w, struct drbd_request, w);
1444	struct drbd_device *device = req->device;
1445
1446	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1447		drbd_al_begin_io(device, &req->i);
1448
1449	drbd_req_make_private_bio(req, req->master_bio);
1450	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1451	generic_make_request(req->private_bio);
1452
1453	return 0;
1454}
1455
1456static int _drbd_may_sync_now(struct drbd_device *device)
1457{
1458	struct drbd_device *odev = device;
1459	int resync_after;
1460
1461	while (1) {
1462		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1463			return 1;
1464		rcu_read_lock();
1465		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1466		rcu_read_unlock();
1467		if (resync_after == -1)
1468			return 1;
1469		odev = minor_to_device(resync_after);
1470		if (!odev)
1471			return 1;
1472		if ((odev->state.conn >= C_SYNC_SOURCE &&
1473		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1474		    odev->state.aftr_isp || odev->state.peer_isp ||
1475		    odev->state.user_isp)
1476			return 0;
1477	}
1478}
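
/*
 * Illustrative example of the resync-after chain walk above (hypothetical
 * configuration): if minor 2 has resync-after = 1 and minor 1 has
 * resync-after = 0, _drbd_may_sync_now() for minor 2 inspects minor 1 and
 * then minor 0.  It returns 0 as soon as one of them is currently between
 * C_SYNC_SOURCE and C_PAUSED_SYNC_T or has one of the isp flags set, and
 * returns 1 once it reaches a device whose resync-after is -1, or a
 * diskless or missing minor, i.e. the end of the chain.
 */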
1479
1480/**
1481 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1482 * @device:	DRBD device.
1483 *
1484 * Called from process context only (admin command and after_state_ch).
1485 */
1486static int _drbd_pause_after(struct drbd_device *device)
1487{
1488	struct drbd_device *odev;
1489	int i, rv = 0;
1490
1491	rcu_read_lock();
1492	idr_for_each_entry(&drbd_devices, odev, i) {
1493		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1494			continue;
1495		if (!_drbd_may_sync_now(odev))
1496			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1497			       != SS_NOTHING_TO_DO);
1498	}
1499	rcu_read_unlock();
1500
1501	return rv;
1502}
1503
1504/**
1505 * _drbd_resume_next() - Resume resync on all devices that may resync now
1506 * @device:	DRBD device.
1507 *
1508 * Called from process context only (admin command and worker).
1509 */
1510static int _drbd_resume_next(struct drbd_device *device)
1511{
1512	struct drbd_device *odev;
1513	int i, rv = 0;
1514
1515	rcu_read_lock();
1516	idr_for_each_entry(&drbd_devices, odev, i) {
1517		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1518			continue;
1519		if (odev->state.aftr_isp) {
1520			if (_drbd_may_sync_now(odev))
1521				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1522							CS_HARD, NULL)
1523				       != SS_NOTHING_TO_DO) ;
1524		}
1525	}
1526	rcu_read_unlock();
1527	return rv;
1528}
1529
1530void resume_next_sg(struct drbd_device *device)
1531{
1532	write_lock_irq(&global_state_lock);
1533	_drbd_resume_next(device);
1534	write_unlock_irq(&global_state_lock);
1535}
1536
1537void suspend_other_sg(struct drbd_device *device)
1538{
1539	write_lock_irq(&global_state_lock);
1540	_drbd_pause_after(device);
1541	write_unlock_irq(&global_state_lock);
1542}
1543
1544/* caller must hold global_state_lock */
1545enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1546{
1547	struct drbd_device *odev;
1548	int resync_after;
1549
1550	if (o_minor == -1)
1551		return NO_ERROR;
1552	if (o_minor < -1 || o_minor > MINORMASK)
1553		return ERR_RESYNC_AFTER;
1554
1555	/* check for loops */
1556	odev = minor_to_device(o_minor);
1557	while (1) {
1558		if (odev == device)
1559			return ERR_RESYNC_AFTER_CYCLE;
1560
1561		/* You are free to depend on diskless, non-existing,
1562		 * or not yet/no longer existing minors.
1563		 * We only reject dependency loops.
1564		 * We cannot follow the dependency chain beyond a detached or
1565		 * missing minor.
1566		 */
1567		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1568			return NO_ERROR;
1569
1570		rcu_read_lock();
1571		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1572		rcu_read_unlock();
1573		/* dependency chain ends here, no cycles. */
1574		if (resync_after == -1)
1575			return NO_ERROR;
1576
1577		/* follow the dependency chain */
1578		odev = minor_to_device(resync_after);
1579	}
1580}
1581
1582/* caller must hold global_state_lock */
1583void drbd_resync_after_changed(struct drbd_device *device)
1584{
1585	int changes;
1586
1587	do {
1588		changes  = _drbd_pause_after(device);
1589		changes |= _drbd_resume_next(device);
1590	} while (changes);
1591}
1592
1593void drbd_rs_controller_reset(struct drbd_device *device)
1594{
1595	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1596	struct fifo_buffer *plan;
1597
1598	atomic_set(&device->rs_sect_in, 0);
1599	atomic_set(&device->rs_sect_ev, 0);
1600	device->rs_in_flight = 0;
1601	device->rs_last_events =
1602		(int)part_stat_read(&disk->part0, sectors[0]) +
1603		(int)part_stat_read(&disk->part0, sectors[1]);
1604
1605	/* Updating the RCU protected object in place is necessary since
1606	   this function gets called from atomic context.
1607	   It is valid since all other updates also lead to a completely
1608	   empty fifo */
1609	rcu_read_lock();
1610	plan = rcu_dereference(device->rs_plan_s);
1611	plan->total = 0;
1612	fifo_set(plan, 0);
1613	rcu_read_unlock();
1614}
1615
1616void start_resync_timer_fn(unsigned long data)
1617{
1618	struct drbd_device *device = (struct drbd_device *) data;
1619	drbd_device_post_work(device, RS_START);
1620}
1621
1622static void do_start_resync(struct drbd_device *device)
1623{
1624	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1625		drbd_warn(device, "postponing start_resync ...\n");
1626		device->start_resync_timer.expires = jiffies + HZ/10;
1627		add_timer(&device->start_resync_timer);
1628		return;
1629	}
1630
1631	drbd_start_resync(device, C_SYNC_SOURCE);
1632	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1633}
1634
1635static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1636{
1637	bool csums_after_crash_only;
1638	rcu_read_lock();
1639	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1640	rcu_read_unlock();
1641	return connection->agreed_pro_version >= 89 &&		/* supported? */
1642		connection->csums_tfm &&			/* configured? */
1643		(csums_after_crash_only == 0			/* use for each resync? */
1644		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1645}
1646
1647/**
1648 * drbd_start_resync() - Start the resync process
1649 * @device:	DRBD device.
1650 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1651 *
1652 * This function might bring you directly into one of the
1653 * C_PAUSED_SYNC_* states.
1654 */
1655void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1656{
1657	struct drbd_peer_device *peer_device = first_peer_device(device);
1658	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1659	union drbd_state ns;
1660	int r;
1661
1662	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1663		drbd_err(device, "Resync already running!\n");
1664		return;
1665	}
1666
1667	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1668		if (side == C_SYNC_TARGET) {
1669			/* Since application IO was locked out during C_WF_BITMAP_T and
1670			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1671			   run the handler, since we are about to make the data inconsistent. */
1672			r = drbd_khelper(device, "before-resync-target");
1673			r = (r >> 8) & 0xff;
1674			if (r > 0) {
1675				drbd_info(device, "before-resync-target handler returned %d, "
1676					 "dropping connection.\n", r);
1677				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1678				return;
1679			}
1680		} else /* C_SYNC_SOURCE */ {
1681			r = drbd_khelper(device, "before-resync-source");
1682			r = (r >> 8) & 0xff;
1683			if (r > 0) {
1684				if (r == 3) {
1685					drbd_info(device, "before-resync-source handler returned %d, "
1686						 "ignoring. Old userland tools?", r);
1687				} else {
1688					drbd_info(device, "before-resync-source handler returned %d, "
1689						 "dropping connection.\n", r);
1690					conn_request_state(connection,
1691							   NS(conn, C_DISCONNECTING), CS_HARD);
1692					return;
1693				}
1694			}
1695		}
1696	}
1697
1698	if (current == connection->worker.task) {
1699		/* The worker should not sleep waiting for state_mutex,
1700		   as that can take long */
1701		if (!mutex_trylock(device->state_mutex)) {
1702			set_bit(B_RS_H_DONE, &device->flags);
1703			device->start_resync_timer.expires = jiffies + HZ/5;
1704			add_timer(&device->start_resync_timer);
1705			return;
1706		}
1707	} else {
1708		mutex_lock(device->state_mutex);
1709	}
1710	clear_bit(B_RS_H_DONE, &device->flags);
1711
1712	/* req_lock: serialize with drbd_send_and_submit() and others
1713	 * global_state_lock: for stable sync-after dependencies */
1714	spin_lock_irq(&device->resource->req_lock);
1715	write_lock(&global_state_lock);
1716	/* Did some connection breakage or IO error race with us? */
1717	if (device->state.conn < C_CONNECTED
1718	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1719		write_unlock(&global_state_lock);
1720		spin_unlock_irq(&device->resource->req_lock);
1721		mutex_unlock(device->state_mutex);
1722		return;
1723	}
1724
1725	ns = drbd_read_state(device);
1726
1727	ns.aftr_isp = !_drbd_may_sync_now(device);
1728
1729	ns.conn = side;
1730
1731	if (side == C_SYNC_TARGET)
1732		ns.disk = D_INCONSISTENT;
1733	else /* side == C_SYNC_SOURCE */
1734		ns.pdsk = D_INCONSISTENT;
1735
1736	r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1737	ns = drbd_read_state(device);
1738
1739	if (ns.conn < C_CONNECTED)
1740		r = SS_UNKNOWN_ERROR;
1741
1742	if (r == SS_SUCCESS) {
1743		unsigned long tw = drbd_bm_total_weight(device);
1744		unsigned long now = jiffies;
1745		int i;
1746
1747		device->rs_failed    = 0;
1748		device->rs_paused    = 0;
1749		device->rs_same_csum = 0;
1750		device->rs_last_sect_ev = 0;
1751		device->rs_total     = tw;
1752		device->rs_start     = now;
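		/* rs_mark_left/rs_mark_time form a small ring of sampling
		 * points, used later to estimate the current resync speed;
		 * seed all DRBD_SYNC_MARKS slots with "all of it, starting now". */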
1753		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1754			device->rs_mark_left[i] = tw;
1755			device->rs_mark_time[i] = now;
1756		}
1757		_drbd_pause_after(device);
1758		/* Forget potentially stale cached per-resync-extent bit counts.
1759		 * Open-coded drbd_rs_cancel_all(device): we already have IRQs
1760		 * disabled, and know the disk state is ok. */
1761		spin_lock(&device->al_lock);
1762		lc_reset(device->resync);
1763		device->resync_locked = 0;
1764		device->resync_wenr = LC_FREE;
1765		spin_unlock(&device->al_lock);
1766	}
1767	write_unlock(&global_state_lock);
1768	spin_unlock_irq(&device->resource->req_lock);
1769
1770	if (r == SS_SUCCESS) {
1771		wake_up(&device->al_wait); /* for lc_reset() above */
1772		/* reset rs_last_bcast when a resync or verify is started,
1773		 * to deal with potential jiffies wrap. */
1774		device->rs_last_bcast = jiffies - HZ;
1775
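		/* rs_total counts bitmap bits, each covering BM_BLOCK_SIZE
		 * (4 KiB); shifting by BM_BLOCK_SHIFT-10 converts that to KiB
		 * for the log message below. */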
1776		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1777		     drbd_conn_str(ns.conn),
1778		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1779		     (unsigned long) device->rs_total);
1780		if (side == C_SYNC_TARGET) {
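			/* bm_resync_fo is the bitmap offset from which
			 * make_resync_request() continues scanning for
			 * out-of-sync bits; start over from the beginning. */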
1781			device->bm_resync_fo = 0;
1782			device->use_csums = use_checksum_based_resync(connection, device);
1783		} else {
1784			device->use_csums = 0;
1785		}
1786
1787		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1788		 * with w_send_oos, or the sync target will get confused as to
1789		 * how many bits to resync.  We cannot always do that, because for an
1790		 * empty resync and protocol < 95, we need to do it here, as we call
1791		 * drbd_resync_finished from here in that case.
1792		 * So we call drbd_gen_and_send_sync_uuid here for protocol < 96,
1793		 * and from after_state_ch otherwise. */
1794		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1795			drbd_gen_and_send_sync_uuid(peer_device);
1796
1797		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1798			/* This still has a race (about when exactly the peers
1799			 * detect connection loss) that can lead to a full sync
1800			 * on next handshake. In 8.3.9 we fixed this with explicit
1801			 * resync-finished notifications, but the fix
1802			 * introduces a protocol change.  Sleeping for some
1803			 * time longer than the ping interval + timeout on the
1804			 * SyncSource, to give the SyncTarget the chance to
1805			 * detect connection loss, then waiting for a ping
1806			 * response (implicit in drbd_resync_finished) reduces
1807			 * the race considerably, but does not solve it. */
1808			if (side == C_SYNC_SOURCE) {
1809				struct net_conf *nc;
1810				int timeo;
1811
1812				rcu_read_lock();
1813				nc = rcu_dereference(connection->net_conf);
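				/* ping_timeo is in tenths of a second; dividing
				 * by 9 instead of 10 makes this sleep slightly
				 * longer than ping-int + ping-timeout, as
				 * intended by the comment above. */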
1814				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1815				rcu_read_unlock();
1816				schedule_timeout_interruptible(timeo);
1817			}
1818			drbd_resync_finished(device);
1819		}
1820
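		/* Reset the state of the dynamic resync rate controller
		 * (c-plan-ahead) before the first resync requests go out. */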
1821		drbd_rs_controller_reset(device);
1822		/* ns.conn may already be != device->state.conn,
1823		 * we may have been paused in between, or become paused until
1824		 * the timer triggers.
1825		 * No matter, that is handled in resync_timer_fn() */
1826		if (ns.conn == C_SYNC_TARGET)
1827			mod_timer(&device->resync_timer, jiffies);
1828
1829		drbd_md_sync(device);
1830	}
1831	put_ldev(device);
1832	mutex_unlock(device->state_mutex);
1833}
1834
1835static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1836{
1837	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1838	device->rs_last_bcast = jiffies;
1839
1840	if (!get_ldev(device))
1841		return;
1842
1843	drbd_bm_write_lazy(device, 0);
1844	if (resync_done && is_sync_state(device->state.conn))
1845		drbd_resync_finished(device);
1846
1847	drbd_bcast_event(device, &sib);
1848	/* update timestamp, in case it took a while to write out stuff */
1849	device->rs_last_bcast = jiffies;
1850	put_ldev(device);
1851}
1852
1853static void drbd_ldev_destroy(struct drbd_device *device)
1854{
1855	lc_destroy(device->resync);
1856	device->resync = NULL;
1857	lc_destroy(device->act_log);
1858	device->act_log = NULL;
1859
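	/* __acquire()/__release() are sparse-only context annotations that
	 * balance the "local" reference tracking around freeing ldev;
	 * they generate no code. */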
1860	__acquire(local);
1861	drbd_free_ldev(device->ldev);
1862	device->ldev = NULL;
1863	__release(local);
1864
1865	clear_bit(GOING_DISKLESS, &device->flags);
1866	wake_up(&device->misc_wait);
1867}
1868
1869static void go_diskless(struct drbd_device *device)
1870{
1871	D_ASSERT(device, device->state.disk == D_FAILED);
1872	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1873	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1874	 * the protected members anymore, though, so once put_ldev reaches zero
1875	 * again, it will be safe to free them. */
1876
1877	/* Try to write changed bitmap pages, read errors may have just
1878	 * set some bits outside the area covered by the activity log.
1879	 *
1880	 * If we have an IO error during the bitmap writeout,
1881	 * we will want a full sync next time, just in case.
1882	 * (Do we want a specific meta data flag for this?)
1883	 *
1884	 * If that does not make it to stable storage either,
1885	 * we cannot do anything about that anymore.
1886	 *
1887	 * We still need to check if both bitmap and ldev are present; we may
1888	 * end up here after a failed attach, before ldev was even assigned.
1889	 */
1890	if (device->bitmap && device->ldev) {
1891		/* An interrupted resync or similar is allowed to recount bits
1892		 * while we detach.
1893		 * Any further modifications are not expected anymore, though.
1894		 */
1895		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1896					"detach", BM_LOCKED_TEST_ALLOWED)) {
1897			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1898				drbd_md_set_flag(device, MDF_FULL_SYNC);
1899				drbd_md_sync(device);
1900			}
1901		}
1902	}
1903
1904	drbd_force_state(device, NS(disk, D_DISKLESS));
1905}
1906
1907static int do_md_sync(struct drbd_device *device)
1908{
1909	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1910	drbd_md_sync(device);
1911	return 0;
1912}
1913
1914/* only called from drbd_worker thread, no locking */
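/* Record, in a small history ring of DRBD_THREAD_DETAILS_HIST entries, which
 * work callback the worker is about to run and when; the following slot is
 * zeroed so the most recent entry can be told apart when dumping the ring. */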
1915void __update_timing_details(
1916		struct drbd_thread_timing_details *tdp,
1917		unsigned int *cb_nr,
1918		void *cb,
1919		const char *fn, const unsigned int line)
1920{
1921	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1922	struct drbd_thread_timing_details *td = tdp + i;
1923
1924	td->start_jif = jiffies;
1925	td->cb_addr = cb;
1926	td->caller_fn = fn;
1927	td->line = line;
1928	td->cb_nr = *cb_nr;
1929
1930	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1931	td = tdp + i;
1932	memset(td, 0, sizeof(*td));
1933
1934	++(*cb_nr);
1935}
1936
1937static void do_device_work(struct drbd_device *device, const unsigned long todo)
1938{
1939	if (test_bit(MD_SYNC, &todo))
1940		do_md_sync(device);
1941	if (test_bit(RS_DONE, &todo) ||
1942	    test_bit(RS_PROGRESS, &todo))
1943		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1944	if (test_bit(GO_DISKLESS, &todo))
1945		go_diskless(device);
1946	if (test_bit(DESTROY_DISK, &todo))
1947		drbd_ldev_destroy(device);
1948	if (test_bit(RS_START, &todo))
1949		do_start_resync(device);
1950}
1951
1952#define DRBD_DEVICE_WORK_MASK	\
1953	((1UL << GO_DISKLESS)	\
1954	|(1UL << DESTROY_DISK)	\
1955	|(1UL << MD_SYNC)	\
1956	|(1UL << RS_START)	\
1957	|(1UL << RS_PROGRESS)	\
1958	|(1UL << RS_DONE)	\
1959	)
1960
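/* Atomically fetch and clear the per-device work bits: retry the cmpxchg()
 * until nobody else modified ->flags in between.  Only the bits within
 * DRBD_DEVICE_WORK_MASK are cleared; all other flag bits stay untouched. */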
1961static unsigned long get_work_bits(unsigned long *flags)
1962{
1963	unsigned long old, new;
1964	do {
1965		old = *flags;
1966		new = old & ~DRBD_DEVICE_WORK_MASK;
1967	} while (cmpxchg(flags, old, new) != old);
1968	return old & DRBD_DEVICE_WORK_MASK;
1969}
1970
1971static void do_unqueued_work(struct drbd_connection *connection)
1972{
1973	struct drbd_peer_device *peer_device;
1974	int vnr;
1975
1976	rcu_read_lock();
1977	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1978		struct drbd_device *device = peer_device->device;
1979		unsigned long todo = get_work_bits(&device->flags);
1980		if (!todo)
1981			continue;
1982
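		/* Drop the RCU read lock while doing the actual (possibly
		 * blocking) work; the kref keeps the device alive across that
		 * window, and we re-take the RCU lock before the idr walk
		 * continues. */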
1983		kref_get(&device->kref);
1984		rcu_read_unlock();
1985		do_device_work(device, todo);
1986		kref_put(&device->kref, drbd_destroy_device);
1987		rcu_read_lock();
1988	}
1989	rcu_read_unlock();
1990}
1991
1992static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1993{
1994	spin_lock_irq(&queue->q_lock);
1995	list_splice_tail_init(&queue->q, work_list);
1996	spin_unlock_irq(&queue->q_lock);
1997	return !list_empty(work_list);
1998}
1999
2000static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2001{
2002	DEFINE_WAIT(wait);
2003	struct net_conf *nc;
2004	int uncork, cork;
2005
2006	dequeue_work_batch(&connection->sender_work, work_list);
2007	if (!list_empty(work_list))
2008		return;
2009
2010	/* Still nothing to do?
2011	 * Maybe we still need to close the current epoch,
2012	 * even if no new requests are queued yet.
2013	 *
2014	 * Also, poke TCP, just in case.
2015	 * Then wait for new work (or signal). */
2016	rcu_read_lock();
2017	nc = rcu_dereference(connection->net_conf);
2018	uncork = nc ? nc->tcp_cork : 0;
2019	rcu_read_unlock();
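	/* If tcp_cork is in use, uncork now so that whatever is still queued
	 * in the socket actually goes out before we possibly sleep for a
	 * longer time. */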
2020	if (uncork) {
2021		mutex_lock(&connection->data.mutex);
2022		if (connection->data.socket)
2023			drbd_tcp_uncork(connection->data.socket);
2024		mutex_unlock(&connection->data.mutex);
2025	}
2026
2027	for (;;) {
2028		int send_barrier;
2029		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2030		spin_lock_irq(&connection->resource->req_lock);
2031		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2032		if (!list_empty(&connection->sender_work.q))
2033			list_splice_tail_init(&connection->sender_work.q, work_list);
2034		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2035		if (!list_empty(work_list) || signal_pending(current)) {
2036			spin_unlock_irq(&connection->resource->req_lock);
2037			break;
2038		}
2039
2040		/* We found nothing new to do: no to-be-communicated request,
2041		 * no other work item.  We may still need to close the last
2042		 * epoch.  The next incoming request will get the connection's
2043		 * current transfer log epoch number.  If that is different
2044		 * from the epoch of the last request we communicated, it is
2045		 * safe to send the epoch-separating barrier now.
2046		 */
2047		send_barrier =
2048			atomic_read(&connection->current_tle_nr) !=
2049			connection->send.current_epoch_nr;
2050		spin_unlock_irq(&connection->resource->req_lock);
2051
2052		if (send_barrier)
2053			maybe_send_barrier(connection,
2054					connection->send.current_epoch_nr + 1);
2055
2056		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2057			break;
2058
2059		/* drbd_send() may have called flush_signals() */
2060		if (get_t_state(&connection->worker) != RUNNING)
2061			break;
2062
2063		schedule();
2064		/* We may be woken up for things other than new work, too,
2065		 * e.g. if the current epoch got closed.
2066		 * In that case we send the barrier above on the next pass. */
2067	}
2068	finish_wait(&connection->sender_work.q_wait, &wait);
2069
2070	/* Someone may have changed the config while we were waiting above. */
2071	rcu_read_lock();
2072	nc = rcu_dereference(connection->net_conf);
2073	cork = nc ? nc->tcp_cork : 0;
2074	rcu_read_unlock();
2075	mutex_lock(&connection->data.mutex);
2076	if (connection->data.socket) {
2077		if (cork)
2078			drbd_tcp_cork(connection->data.socket);
2079		else if (!uncork)
2080			drbd_tcp_uncork(connection->data.socket);
2081	}
2082	mutex_unlock(&connection->data.mutex);
2083}
2084
2085int drbd_worker(struct drbd_thread *thi)
2086{
2087	struct drbd_connection *connection = thi->connection;
2088	struct drbd_work *w = NULL;
2089	struct drbd_peer_device *peer_device;
2090	LIST_HEAD(work_list);
2091	int vnr;
2092
2093	while (get_t_state(thi) == RUNNING) {
2094		drbd_thread_current_set_cpu(thi);
2095
2096		if (list_empty(&work_list)) {
2097			update_worker_timing_details(connection, wait_for_work);
2098			wait_for_work(connection, &work_list);
2099		}
2100
2101		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2102			update_worker_timing_details(connection, do_unqueued_work);
2103			do_unqueued_work(connection);
2104		}
2105
2106		if (signal_pending(current)) {
2107			flush_signals(current);
2108			if (get_t_state(thi) == RUNNING) {
2109				drbd_warn(connection, "Worker got an unexpected signal\n");
2110				continue;
2111			}
2112			break;
2113		}
2114
2115		if (get_t_state(thi) != RUNNING)
2116			break;
2117
2118		if (!list_empty(&work_list)) {
2119			w = list_first_entry(&work_list, struct drbd_work, list);
2120			list_del_init(&w->list);
2121			update_worker_timing_details(connection, w->cb);
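			/* The second argument to the work callback is the
			 * "cancel" flag: if the connection has already dropped
			 * below C_WF_REPORT_PARAMS, run the callback in cancel
			 * mode instead of doing real work. */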
2122			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2123				continue;
2124			if (connection->cstate >= C_WF_REPORT_PARAMS)
2125				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2126		}
2127	}
2128
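	/* The worker was asked to stop: drain all remaining per-device work
	 * and queued work items; remaining callbacks are run with the cancel
	 * flag set (w->cb(w, 1)). */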
2129	do {
2130		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2131			update_worker_timing_details(connection, do_unqueued_work);
2132			do_unqueued_work(connection);
2133		}
2134		if (!list_empty(&work_list)) {
2135			w = list_first_entry(&work_list, struct drbd_work, list);
2136			list_del_init(&w->list);
2137			update_worker_timing_details(connection, w->cb);
2138			w->cb(w, 1);
2139		} else
2140			dequeue_work_batch(&connection->sender_work, &work_list);
2141	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2142
2143	rcu_read_lock();
2144	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2145		struct drbd_device *device = peer_device->device;
2146		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2147		kref_get(&device->kref);
2148		rcu_read_unlock();
2149		drbd_device_cleanup(device);
2150		kref_put(&device->kref, drbd_destroy_device);
2151		rcu_read_lock();
2152	}
2153	rcu_read_unlock();
2154
2155	return 0;
2156}
2157