This source file includes the following definitions:
- dlm_get_next_mig_cookie
- dlm_set_reco_dead_node
- dlm_set_reco_master
- __dlm_reset_recovery
- dlm_reset_recovery
- dlm_dispatch_work
- dlm_kick_recovery_thread
- dlm_launch_recovery_thread
- dlm_complete_recovery_thread
- dlm_print_reco_node_status
- dlm_recovery_thread
- dlm_reco_master_ready
- dlm_is_node_dead
- dlm_is_node_recovered
- dlm_wait_for_node_death
- dlm_wait_for_node_recovery
- dlm_in_recovery
- dlm_wait_for_recovery
- dlm_begin_recovery
- dlm_end_recovery
- dlm_print_recovery_master
- dlm_do_recovery
- dlm_remaster_locks
- dlm_init_recovery_area
- dlm_destroy_recovery_area
- dlm_request_all_locks
- dlm_request_all_locks_handler
- dlm_request_all_locks_worker
- dlm_send_all_done_msg
- dlm_reco_data_done_handler
- dlm_move_reco_locks_to_list
- dlm_num_locks_in_lockres
- dlm_send_mig_lockres_msg
- dlm_init_migratable_lockres
- dlm_prepare_lvb_for_migration
- dlm_add_lock_to_array
- dlm_add_dummy_lock
- dlm_is_dummy_lock
- dlm_send_one_lockres
- dlm_mig_lockres_handler
- dlm_mig_lockres_worker
- dlm_lockres_master_requery
- dlm_do_master_requery
- dlm_master_requery_handler
- dlm_list_num_to_pointer
- dlm_process_recovery_data
- dlm_move_lockres_to_recovery_list
- dlm_finish_local_lockres_recovery
- dlm_lvb_needs_invalidation
- dlm_revalidate_lvb
- dlm_free_dead_locks
- dlm_do_local_recovery_cleanup
- __dlm_hb_node_down
- dlm_hb_node_down_cb
- dlm_hb_node_up_cb
- dlm_reco_ast
- dlm_reco_bast
- dlm_reco_unlock_ast
- dlm_pick_recovery_master
- dlm_send_begin_reco_message
- dlm_begin_reco_handler
- dlm_send_finalize_reco_message
- dlm_finalize_reco_handler
// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * dlmrecovery.c
 *
 * recovery stuff
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 */

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/timer.h>
#include <linux/kthread.h>
#include <linux/delay.h>


#include "../cluster/heartbeat.h"
#include "../cluster/nodemanager.h"
#include "../cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_RECOVERY)
#include "../cluster/masklog.h"

static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);

static int dlm_recovery_thread(void *data);
static int dlm_do_recovery(struct dlm_ctxt *dlm);

static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node);
static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node);
static int dlm_request_all_locks(struct dlm_ctxt *dlm,
				 u8 request_from, u8 dead_node);
static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm);

static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res);
static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
					const char *lockname, int namelen,
					int total_locks, u64 cookie,
					u8 flags, u8 master);
static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
				    struct dlm_migratable_lockres *mres,
				    u8 send_to,
				    struct dlm_lock_resource *res,
				    int total_locks);
static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_migratable_lockres *mres);
static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
				 u8 dead_node, u8 send_to);
static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node);
static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
					struct list_head *list, u8 dead_node);
static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
					      u8 dead_node, u8 new_master);
static void dlm_reco_ast(void *astdata);
static void dlm_reco_bast(void *astdata, int blocked_type);
static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st);
static void dlm_request_all_locks_worker(struct dlm_work_item *item,
					 void *data);
static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data);
static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 *real_master);

static u64 dlm_get_next_mig_cookie(void);

static DEFINE_SPINLOCK(dlm_reco_state_lock);
static DEFINE_SPINLOCK(dlm_mig_cookie_lock);
static u64 dlm_mig_cookie = 1;

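/* Editor's note, inferred from the code below: the migration cookie is
 * only needed when a lockres carries more locks than fit in one network
 * page; every message in that series is tagged with the same nonzero
 * cookie so the receiver can tie the pieces together (see
 * dlm_send_one_lockres()).  The counter wraps past ~0ULL, skipping 0. */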
static u64 dlm_get_next_mig_cookie(void)
{
	u64 c;
	spin_lock(&dlm_mig_cookie_lock);
	c = dlm_mig_cookie;
	if (dlm_mig_cookie == (~0ULL))
		dlm_mig_cookie = 1;
	else
		dlm_mig_cookie++;
	spin_unlock(&dlm_mig_cookie_lock);
	return c;
}

static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
					  u8 dead_node)
{
	assert_spin_locked(&dlm->spinlock);
	if (dlm->reco.dead_node != dead_node)
		mlog(0, "%s: changing dead_node from %u to %u\n",
		     dlm->name, dlm->reco.dead_node, dead_node);
	dlm->reco.dead_node = dead_node;
}

static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
				       u8 master)
{
	assert_spin_locked(&dlm->spinlock);
	mlog(0, "%s: changing new_master from %u to %u\n",
	     dlm->name, dlm->reco.new_master, master);
	dlm->reco.new_master = master;
}

static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
{
	assert_spin_locked(&dlm->spinlock);
	clear_bit(dlm->reco.dead_node, dlm->recovery_map);
	dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
	dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
}

static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
{
	spin_lock(&dlm->spinlock);
	__dlm_reset_recovery(dlm);
	spin_unlock(&dlm->spinlock);
}


void dlm_dispatch_work(struct work_struct *work)
{
	struct dlm_ctxt *dlm =
		container_of(work, struct dlm_ctxt, dispatched_work);
	LIST_HEAD(tmp_list);
	struct dlm_work_item *item, *next;
	dlm_workfunc_t *workfunc;
	int tot = 0;

	spin_lock(&dlm->work_lock);
	list_splice_init(&dlm->work_list, &tmp_list);
	spin_unlock(&dlm->work_lock);

	list_for_each_entry(item, &tmp_list, list) {
		tot++;
	}
	mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);

	list_for_each_entry_safe(item, next, &tmp_list, list) {
		workfunc = item->func;
		list_del_init(&item->list);

		/* already have a ref on dlm to keep it from
		 * disappearing.  just double-check. */
		BUG_ON(item->dlm != dlm);

		/* this is allowed to sleep and
		 * call network stuff */
		workfunc(item, item->data);

		dlm_put(dlm);
		kfree(item);
	}
}

/*
 * RECOVERY THREAD
 */

void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
{
	/* wake the recovery thread
	 * this will wake the reco thread in one of three places
	 * 1) sleeping with no recovery happening
	 * 2) sleeping with recovery mastered elsewhere
	 * 3) recovery mastered here, waiting on reco data */

	wake_up(&dlm->dlm_reco_thread_wq);
}


int dlm_launch_recovery_thread(struct dlm_ctxt *dlm)
{
	mlog(0, "starting dlm recovery thread...\n");

	dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm,
						"dlm_reco-%s", dlm->name);
	if (IS_ERR(dlm->dlm_reco_thread_task)) {
		mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task));
		dlm->dlm_reco_thread_task = NULL;
		return -EINVAL;
	}

	return 0;
}

void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
{
	if (dlm->dlm_reco_thread_task) {
		mlog(0, "waiting for dlm recovery thread to exit\n");
		kthread_stop(dlm->dlm_reco_thread_task);
		dlm->dlm_reco_thread_task = NULL;
	}
}

/*
 * this is lame, but here's how recovery works...
 * 1) all recovery threads cluster wide will work on recovering
 *    ONE node at a time
 * 2) negotiate who will take over all the locks for the dead node.
 *    thats right... ALL the locks.
 * 3) once a new master is chosen, everyone scans all locks
 *    and moves aside those mastered by the dead guy
 * 4) each of these locks should be locked until recovery is done
 * 5) the new master collects up all of secondary lock queue info
 *    one lock at a time, forcing each node to communicate back
 *    before continuing
 * 6) each secondary lock queue responds with the full known lock info
 * 7) once the new master has run all its locks, it sends an ALLDONE!
 *    message to everyone
 * 8) upon receiving this message, the secondary queue node unlocks
 *    and responds to the ALLDONE
 * 9) once the new master gets responses from everyone, he unlocks
 *    everything and recovery for this dead node is done
 * 10) go back to 2) while there are still dead nodes
 */

static void dlm_print_reco_node_status(struct dlm_ctxt *dlm)
{
	struct dlm_reco_node_data *ndata;
	struct dlm_lock_resource *res;

	mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n",
	     dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
	     dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
	     dlm->reco.dead_node, dlm->reco.new_master);

	list_for_each_entry(ndata, &dlm->reco.node_data, list) {
		char *st = "unknown";
		switch (ndata->state) {
		case DLM_RECO_NODE_DATA_INIT:
			st = "init";
			break;
		case DLM_RECO_NODE_DATA_REQUESTING:
			st = "requesting";
			break;
		case DLM_RECO_NODE_DATA_DEAD:
			st = "dead";
			break;
		case DLM_RECO_NODE_DATA_RECEIVING:
			st = "receiving";
			break;
		case DLM_RECO_NODE_DATA_REQUESTED:
			st = "requested";
			break;
		case DLM_RECO_NODE_DATA_DONE:
			st = "done";
			break;
		case DLM_RECO_NODE_DATA_FINALIZE_SENT:
			st = "finalize-sent";
			break;
		default:
			st = "bad";
			break;
		}
		mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n",
		     dlm->name, ndata->node_num, st);
	}
	list_for_each_entry(res, &dlm->reco.resources, recovering) {
		mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
		     dlm->name, res->lockname.len, res->lockname.name);
	}
}

#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)

static int dlm_recovery_thread(void *data)
{
	int status;
	struct dlm_ctxt *dlm = data;
	unsigned long timeout = msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS);

	mlog(0, "dlm thread running for %s...\n", dlm->name);

	while (!kthread_should_stop()) {
		if (dlm_domain_fully_joined(dlm)) {
			status = dlm_do_recovery(dlm);
			if (status == -EAGAIN) {
				/* do not sleep, recheck immediately. */
				continue;
			}
			if (status < 0)
				mlog_errno(status);
		}

		wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,
						 kthread_should_stop(),
						 timeout);
	}

	mlog(0, "quitting DLM recovery thread\n");
	return 0;
}

/* returns true when the recovery master has contacted us */
static int dlm_reco_master_ready(struct dlm_ctxt *dlm)
{
	int ready;
	spin_lock(&dlm->spinlock);
	ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM);
	spin_unlock(&dlm->spinlock);
	return ready;
}

/* returns true if node is no longer in the domain map (which
 * means it has died) */
int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
{
	int dead;
	spin_lock(&dlm->spinlock);
	dead = !test_bit(node, dlm->domain_map);
	spin_unlock(&dlm->spinlock);
	return dead;
}

/* returns true if node is no longer in the recovery map (which
 * means its recovery has completed) */
static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
{
	int recovered;
	spin_lock(&dlm->spinlock);
	recovered = !test_bit(node, dlm->recovery_map);
	spin_unlock(&dlm->spinlock);
	return recovered;
}


void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
{
	if (dlm_is_node_dead(dlm, node))
		return;

	printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in "
	       "domain %s\n", node, dlm->name);

	if (timeout)
		wait_event_timeout(dlm->dlm_reco_thread_wq,
				   dlm_is_node_dead(dlm, node),
				   msecs_to_jiffies(timeout));
	else
		wait_event(dlm->dlm_reco_thread_wq,
			   dlm_is_node_dead(dlm, node));
}

void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
{
	if (dlm_is_node_recovered(dlm, node))
		return;

	printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in "
	       "domain %s\n", node, dlm->name);

	if (timeout)
		wait_event_timeout(dlm->dlm_reco_thread_wq,
				   dlm_is_node_recovered(dlm, node),
				   msecs_to_jiffies(timeout));
	else
		wait_event(dlm->dlm_reco_thread_wq,
			   dlm_is_node_recovered(dlm, node));
}

/* callers of the top-level api calls (dlmlock/dlmunlock) should
 * block on the dlm->reco.event when recovery is in progress.
 * the dlm recovery thread will set this state when it begins
 * recovering a dead node (as the new master or not) and clear
 * the state again when recovery is complete */
static int dlm_in_recovery(struct dlm_ctxt *dlm)
{
	int in_recovery;
	spin_lock(&dlm->spinlock);
	in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
	spin_unlock(&dlm->spinlock);
	return in_recovery;
}


void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
{
	if (dlm_in_recovery(dlm)) {
		mlog(0, "%s: reco thread %d in recovery: "
		     "state=%d, master=%u, dead=%u\n",
		     dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
		     dlm->reco.state, dlm->reco.new_master,
		     dlm->reco.dead_node);
	}
	wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
}

static void dlm_begin_recovery(struct dlm_ctxt *dlm)
{
	assert_spin_locked(&dlm->spinlock);
	BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
	printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n",
	       dlm->name, dlm->reco.dead_node);
	dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
}

static void dlm_end_recovery(struct dlm_ctxt *dlm)
{
	spin_lock(&dlm->spinlock);
	BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));
	dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;
	spin_unlock(&dlm->spinlock);
	printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name);
	wake_up(&dlm->reco.event);
}

static void dlm_print_recovery_master(struct dlm_ctxt *dlm)
{
	printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the "
	       "dead node %u in domain %s\n", dlm->reco.new_master,
	       (dlm->node_num == dlm->reco.new_master ? "me" : "he"),
	       dlm->reco.dead_node, dlm->name);
}

static int dlm_do_recovery(struct dlm_ctxt *dlm)
{
	int status = 0;
	int ret;

	spin_lock(&dlm->spinlock);

	if (dlm->migrate_done) {
		mlog(0, "%s: no need do recovery after migrating all "
		     "lock resources\n", dlm->name);
		spin_unlock(&dlm->spinlock);
		return 0;
	}

	/* check to see if the new master has died */
	if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM &&
	    test_bit(dlm->reco.new_master, dlm->recovery_map)) {
		mlog(0, "new master %u died while recovering %u!\n",
		     dlm->reco.new_master, dlm->reco.dead_node);
		/* unset the new_master, leave dead_node */
		dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
	}

	/* select a target to recover */
	if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
		int bit;

		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit >= O2NM_MAX_NODES || bit < 0)
			dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
		else
			dlm_set_reco_dead_node(dlm, bit);
	} else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
		/* BUG? */
		mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
		     dlm->reco.dead_node);
		dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
	}

	if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
		/* nothing to recover.  sleep now. */
		spin_unlock(&dlm->spinlock);
		/* return to main thread loop and sleep. */
		return 0;
	}
	mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n",
	     dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
	     dlm->reco.dead_node);

	/* take write barrier */
	/* (stops the list reshuffling thread, proxy ast handling) */
	dlm_begin_recovery(dlm);

	spin_unlock(&dlm->spinlock);

	if (dlm->reco.new_master == dlm->node_num)
		goto master_here;

	if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
		/* choose a new master, returns 0 if this node
		 * is the master, -EEXIST if it's another node.
		 * this does not return until a new master is chosen
		 * or recovery completes entirely. */
		ret = dlm_pick_recovery_master(dlm);
		if (!ret) {
			/* already notified everyone.  go. */
			goto master_here;
		}
		mlog(0, "another node will master this recovery session.\n");
	}

	dlm_print_recovery_master(dlm);

	/* it is safe to start everything back up here
	 * because all of the dead node's lock resources
	 * have been marked as in-recovery */
	dlm_end_recovery(dlm);

	/* sleep out in main dlm_recovery_thread loop. */
	return 0;

master_here:
	dlm_print_recovery_master(dlm);

	status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
	if (status < 0) {
		/* we should never hit this anymore */
		mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, "
		     "retrying.\n", dlm->name, status, dlm->reco.dead_node);
		/* yield a bit to allow any final network messages
		 * to get handled on remaining nodes */
		msleep(100);
	} else {
		/* success!  see if any other nodes need recovery */
		mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
		     dlm->name, dlm->reco.dead_node, dlm->node_num);
		spin_lock(&dlm->spinlock);
		__dlm_reset_recovery(dlm);
		dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
		spin_unlock(&dlm->spinlock);
	}
	dlm_end_recovery(dlm);

	/* continue and look for another dead node */
	return -EAGAIN;
}

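/* Editor's summary of the recovery-master path below: build a node-data
 * entry for every live node, ask each one for the dead node's lock
 * state, poll until every node reaches DONE (or DEAD), then broadcast
 * the finalize message and take ownership of the recovered lock
 * resources locally. */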
static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
{
	int status = 0;
	struct dlm_reco_node_data *ndata;
	int all_nodes_done;
	int destroy = 0;
	int pass = 0;

	do {
		/* we have become recovery master.  there is no escaping
		 * this, so just keep trying until we get it. */
		status = dlm_init_recovery_area(dlm, dead_node);
		if (status < 0) {
			mlog(ML_ERROR, "%s: failed to alloc recovery area, "
			     "retrying\n", dlm->name);
			msleep(1000);
		}
	} while (status != 0);

	/* safe to access the node data list without a lock, since this
	 * process is the only one to change the list */
	list_for_each_entry(ndata, &dlm->reco.node_data, list) {
		BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT);
		ndata->state = DLM_RECO_NODE_DATA_REQUESTING;

		mlog(0, "%s: Requesting lock info from node %u\n", dlm->name,
		     ndata->node_num);

		if (ndata->node_num == dlm->node_num) {
			ndata->state = DLM_RECO_NODE_DATA_DONE;
			continue;
		}

		do {
			status = dlm_request_all_locks(dlm, ndata->node_num,
						       dead_node);
			if (status < 0) {
				mlog_errno(status);
				if (dlm_is_host_down(status)) {
					/* node died, ignore it for recovery */
					status = 0;
					ndata->state = DLM_RECO_NODE_DATA_DEAD;
					/* wait for the domain map to catch up
					 * with the network state. */
					wait_event_timeout(dlm->dlm_reco_thread_wq,
							   dlm_is_node_dead(dlm,
								ndata->node_num),
							   msecs_to_jiffies(1000));
					mlog(0, "waited 1 sec for %u, "
					     "dead? %s\n", ndata->node_num,
					     dlm_is_node_dead(dlm, ndata->node_num) ?
					     "yes" : "no");
				} else {
					/* -ENOMEM on the other node */
					mlog(0, "%s: node %u returned "
					     "%d during recovery, retrying "
					     "after a short wait\n",
					     dlm->name, ndata->node_num,
					     status);
					msleep(100);
				}
			}
		} while (status != 0);

		spin_lock(&dlm_reco_state_lock);
		switch (ndata->state) {
			case DLM_RECO_NODE_DATA_INIT:
			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
			case DLM_RECO_NODE_DATA_REQUESTED:
				BUG();
				break;
			case DLM_RECO_NODE_DATA_DEAD:
				mlog(0, "node %u died after requesting "
				     "recovery info for node %u\n",
				     ndata->node_num, dead_node);
				/* fine.  don't need this node's info.
				 * continue without it. */
				break;
			case DLM_RECO_NODE_DATA_REQUESTING:
				ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
				mlog(0, "now receiving recovery data from "
				     "node %u for dead node %u\n",
				     ndata->node_num, dead_node);
				break;
			case DLM_RECO_NODE_DATA_RECEIVING:
				mlog(0, "already receiving recovery data from "
				     "node %u for dead node %u\n",
				     ndata->node_num, dead_node);
				break;
			case DLM_RECO_NODE_DATA_DONE:
				mlog(0, "already DONE receiving recovery data "
				     "from node %u for dead node %u\n",
				     ndata->node_num, dead_node);
				break;
		}
		spin_unlock(&dlm_reco_state_lock);
	}

	mlog(0, "%s: Done requesting all lock info\n", dlm->name);

	/* nodes should be sending reco data now.
	 * just need to wait. */

	while (1) {
		/* check all the nodes now to see if we are
		 * done, or if anyone died */
		all_nodes_done = 1;
		spin_lock(&dlm_reco_state_lock);
		list_for_each_entry(ndata, &dlm->reco.node_data, list) {
			mlog(0, "checking recovery state of node %u\n",
			     ndata->node_num);
			switch (ndata->state) {
				case DLM_RECO_NODE_DATA_INIT:
				case DLM_RECO_NODE_DATA_REQUESTING:
					mlog(ML_ERROR, "bad ndata state for "
					     "node %u: state=%d\n",
					     ndata->node_num, ndata->state);
					BUG();
					break;
				case DLM_RECO_NODE_DATA_DEAD:
					mlog(0, "node %u died after "
					     "requesting recovery info for "
					     "node %u\n", ndata->node_num,
					     dead_node);
					break;
				case DLM_RECO_NODE_DATA_RECEIVING:
				case DLM_RECO_NODE_DATA_REQUESTED:
					mlog(0, "%s: node %u still in state %s\n",
					     dlm->name, ndata->node_num,
					     ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
					     "receiving" : "requested");
					all_nodes_done = 0;
					break;
				case DLM_RECO_NODE_DATA_DONE:
					mlog(0, "%s: node %u state is done\n",
					     dlm->name, ndata->node_num);
					break;
				case DLM_RECO_NODE_DATA_FINALIZE_SENT:
					mlog(0, "%s: node %u state is finalize\n",
					     dlm->name, ndata->node_num);
					break;
			}
		}
		spin_unlock(&dlm_reco_state_lock);

		mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass,
		     all_nodes_done?"yes":"no");
		if (all_nodes_done) {
			int ret;

			/* Set this flag on the recovery master so that a
			 * new recovery for another dead node cannot start
			 * before the current one is done. */
			spin_lock(&dlm->spinlock);
			dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
			spin_unlock(&dlm->spinlock);

			/* all nodes are now in DLM_RECO_NODE_DATA_DONE state.
			 * just send a finalize message to everyone and
			 * clean up */
			mlog(0, "all nodes are done! send finalize\n");
			ret = dlm_send_finalize_reco_message(dlm);
			if (ret < 0)
				mlog_errno(ret);

			spin_lock(&dlm->spinlock);
			dlm_finish_local_lockres_recovery(dlm, dead_node,
							  dlm->node_num);
			spin_unlock(&dlm->spinlock);
			mlog(0, "should be done with recovery!\n");

			mlog(0, "finishing recovery of %s at %lu, "
			     "dead=%u, this=%u, new=%u\n", dlm->name,
			     jiffies, dlm->reco.dead_node,
			     dlm->node_num, dlm->reco.new_master);
			destroy = 1;
			status = 0;
			/* rescan everything marked dirty along the way */
			dlm_kick_thread(dlm, NULL);
			break;
		}
		/* wait to be signalled, with periodic timeout
		 * to check for node death */
		wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,
					 kthread_should_stop(),
					 msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS));

	}

	if (destroy)
		dlm_destroy_recovery_area(dlm);

	return status;
}

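/* Editor's note: allocates one dlm_reco_node_data per node still in the
 * domain map (the dead node must already have been removed from it, as
 * the BUG_ON below asserts); these entries track each node's progress
 * through the state machine used by dlm_remaster_locks(). */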
static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
{
	int num = 0;
	struct dlm_reco_node_data *ndata;

	spin_lock(&dlm->spinlock);
	memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map));
	/* nodes can only be removed (by dying) after dropping
	 * this lock, and death will be trapped later, so this should do */
	spin_unlock(&dlm->spinlock);

	while (1) {
		num = find_next_bit(dlm->reco.node_map, O2NM_MAX_NODES, num);
		if (num >= O2NM_MAX_NODES) {
			break;
		}
		BUG_ON(num == dead_node);

		ndata = kzalloc(sizeof(*ndata), GFP_NOFS);
		if (!ndata) {
			dlm_destroy_recovery_area(dlm);
			return -ENOMEM;
		}
		ndata->node_num = num;
		ndata->state = DLM_RECO_NODE_DATA_INIT;
		spin_lock(&dlm_reco_state_lock);
		list_add_tail(&ndata->list, &dlm->reco.node_data);
		spin_unlock(&dlm_reco_state_lock);
		num++;
	}

	return 0;
}

static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm)
{
	struct dlm_reco_node_data *ndata, *next;
	LIST_HEAD(tmplist);

	spin_lock(&dlm_reco_state_lock);
	list_splice_init(&dlm->reco.node_data, &tmplist);
	spin_unlock(&dlm_reco_state_lock);

	list_for_each_entry_safe(ndata, next, &tmplist, list) {
		list_del_init(&ndata->list);
		kfree(ndata);
	}
}

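/* Editor's note: asks request_from to stream us all of its lock state
 * for dead_node.  The payload travels back asynchronously as
 * DLM_MIG_LOCKRES messages; this call only succeeds or fails at
 * queueing the request on the remote node. */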
static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
				 u8 dead_node)
{
	struct dlm_lock_request lr;
	int ret;
	int status;

	mlog(0, "\n");

	mlog(0, "dlm_request_all_locks: dead node is %u, sending request "
	     "to %u\n", dead_node, request_from);

	memset(&lr, 0, sizeof(lr));
	lr.node_idx = dlm->node_num;
	lr.dead_node = dead_node;

	// send message
	ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key,
				 &lr, sizeof(lr), request_from, &status);

	/* negative status is handled by caller */
	if (ret < 0)
		mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u "
		     "to recover dead node %u\n", dlm->name, ret,
		     request_from, dead_node);
	else
		ret = status;

	// return from here, then
	// sleep until all received or error
	return ret;

}

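/* Editor's note: handler for DLM_LOCK_REQUEST_MSG.  It sanity-checks
 * that the sender is recovering the same node we think is dead, then
 * pushes the real work (walking and sending every affected lockres) to
 * the dlm worker, since a network handler should not block that long. */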
int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
				  void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
	char *buf = NULL;
	struct dlm_work_item *item = NULL;

	if (!dlm_grab(dlm))
		return -EINVAL;

	if (lr->dead_node != dlm->reco.dead_node) {
		mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
		     "dead_node is %u\n", dlm->name, lr->node_idx,
		     lr->dead_node, dlm->reco.dead_node);
		dlm_print_reco_node_status(dlm);
		/* this is a hack */
		dlm_put(dlm);
		return -ENOMEM;
	}
	BUG_ON(lr->dead_node != dlm->reco.dead_node);

	item = kzalloc(sizeof(*item), GFP_NOFS);
	if (!item) {
		dlm_put(dlm);
		return -ENOMEM;
	}

	/* this will get freed by dlm_request_all_locks_worker */
	buf = (char *) __get_free_page(GFP_NOFS);
	if (!buf) {
		kfree(item);
		dlm_put(dlm);
		return -ENOMEM;
	}

	/* queue up work for dlm_request_all_locks_worker */
	dlm_grab(dlm);  /* get an extra ref for the work item */
	dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf);
	item->u.ral.reco_master = lr->node_idx;
	item->u.ral.dead_node = lr->dead_node;
	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);
	queue_work(dlm->dlm_worker, &dlm->dispatched_work);

	dlm_put(dlm);
	return 0;
}

static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
{
	struct dlm_migratable_lockres *mres;
	struct dlm_lock_resource *res;
	struct dlm_ctxt *dlm;
	LIST_HEAD(resources);
	int ret;
	u8 dead_node, reco_master;
	int skip_all_done = 0;

	dlm = item->dlm;
	dead_node = item->u.ral.dead_node;
	reco_master = item->u.ral.reco_master;
	mres = (struct dlm_migratable_lockres *)data;

	mlog(0, "%s: recovery worker started, dead=%u, master=%u\n",
	     dlm->name, dead_node, reco_master);

	if (dead_node != dlm->reco.dead_node ||
	    reco_master != dlm->reco.new_master) {
		/* worker could have been created before the recovery master
		 * died.  if so, do not continue, but do not error. */
		if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
			mlog(ML_NOTICE, "%s: will not send recovery state, "
			     "recovery master %u died, thread=(dead=%u,mas=%u)"
			     " current=(dead=%u,mas=%u)\n", dlm->name,
			     reco_master, dead_node, reco_master,
			     dlm->reco.dead_node, dlm->reco.new_master);
		} else {
			mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
			     "master=%u), request(dead=%u, master=%u)\n",
			     dlm->name, dlm->reco.dead_node,
			     dlm->reco.new_master, dead_node, reco_master);
		}
		goto leave;
	}

	/* lock resources should have already been moved to the
	 * dlm->reco.resources list.  now move items from that list
	 * to a temp list if the dead owner matches.  note that the
	 * whole cluster recovers only one node at a time, so we
	 * can safely move UNKNOWN lock resources for each recovery
	 * session. */
	dlm_move_reco_locks_to_list(dlm, &resources, dead_node);

	/* now we can begin blasting lockreses without the dlm lock */

	/* any errors returned will be due to the new_master dying,
	 * the dlm_reco_thread should detect this */
	list_for_each_entry(res, &resources, recovering) {
		ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
					   DLM_MRES_RECOVERY);
		if (ret < 0) {
			mlog(ML_ERROR, "%s: node %u went down while sending "
			     "recovery state for dead node %u, ret=%d\n", dlm->name,
			     reco_master, dead_node, ret);
			skip_all_done = 1;
			break;
		}
	}

	/* move the resources back to the list */
	spin_lock(&dlm->spinlock);
	list_splice_init(&resources, &dlm->reco.resources);
	spin_unlock(&dlm->spinlock);

	if (!skip_all_done) {
		ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
		if (ret < 0) {
			mlog(ML_ERROR, "%s: node %u went down while sending "
			     "recovery all-done for dead node %u, ret=%d\n",
			     dlm->name, reco_master, dead_node, ret);
		}
	}
leave:
	free_page((unsigned long)data);
}


static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
{
	int ret, tmpret;
	struct dlm_reco_data_done done_msg;

	memset(&done_msg, 0, sizeof(done_msg));
	done_msg.node_idx = dlm->node_num;
	done_msg.dead_node = dead_node;
	mlog(0, "sending DATA DONE message to %u, "
	     "my node=%u, dead node=%u\n", send_to, done_msg.node_idx,
	     done_msg.dead_node);

	ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
				 sizeof(done_msg), send_to, &tmpret);
	if (ret < 0) {
		mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u "
		     "to recover dead node %u\n", dlm->name, ret, send_to,
		     dead_node);
		if (!dlm_is_host_down(ret)) {
			BUG();
		}
	} else
		ret = tmpret;
	return ret;
}

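/* Editor's note: handler for DLM_RECO_DATA_DONE_MSG.  The named node
 * has finished sending us (the recovery master) its lock state for the
 * dead node, so advance its entry to DONE and poke the recovery
 * thread. */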
int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
			       void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
	struct dlm_reco_node_data *ndata = NULL;
	int ret = -EINVAL;

	if (!dlm_grab(dlm))
		return -EINVAL;

	mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
	     "node_idx=%u, this node=%u\n", done->dead_node,
	     dlm->reco.dead_node, done->node_idx, dlm->node_num);

	mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),
			"Got DATA DONE: dead_node=%u, reco.dead_node=%u, "
			"node_idx=%u, this node=%u\n", done->dead_node,
			dlm->reco.dead_node, done->node_idx, dlm->node_num);

	spin_lock(&dlm_reco_state_lock);
	list_for_each_entry(ndata, &dlm->reco.node_data, list) {
		if (ndata->node_num != done->node_idx)
			continue;

		switch (ndata->state) {
			/* should not happen */
			case DLM_RECO_NODE_DATA_INIT:
			case DLM_RECO_NODE_DATA_DEAD:
			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
				mlog(ML_ERROR, "bad ndata state for node %u:"
				     " state=%d\n", ndata->node_num,
				     ndata->state);
				BUG();
				break;
			/* these states are possible at this point, anywhere along
			 * the line of recovery */
			case DLM_RECO_NODE_DATA_DONE:
			case DLM_RECO_NODE_DATA_RECEIVING:
			case DLM_RECO_NODE_DATA_REQUESTED:
			case DLM_RECO_NODE_DATA_REQUESTING:
				mlog(0, "node %u is DONE sending "
				     "recovery data!\n",
				     ndata->node_num);

				ndata->state = DLM_RECO_NODE_DATA_DONE;
				ret = 0;
				break;
		}
	}
	spin_unlock(&dlm_reco_state_lock);

	/* wake the recovery thread, some node is done */
	if (!ret)
		dlm_kick_recovery_thread(dlm);

	if (ret < 0)
		mlog(ML_ERROR, "failed to find recovery node data for node "
		     "%u\n", done->node_idx);
	dlm_put(dlm);

	mlog(0, "leaving reco data done handler, ret=%d\n", ret);
	return ret;
}

static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
					struct list_head *list,
					u8 dead_node)
{
	struct dlm_lock_resource *res, *next;
	struct dlm_lock *lock;

	spin_lock(&dlm->spinlock);
	list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
		/* always prune any $RECOVERY entries for dead nodes,
		 * otherwise hangs can occur during later recovery */
		if (dlm_is_recovery_lock(res->lockname.name,
					 res->lockname.len)) {
			spin_lock(&res->spinlock);
			list_for_each_entry(lock, &res->granted, list) {
				if (lock->ml.node == dead_node) {
					mlog(0, "AHA! there was "
					     "a $RECOVERY lock for dead "
					     "node %u (%s)!\n",
					     dead_node, dlm->name);
					list_del_init(&lock->list);
					dlm_lock_put(lock);
					/* Can't schedule
					 * DLM_UNLOCK_FREE_LOCK
					 * - do manually */
					dlm_lock_put(lock);
					break;
				}
			}
			spin_unlock(&res->spinlock);
			continue;
		}

		if (res->owner == dead_node) {
			mlog(0, "found lockres owned by dead node while "
			     "doing recovery for node %u. sending it.\n",
			     dead_node);
			list_move_tail(&res->recovering, list);
		} else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
			mlog(0, "found UNKNOWN owner while doing recovery "
			     "for node %u. sending it.\n", dead_node);
			list_move_tail(&res->recovering, list);
		}
	}
	spin_unlock(&dlm->spinlock);
}

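/* Editor's note: counts the locks on all three queues.  This (and
 * dlm_list_num_to_pointer() further down) relies on the granted,
 * converting and blocked list_heads being laid out consecutively inside
 * struct dlm_lock_resource, so "queue++" steps from one queue to the
 * next. */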
static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res)
{
	int total_locks = 0;
	struct list_head *iter, *queue = &res->granted;
	int i;

	for (i = 0; i < 3; i++) {
		list_for_each(iter, queue)
			total_locks++;
		queue++;
	}
	return total_locks;
}


static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
				    struct dlm_migratable_lockres *mres,
				    u8 send_to,
				    struct dlm_lock_resource *res,
				    int total_locks)
{
	u64 mig_cookie = be64_to_cpu(mres->mig_cookie);
	int mres_total_locks = be32_to_cpu(mres->total_locks);
	int ret = 0, status = 0;
	u8 orig_flags = mres->flags,
	   orig_master = mres->master;

	BUG_ON(mres->num_locks > DLM_MAX_MIGRATABLE_LOCKS);
	if (!mres->num_locks)
		return 0;

	/* add an all-done flag if we reached the last lock */
	orig_flags = mres->flags;
	BUG_ON(total_locks > mres_total_locks);
	if (total_locks == mres_total_locks)
		mres->flags |= DLM_MRES_ALL_DONE;

	mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
	     dlm->name, res->lockname.len, res->lockname.name,
	     orig_flags & DLM_MRES_MIGRATION ? "migration" : "recovery",
	     send_to);

	/* send it */
	ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
				 struct_size(mres, ml, mres->num_locks),
				 send_to, &status);
	if (ret < 0) {
		/* XXX: negative status is not handled.
		 * this will end up killing this node. */
		mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to "
		     "node %u (%s)\n", dlm->name, mres->lockname_len,
		     mres->lockname, ret, send_to,
		     (orig_flags & DLM_MRES_MIGRATION ?
		      "migration" : "recovery"));
	} else {
		/* might get an -ENOMEM back here */
		ret = status;
		if (ret < 0) {
			mlog_errno(ret);

			if (ret == -EFAULT) {
				mlog(ML_ERROR, "node %u told me to kill "
				     "myself!\n", send_to);
				BUG();
			}
		}
	}

	/* zero and reinit the message buffer */
	dlm_init_migratable_lockres(mres, res->lockname.name,
				    res->lockname.len, mres_total_locks,
				    mig_cookie, orig_flags, orig_master);
	return ret;
}

static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
					const char *lockname, int namelen,
					int total_locks, u64 cookie,
					u8 flags, u8 master)
{
	/* mres here is one full page */
	clear_page(mres);
	mres->lockname_len = namelen;
	memcpy(mres->lockname, lockname, namelen);
	mres->num_locks = 0;
	mres->total_locks = cpu_to_be32(total_locks);
	mres->mig_cookie = cpu_to_be64(cookie);
	mres->flags = flags;
	mres->master = master;
}

static void dlm_prepare_lvb_for_migration(struct dlm_lock *lock,
					  struct dlm_migratable_lockres *mres,
					  int queue)
{
	if (!lock->lksb)
		return;

	/* Ignore lvb in all locks in the blocked list */
	if (queue == DLM_BLOCKED_LIST)
		return;

	/* Only consider lvbs in locks with granted EX or PR lock levels */
	if (lock->ml.type != LKM_EXMODE && lock->ml.type != LKM_PRMODE)
		return;

	if (dlm_lvb_is_empty(mres->lvb)) {
		memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
		return;
	}

	/* Ensure the lvb copied for migration matches in other valid locks */
	if (!memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))
		return;

	mlog(ML_ERROR, "Mismatched lvb in lock cookie=%u:%llu, name=%.*s, "
	     "node=%u\n",
	     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
	     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
	     lock->lockres->lockname.len, lock->lockres->lockname.name,
	     lock->ml.node);
	dlm_print_one_lock_resource(lock->lockres);
	BUG();
}

/* returns 1 if this lock fills the network structure,
 * 0 otherwise */
static int dlm_add_lock_to_array(struct dlm_lock *lock,
				 struct dlm_migratable_lockres *mres, int queue)
{
	struct dlm_migratable_lock *ml;
	int lock_num = mres->num_locks;

	ml = &(mres->ml[lock_num]);
	ml->cookie = lock->ml.cookie;
	ml->type = lock->ml.type;
	ml->convert_type = lock->ml.convert_type;
	ml->highest_blocked = lock->ml.highest_blocked;
	ml->list = queue;
	if (lock->lksb) {
		ml->flags = lock->lksb->flags;
		dlm_prepare_lvb_for_migration(lock, mres, queue);
	}
	ml->node = lock->ml.node;
	mres->num_locks++;
	/* we reached the max, send this network message */
	if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS)
		return 1;
	return 0;
}

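/* Editor's note: a "dummy" lock is an otherwise-empty migratable lock
 * entry (cookie 0, all modes LKM_IVMODE, parked on the blocked list).
 * It is sent when a lockres has no locks at all, purely so the new
 * master learns that this node still holds a mastery reference;
 * dlm_is_dummy_lock() is the matching decoder on the receive side, and
 * dlm_process_recovery_data() turns it into a refmap bit. */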
static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
			       struct dlm_migratable_lockres *mres)
{
	struct dlm_lock dummy;
	memset(&dummy, 0, sizeof(dummy));
	dummy.ml.cookie = 0;
	dummy.ml.type = LKM_IVMODE;
	dummy.ml.convert_type = LKM_IVMODE;
	dummy.ml.highest_blocked = LKM_IVMODE;
	dummy.lksb = NULL;
	dummy.ml.node = dlm->node_num;
	dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
}

static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
				    struct dlm_migratable_lock *ml,
				    u8 *nodenum)
{
	if (unlikely(ml->cookie == 0 &&
	    ml->type == LKM_IVMODE &&
	    ml->convert_type == LKM_IVMODE &&
	    ml->highest_blocked == LKM_IVMODE &&
	    ml->list == DLM_BLOCKED_LIST)) {
		*nodenum = ml->node;
		return 1;
	}
	return 0;
}

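/* Editor's note: streams one lockres to send_to.  Locks are packed into
 * the page-sized mres buffer queue by queue; each time the buffer fills
 * it is flushed with dlm_send_mig_lockres_msg(), and the final (or
 * only) message carries the DLM_MRES_ALL_DONE flag.  An empty lockres
 * still sends one dummy lock so the refmap stays correct. */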
int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			 struct dlm_migratable_lockres *mres,
			 u8 send_to, u8 flags)
{
	struct list_head *queue;
	int total_locks, i;
	u64 mig_cookie = 0;
	struct dlm_lock *lock;
	int ret = 0;

	BUG_ON(!(flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));

	mlog(0, "sending to %u\n", send_to);

	total_locks = dlm_num_locks_in_lockres(res);
	if (total_locks > DLM_MAX_MIGRATABLE_LOCKS) {
		/* rare, but possible */
		mlog(0, "argh. lockres has %d locks. this will "
		     "require more than one network packet to "
		     "migrate\n", total_locks);
		mig_cookie = dlm_get_next_mig_cookie();
	}

	dlm_init_migratable_lockres(mres, res->lockname.name,
				    res->lockname.len, total_locks,
				    mig_cookie, flags, res->owner);

	total_locks = 0;
	for (i = DLM_GRANTED_LIST; i <= DLM_BLOCKED_LIST; i++) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each_entry(lock, queue, list) {
			/* add another lock. */
			total_locks++;
			if (!dlm_add_lock_to_array(lock, mres, i))
				continue;

			/* this filled the lock message,
			 * we must send it immediately. */
			ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
						       res, total_locks);
			if (ret < 0)
				goto error;
		}
	}
	if (total_locks == 0) {
		/* send a dummy lock to indicate a mastery reference only */
		mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
		     dlm->name, res->lockname.len, res->lockname.name,
		     send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
		     "migration");
		dlm_add_dummy_lock(dlm, mres);
	}
	/* flush any remaining locks */
	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
	if (ret < 0)
		goto error;
	return ret;

error:
	mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
	     dlm->name, ret);
	if (!dlm_is_host_down(ret))
		BUG();
	mlog(0, "%s: node %u went down while sending %s "
	     "lockres %.*s\n", dlm->name, send_to,
	     flags & DLM_MRES_RECOVERY ? "recovery" : "migration",
	     res->lockname.len, res->lockname.name);
	return ret;
}


/*
 * this message will contain no more than one page worth of
 * recovery data, and it will work on only one lockres.
 * there may be many locks in this page, and we may need to wait
 * for additional packets to complete all the locks (rare, but
 * possible).
 */
/*
 * NOTE: the allocation error cases here are scary
 * we really cannot afford to fail an alloc in recovery
 * do we spin?  returning an error only delays the problem really
 */
int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
			    void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_migratable_lockres *mres =
		(struct dlm_migratable_lockres *)msg->buf;
	int ret = 0;
	u8 real_master;
	u8 extra_refs = 0;
	char *buf = NULL;
	struct dlm_work_item *item = NULL;
	struct dlm_lock_resource *res = NULL;
	unsigned int hash;

	if (!dlm_grab(dlm))
		return -EINVAL;

	if (!dlm_joined(dlm)) {
		mlog(ML_ERROR, "Domain %s not joined! "
		     "lockres %.*s, master %u\n",
		     dlm->name, mres->lockname_len,
		     mres->lockname, mres->master);
		dlm_put(dlm);
		return -EINVAL;
	}

	BUG_ON(!(mres->flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));

	real_master = mres->master;
	if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
		/* cannot migrate a lockres with no master */
		BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
	}

	mlog(0, "%s message received from node %u\n",
	     (mres->flags & DLM_MRES_RECOVERY) ?
	     "recovery" : "migration", mres->master);
	if (mres->flags & DLM_MRES_ALL_DONE)
		mlog(0, "all done flag. all lockres data received!\n");

	ret = -ENOMEM;
	buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
	item = kzalloc(sizeof(*item), GFP_NOFS);
	if (!buf || !item)
		goto leave;

	/* lookup the lock to see if we have a secondary queue for this
	 * already...  just add the locks in and this will have its owner
	 * and RECOVERY flag changed when it completes. */
	hash = dlm_lockid_hash(mres->lockname, mres->lockname_len);
	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len,
					hash);
	if (res) {
		/* this will get a ref on res */
		/* mark it as recovering/migrating and hash it */
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_DROPPING_REF) {
			mlog(0, "%s: node is attempting to migrate "
			     "lockres %.*s, but marked as dropping "
			     " ref!\n", dlm->name,
			     mres->lockname_len, mres->lockname);
			ret = -EINVAL;
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);
			dlm_lockres_put(res);
			goto leave;
		}

		if (mres->flags & DLM_MRES_RECOVERY) {
			res->state |= DLM_LOCK_RES_RECOVERING;
		} else {
			if (res->state & DLM_LOCK_RES_MIGRATING) {
				/* this is at least the second
				 * lockres message */
				mlog(0, "lock %.*s is already migrating\n",
				     mres->lockname_len,
				     mres->lockname);
			} else if (res->state & DLM_LOCK_RES_RECOVERING) {
				/* caller should BUG */
				mlog(ML_ERROR, "node is attempting to migrate "
				     "lock %.*s, but marked as recovering!\n",
				     mres->lockname_len, mres->lockname);
				ret = -EFAULT;
				spin_unlock(&res->spinlock);
				spin_unlock(&dlm->spinlock);
				dlm_lockres_put(res);
				goto leave;
			}
			res->state |= DLM_LOCK_RES_MIGRATING;
		}
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
	} else {
		spin_unlock(&dlm->spinlock);
		/* need to allocate, just like if it was
		 * mastered here normally */
		res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len);
		if (!res)
			goto leave;

		/* to match the ref that we would have gotten if
		 * dlm_lookup_lockres had succeeded */
		dlm_lockres_get(res);

		/* mark it as recovering/migrating and hash it */
		if (mres->flags & DLM_MRES_RECOVERY)
			res->state |= DLM_LOCK_RES_RECOVERING;
		else
			res->state |= DLM_LOCK_RES_MIGRATING;

		spin_lock(&dlm->spinlock);
		__dlm_insert_lockres(dlm, res);
		spin_unlock(&dlm->spinlock);

		/* Add an extra ref for this lock-less lockres lest the
		 * dlm_thread purges it before we get the chance to add
		 * locks to it */
		dlm_lockres_get(res);

		/* There are three refs that need to be put.
		 * 1. Taken above.
		 * 2. kref_init in dlm_new_lockres()->dlm_init_lockres().
		 * 3. dlm_lookup_lockres()
		 * The first one is handled at the end of this function. The
		 * other two are handled in the worker thread after locks have
		 * been attached. Yes, we don't wait for purge time to match
		 * kref_init. The lockres will still have at least one ref
		 * added because it is in the hash __dlm_insert_lockres() */
		extra_refs++;

		/* now that the new lockres is inserted,
		 * make it usable by other processes */
		spin_lock(&res->spinlock);
		res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
		spin_unlock(&res->spinlock);
		wake_up(&res->wq);
	}

	/* at this point we have allocated everything we need,
	 * and we have a hashed lockres with an extra ref and
	 * the proper res->state flags. */
	ret = 0;
	spin_lock(&res->spinlock);
	/* drop this either when master requery finds a different master
	 * or when a lock is added by the recovery worker */
	dlm_lockres_grab_inflight_ref(dlm, res);
	if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
		/* migration cannot have an unknown master */
		BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
		mlog(0, "recovery has passed me a lockres with an "
		     "unknown owner.. will need to requery: "
		     "%.*s\n", mres->lockname_len, mres->lockname);
	} else {
		/* take a reference now to pin the lockres, drop it
		 * when locks are added in the worker */
		dlm_change_lockres_owner(dlm, res, dlm->node_num);
	}
	spin_unlock(&res->spinlock);

	/* queue up work for dlm_mig_lockres_worker */
	dlm_grab(dlm);  /* get an extra ref for the work item */
	memcpy(buf, msg->buf, be16_to_cpu(msg->data_len));  /* copy the whole message */
	dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf);
	item->u.ml.lockres = res;  /* already have a ref */
	item->u.ml.real_master = real_master;
	item->u.ml.extra_ref = extra_refs;
	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);
	queue_work(dlm->dlm_worker, &dlm->dispatched_work);

leave:
	/* One extra ref taken needs to be put here */
	if (extra_refs)
		dlm_lockres_put(res);

	dlm_put(dlm);
	if (ret < 0) {
		kfree(buf);
		kfree(item);
		mlog_errno(ret);
	}

	return ret;
}


static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
{
	struct dlm_ctxt *dlm;
	struct dlm_migratable_lockres *mres;
	int ret = 0;
	struct dlm_lock_resource *res;
	u8 real_master;
	u8 extra_ref;

	dlm = item->dlm;
	mres = (struct dlm_migratable_lockres *)data;

	res = item->u.ml.lockres;
	real_master = item->u.ml.real_master;
	extra_ref = item->u.ml.extra_ref;

	if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
		/* this case is super-rare. only occurs if
		 * node death happens during migration. */
again:
		ret = dlm_lockres_master_requery(dlm, res, &real_master);
		if (ret < 0) {
			mlog(0, "dlm_lockres_master_requery ret=%d\n",
			     ret);
			goto again;
		}
		if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
			mlog(0, "lockres %.*s not claimed. "
			     "this node will take it.\n",
			     res->lockname.len, res->lockname.name);
		} else {
			spin_lock(&res->spinlock);
			dlm_lockres_drop_inflight_ref(dlm, res);
			spin_unlock(&res->spinlock);
			mlog(0, "master needs to respond to sender "
			     "that node %u still owns %.*s\n",
			     real_master, res->lockname.len,
			     res->lockname.name);
			/* cannot touch this lockres */
			goto leave;
		}
	}

	ret = dlm_process_recovery_data(dlm, res, mres);
	if (ret < 0)
		mlog(0, "dlm_process_recovery_data returned %d\n", ret);
	else
		mlog(0, "dlm_process_recovery_data succeeded\n");

	if ((mres->flags & (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) ==
			   (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) {
		ret = dlm_finish_migration(dlm, res, mres->master);
		if (ret < 0)
			mlog_errno(ret);
	}

leave:
	/* See comment in dlm_mig_lockres_handler() */
	if (res) {
		if (extra_ref)
			dlm_lockres_put(res);
		dlm_lockres_put(res);
	}
	kfree(data);
}


static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 *real_master)
{
	struct dlm_node_iter iter;
	int nodenum;
	int ret = 0;

	*real_master = DLM_LOCK_RES_OWNER_UNKNOWN;

	/* we only reach here if one of the two nodes in a
	 * migration died while the migration was in progress.
	 * at this point we need to requery the master.  we
	 * know that the new_master got as far as creating
	 * an mle on at least one node, but we do not know
	 * if any nodes had actually cleared the mle and set
	 * the master to the new_master.  the old master
	 * is supposed to set the owner to UNKNOWN in the
	 * event of a new_master death, so the only possible
	 * responses that we can get from nodes here are
	 * that the master is new_master, or that the master
	 * is UNKNOWN.
	 * if all nodes come back with UNKNOWN then we know
	 * the lock needs remastering here.
	 * if any node comes back with a valid master, check
	 * to see if that master is the one that we are
	 * recovering.  if so, then the new_master died and
	 * we need to remaster this lock.  if not, then the
	 * new_master survived and that node will respond to
	 * other nodes about the owner.
	 * if there is an owner, this node needs to dump this
	 * lockres and alert the sender that this lockres
	 * was rejected. */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		/* do not send to self */
		if (nodenum == dlm->node_num)
			continue;
		ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
		if (ret < 0) {
			mlog_errno(ret);
			if (!dlm_is_host_down(ret))
				BUG();
			/* host is down, so answer for that node would be
			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
		}
		if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
			mlog(0, "lock master is %u\n", *real_master);
			break;
		}
	}
	return ret;
}

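/* Editor's note: asks a single node who it believes masters this
 * lockres.  The status word in the o2net reply doubles as the answer: a
 * node number, or DLM_LOCK_RES_OWNER_UNKNOWN. */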
int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			  u8 nodenum, u8 *real_master)
{
	int ret = -EINVAL;
	struct dlm_master_requery req;
	int status = DLM_LOCK_RES_OWNER_UNKNOWN;

	memset(&req, 0, sizeof(req));
	req.node_idx = dlm->node_num;
	req.namelen = res->lockname.len;
	memcpy(req.name, res->lockname.name, res->lockname.len);

resend:
	ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key,
				 &req, sizeof(req), nodenum, &status);
	if (ret < 0)
		mlog(ML_ERROR, "Error %d when sending message %u (key "
		     "0x%x) to node %u\n", ret, DLM_MASTER_REQUERY_MSG,
		     dlm->key, nodenum);
	else if (status == -ENOMEM) {
		mlog_errno(status);
		msleep(50);
		goto resend;
	} else {
		BUG_ON(status < 0);
		BUG_ON(status > DLM_LOCK_RES_OWNER_UNKNOWN);
		*real_master = (u8) (status & 0xff);
		mlog(0, "node %u responded to master requery with %u\n",
		     nodenum, *real_master);
		ret = 0;
	}
	return ret;
}

/* this function cannot error, so unless the sending
 * or receiving of the message failed, the owner can
 * be trusted */
int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
			       void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	unsigned int hash;
	int master = DLM_LOCK_RES_OWNER_UNKNOWN;
	u32 flags = DLM_ASSERT_MASTER_REQUERY;
	int dispatched = 0;

	if (!dlm_grab(dlm)) {
		/* since the domain has gone away on this
		 * node, the proper response is UNKNOWN */
		return master;
	}

	hash = dlm_lockid_hash(req->name, req->namelen);

	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
	if (res) {
		spin_lock(&res->spinlock);
		master = res->owner;
		if (master == dlm->node_num) {
			int ret = dlm_dispatch_assert_master(dlm, res,
							     0, 0, flags);
			if (ret < 0) {
				mlog_errno(ret);
				spin_unlock(&res->spinlock);
				dlm_lockres_put(res);
				spin_unlock(&dlm->spinlock);
				dlm_put(dlm);
				/* sender will take care of this and retry */
				return ret;
			} else {
				dispatched = 1;
				__dlm_lockres_grab_inflight_worker(dlm, res);
				spin_unlock(&res->spinlock);
			}
		} else {
			/* put.. in case we are not the master */
			spin_unlock(&res->spinlock);
			dlm_lockres_put(res);
		}
	}
	spin_unlock(&dlm->spinlock);

	if (!dispatched)
		dlm_put(dlm);
	return master;
}

static inline struct list_head *
dlm_list_num_to_pointer(struct dlm_lock_resource *res, int list_num)
{
	struct list_head *ret;
	BUG_ON(list_num < 0);
	BUG_ON(list_num > 2);
	ret = &(res->granted);
	ret += list_num;
	return ret;
}

/* TODO: do ast flush business
 * TODO: do MIGRATING and RECOVERING spinning
 */

/*
 * NOTE about in-flight requests during migration:
 *
 * Before attempting the migrate, the master marks the lockres as
 * MIGRATING and flushes all of its pending ASTs, so requests that race
 * with the migration either arrive before the flag is set (and are
 * handled by the old master) or are retried by the caller once the new
 * master clears the MIGRATING flag.  The code below therefore only has
 * to distinguish the MIGRATING and RECOVERING cases, not service
 * normal lock traffic.
 */
static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_migratable_lockres *mres)
{
	struct dlm_migratable_lock *ml;
	struct list_head *queue, *iter;
	struct list_head *tmpq = NULL;
	struct dlm_lock *newlock = NULL;
	struct dlm_lockstatus *lksb = NULL;
	int ret = 0;
	int i, j, bad;
	struct dlm_lock *lock;
	u8 from = O2NM_MAX_NODES;
	__be64 c;

	mlog(0, "running %d locks for this lockres\n", mres->num_locks);
	for (i = 0; i < mres->num_locks; i++) {
		ml = &(mres->ml[i]);

		if (dlm_is_dummy_lock(dlm, ml, &from)) {
			/* placeholder, just need to set the refmap bit */
			BUG_ON(mres->num_locks != 1);
			mlog(0, "%s:%.*s: dummy lock for %u\n",
			     dlm->name, mres->lockname_len, mres->lockname,
			     from);
			spin_lock(&res->spinlock);
			dlm_lockres_set_refmap_bit(dlm, res, from);
			spin_unlock(&res->spinlock);
			break;
		}
		BUG_ON(ml->highest_blocked != LKM_IVMODE);
		newlock = NULL;
		lksb = NULL;

		queue = dlm_list_num_to_pointer(res, ml->list);
		tmpq = NULL;

		/* if the lock is for the local node it needs to
		 * be moved to the proper location within the queue.
		 * do not allocate a new lock structure. */
		if (ml->node == dlm->node_num) {
			/* MIGRATION ONLY! */
			BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));

			lock = NULL;
			spin_lock(&res->spinlock);
			for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
				tmpq = dlm_list_idx_to_ptr(res, j);
				list_for_each(iter, tmpq) {
					lock = list_entry(iter,
						  struct dlm_lock, list);
					if (lock->ml.cookie == ml->cookie)
						break;
					lock = NULL;
				}
				if (lock)
					break;
			}

			/* lock is always created locally first, and
			 * destroyed locally last.  it must be on the list */
			if (!lock) {
				c = ml->cookie;
				mlog(ML_ERROR, "Could not find local lock "
				     "with cookie %u:%llu, node %u, "
				     "list %u, flags 0x%x, type %d, "
				     "conv %d, highest blocked %d\n",
				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(c)),
				     ml->node, ml->list, ml->flags, ml->type,
				     ml->convert_type, ml->highest_blocked);
				__dlm_print_one_lock_resource(res);
				BUG();
			}

			if (lock->ml.node != ml->node) {
				c = lock->ml.cookie;
				mlog(ML_ERROR, "Mismatched node# in lock "
				     "cookie %u:%llu, name %.*s, node %u\n",
				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(c)),
				     res->lockname.len, res->lockname.name,
				     lock->ml.node);
				c = ml->cookie;
				mlog(ML_ERROR, "Migrate lock cookie %u:%llu, "
				     "node %u, list %u, flags 0x%x, type %d, "
				     "conv %d, highest blocked %d\n",
				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(c)),
				     ml->node, ml->list, ml->flags, ml->type,
				     ml->convert_type, ml->highest_blocked);
				__dlm_print_one_lock_resource(res);
				BUG();
			}

			if (tmpq != queue) {
				c = ml->cookie;
				mlog(0, "Lock cookie %u:%llu was on list %u "
				     "instead of list %u for %.*s\n",
				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(c)),
				     j, ml->list, res->lockname.len,
				     res->lockname.name);
				__dlm_print_one_lock_resource(res);
				spin_unlock(&res->spinlock);
				continue;
			}

			/* see NOTE above about why we do not update
			 * to match the master here */

			/* move the lock to its proper place */
			/* do not alter lock refcount.  switching lists. */
			list_move_tail(&lock->list, queue);
			spin_unlock(&res->spinlock);

			mlog(0, "just reordered a local lock!\n");
			continue;
		}

		/* lock is for another node. */
		newlock = dlm_new_lock(ml->type, ml->node,
				       be64_to_cpu(ml->cookie), NULL);
		if (!newlock) {
			ret = -ENOMEM;
			goto leave;
		}
		lksb = newlock->lksb;
		dlm_lock_attach_lockres(newlock, res);

		if (ml->convert_type != LKM_IVMODE) {
			BUG_ON(queue != &res->converting);
			newlock->ml.convert_type = ml->convert_type;
		}
		lksb->flags |= (ml->flags &
				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));

		if (ml->type == LKM_NLMODE)
			goto skip_lvb;

		/*
		 * If the lock is in the blocked list it can't have a valid lvb,
		 * so skip it
		 */
		if (ml->list == DLM_BLOCKED_LIST)
			goto skip_lvb;

		if (!dlm_lvb_is_empty(mres->lvb)) {
			if (lksb->flags & DLM_LKSB_PUT_LVB) {
				/* other node was trying to update
				 * lvb when node died.  recreate the
				 * lksb with the updated lvb. */
				memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
				/* the lock resource lvb update must happen
				 * NOW, before the spinlock is dropped.
				 * we no longer wait for the AST to update
				 * the lvb. */
				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
			} else {
				/* otherwise, the node is sending its
				 * most recent valid lvb info */
				BUG_ON(ml->type != LKM_EXMODE &&
				       ml->type != LKM_PRMODE);
				if (!dlm_lvb_is_empty(res->lvb) &&
				    (ml->type == LKM_EXMODE ||
				     memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
					int i;
					mlog(ML_ERROR, "%s:%.*s: received bad "
					     "lvb! type=%d\n", dlm->name,
					     res->lockname.len,
					     res->lockname.name, ml->type);
					printk("lockres lvb=[");
					for (i = 0; i < DLM_LVB_LEN; i++)
						printk("%02x", res->lvb[i]);
					printk("]\nmigrated lvb=[");
					for (i = 0; i < DLM_LVB_LEN; i++)
						printk("%02x", mres->lvb[i]);
					printk("]\n");
					dlm_print_one_lock_resource(res);
					BUG();
				}
				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
			}
		}
skip_lvb:

		/* NOTE:
		 * wrt lock queue ordering and recovery:
		 *    1. order of locks on granted queue is
		 *       meaningless.
		 *    2. order of locks on converting queue is
		 *       LOST with the node death.  sorry charlie.
		 *    3. order of locks on the blocked queue is
		 *       also LOST.
		 * order of locks does not affect integrity, it
		 * just means that a lock request may get pushed
		 * back in line as a result of the node death.
		 * also note that for a given node the lock order
		 * for its secondary queue locks is preserved
		 * relative to each other, but clearly *not*
		 * preserved relative to locks from other nodes.
		 */
		bad = 0;
		spin_lock(&res->spinlock);
		list_for_each_entry(lock, queue, list) {
			if (lock->ml.cookie == ml->cookie) {
				c = lock->ml.cookie;
				mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
				     "exists on this lockres!\n", dlm->name,
				     res->lockname.len, res->lockname.name,
				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(c)));

				mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
				     "node=%u, cookie=%u:%llu, queue=%d\n",
				     ml->type, ml->convert_type, ml->node,
				     dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
				     ml->list);

				__dlm_print_one_lock_resource(res);
				bad = 1;
				break;
			}
		}
		if (!bad) {
			dlm_lock_get(newlock);
			if (mres->flags & DLM_MRES_RECOVERY &&
			    ml->list == DLM_CONVERTING_LIST &&
			    newlock->ml.type >
			    newlock->ml.convert_type) {
				/* newlock is doing downconvert, add it to the
				 * head of converting list */
				list_add(&newlock->list, queue);
			} else
				list_add_tail(&newlock->list, queue);
			mlog(0, "%s:%.*s: added lock for node %u, "
			     "setting refmap bit\n", dlm->name,
			     res->lockname.len, res->lockname.name, ml->node);
			dlm_lockres_set_refmap_bit(dlm, res, ml->node);
		}
		spin_unlock(&res->spinlock);
	}
	mlog(0, "done running all the locks\n");

leave:
	/* balance the ref taken when the work was queued */
	spin_lock(&res->spinlock);
	dlm_lockres_drop_inflight_ref(dlm, res);
	spin_unlock(&res->spinlock);

	if (ret < 0)
		mlog_errno(ret);

	return ret;
}

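/* Editor's note: marks a lockres as recovering and puts it on
 * dlm->reco.resources, fixing up any lock/convert/unlock/cancel
 * operations that were caught mid-flight when the master died, so the
 * lock state later sent to the new master is self-consistent. */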
void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res)
{
	int i;
	struct list_head *queue;
	struct dlm_lock *lock, *next;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);
	res->state |= DLM_LOCK_RES_RECOVERING;
	if (!list_empty(&res->recovering)) {
		mlog(0,
		     "Recovering res %s:%.*s, is already on recovery list!\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		list_del_init(&res->recovering);
		dlm_lockres_put(res);
	}
	/* We need to hold a reference while on the recovery list */
	dlm_lockres_get(res);
	list_add_tail(&res->recovering, &dlm->reco.resources);

	/* find any pending locks and put them back on proper list */
	for (i = DLM_BLOCKED_LIST; i >= DLM_GRANTED_LIST; i--) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each_entry_safe(lock, next, queue, list) {
			dlm_lock_get(lock);
			if (lock->convert_pending) {
				/* move converting lock back to granted */
				mlog(0, "node died with convert pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_revert_pending_convert(res, lock);
				lock->convert_pending = 0;
			} else if (lock->lock_pending) {
				/* remove pending lock requests completely */
				BUG_ON(i != DLM_BLOCKED_LIST);
				mlog(0, "node died with lock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				/* lock will be floating until ref in
				 * dlmlock_remote is freed after the network
				 * call returns.  ok for it to not be on any
				 * list since no ast can be called
				 * (the master is dead). */
				dlm_revert_pending_lock(res, lock);
				lock->lock_pending = 0;
			} else if (lock->unlock_pending) {
				/* if an unlock was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master.  note that the dlm_unlock
				 * call is still responsible for calling
				 * the unlockast.  that will happen after
				 * the network call times out.  for now,
				 * just move lists to prepare the new
				 * recovery master. */
				BUG_ON(i != DLM_GRANTED_LIST);
				mlog(0, "node died with unlock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_unlock(res, lock);
				lock->unlock_pending = 0;
			} else if (lock->cancel_pending) {
				/* if a cancel was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master */
				BUG_ON(i != DLM_CONVERTING_LIST);
				mlog(0, "node died with cancel pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_cancel(res, lock);
				lock->cancel_pending = 0;
			}
			dlm_lock_put(lock);
		}
	}
}
2131
2132
2133
2134
2135
2136
2137 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
2138 u8 dead_node, u8 new_master)
2139 {
2140 int i;
2141 struct hlist_head *bucket;
2142 struct dlm_lock_resource *res, *next;
2143
2144 assert_spin_locked(&dlm->spinlock);
2145
2146 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
2147 if (res->owner == dead_node) {
2148 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
2149 dlm->name, res->lockname.len, res->lockname.name,
2150 res->owner, new_master);
2151 list_del_init(&res->recovering);
2152 spin_lock(&res->spinlock);
2153
2154
2155 dlm_change_lockres_owner(dlm, res, new_master);
2156 res->state &= ~DLM_LOCK_RES_RECOVERING;
2157 if (__dlm_lockres_has_locks(res))
2158 __dlm_dirty_lockres(dlm, res);
2159 spin_unlock(&res->spinlock);
2160 wake_up(&res->wq);
2161 dlm_lockres_put(res);
2162 }
2163 }
2164
2165
2166
2167
2168
2169 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
2170 bucket = dlm_lockres_hash(dlm, i);
2171 hlist_for_each_entry(res, bucket, hash_node) {
2172 if (res->state & DLM_LOCK_RES_RECOVERY_WAITING) {
2173 spin_lock(&res->spinlock);
2174 res->state &= ~DLM_LOCK_RES_RECOVERY_WAITING;
2175 spin_unlock(&res->spinlock);
2176 wake_up(&res->wq);
2177 }
2178
2179 if (!(res->state & DLM_LOCK_RES_RECOVERING))
2180 continue;
2181
2182 if (res->owner != dead_node &&
2183 res->owner != dlm->node_num)
2184 continue;
2185
2186 if (!list_empty(&res->recovering)) {
2187 list_del_init(&res->recovering);
2188 dlm_lockres_put(res);
2189 }
2190
2191 /* new_master has our reference from
2192 * the lock state sent during recovery */
2193 mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
2194 dlm->name, res->lockname.len, res->lockname.name,
2195 res->owner, new_master);
2196 spin_lock(&res->spinlock);
2197 dlm_change_lockres_owner(dlm, res, new_master);
2198 res->state &= ~DLM_LOCK_RES_RECOVERING;
2199 if (__dlm_lockres_has_locks(res))
2200 __dlm_dirty_lockres(dlm, res);
2201 spin_unlock(&res->spinlock);
2202 wake_up(&res->wq);
2203 }
2204 }
2205 }
2206
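/* the rule the helper below implements: on the master the lvb is
 * suspect if the dead node held EX (it may have changed the value
 * before dying); on a non-master, this node's cached copy is suspect
 * unless it holds EX or PR itself */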
2207 static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
2208 {
2209 if (local) {
2210 if (lock->ml.type != LKM_EXMODE &&
2211 lock->ml.type != LKM_PRMODE)
2212 return 1;
2213 } else if (lock->ml.type == LKM_EXMODE)
2214 return 1;
2215 return 0;
2216 }
2217
2218 static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
2219 struct dlm_lock_resource *res, u8 dead_node)
2220 {
2221 struct list_head *queue;
2222 struct dlm_lock *lock;
2223 int blank_lvb = 0, local = 0;
2224 int i;
2225 u8 search_node;
2226
2227 assert_spin_locked(&dlm->spinlock);
2228 assert_spin_locked(&res->spinlock);
2229
2230 if (res->owner == dlm->node_num)
2231 /* if this node owned the lockres, and if the dead node
2232 * had an EX when he died, blank out the lvb */
2233 search_node = dead_node;
2234 else {
2235 /* if this is a secondary lockres, and we had no EX or PR
2236 * locks granted, we can no longer trust the lvb */
2237 search_node = dlm->node_num;
2238 local = 1; /* check local state for valid lvb */
2239 }
2240
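/* blocked locks were never granted, so only the granted and
 * converting queues can carry a meaningful lvb */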
2241 for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) {
2242 queue = dlm_list_idx_to_ptr(res, i);
2243 list_for_each_entry(lock, queue, list) {
2244 if (lock->ml.node == search_node) {
2245 if (dlm_lvb_needs_invalidation(lock, local)) {
2246 /* zero the lksb lvb and lockres lvb */
2247 blank_lvb = 1;
2248 memset(lock->lksb->lvb, 0, DLM_LVB_LEN);
2249 }
2250 }
2251 }
2252 }
2253
2254 if (blank_lvb) {
2255 mlog(0, "clearing %.*s lvb, dead node %u had EX\n",
2256 res->lockname.len, res->lockname.name, dead_node);
2257 memset(res->lvb, 0, DLM_LVB_LEN);
2258 }
2259 }
2260
2261 static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2262 struct dlm_lock_resource *res, u8 dead_node)
2263 {
2264 struct dlm_lock *lock, *next;
2265 unsigned int freed = 0;
2266
2267 /* this node is the lockres master:
2268 * 1) remove any stale locks for the dead node
2269 * 2) if the dead node had an EX when he died, blank out the lvb
2270 */
2271 assert_spin_locked(&dlm->spinlock);
2272 assert_spin_locked(&res->spinlock);
2273
2274 /* We do two dlm_lock_put(). One for removing from list and the other is
2275 * to force the DLM_UNLOCK_FREE_LOCK action so as to free the locks */
2276
2277 /* TODO: check pending_asts, pending_basts here */
2278 list_for_each_entry_safe(lock, next, &res->granted, list) {
2279 if (lock->ml.node == dead_node) {
2280 list_del_init(&lock->list);
2281 dlm_lock_put(lock);
2282 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
2283 dlm_lock_put(lock);
2284 freed++;
2285 }
2286 }
2287 list_for_each_entry_safe(lock, next, &res->converting, list) {
2288 if (lock->ml.node == dead_node) {
2289 list_del_init(&lock->list);
2290 dlm_lock_put(lock);
2291 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
2292 dlm_lock_put(lock);
2293 freed++;
2294 }
2295 }
2296 list_for_each_entry_safe(lock, next, &res->blocked, list) {
2297 if (lock->ml.node == dead_node) {
2298 list_del_init(&lock->list);
2299 dlm_lock_put(lock);
2300 /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
2301 dlm_lock_put(lock);
2302 freed++;
2303 }
2304 }
2305
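/* the dead node holds no locks here anymore, so drop its ref on the
 * lockres; RECOVERY_WAITING parks the lockres until the recovery
 * state machine is finished with it */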
2306 if (freed) {
2307 mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
2308 "dropping ref from lockres\n", dlm->name,
2309 res->lockname.len, res->lockname.name, freed, dead_node);
2310 if (!test_bit(dead_node, res->refmap)) {
2311 mlog(ML_ERROR, "%s:%.*s: freed %u locks for dead node %u, "
2312 "but ref was not set\n", dlm->name,
2313 res->lockname.len, res->lockname.name, freed, dead_node);
2314 __dlm_print_one_lock_resource(res);
2315 }
2316 res->state |= DLM_LOCK_RES_RECOVERY_WAITING;
2317 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2318 } else if (test_bit(dead_node, res->refmap)) {
2319 mlog(0, "%s:%.*s: dead node %u had a ref, but had "
2320 "no locks and had not purged before dying\n", dlm->name,
2321 res->lockname.len, res->lockname.name, dead_node);
2322 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2323 }
2324
2325 /* do not kick thread yet */
2326 __dlm_dirty_lockres(dlm, res);
2327 }
2328
2329 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
2330 {
2331 struct dlm_lock_resource *res;
2332 int i;
2333 struct hlist_head *bucket;
2334 struct hlist_node *tmp;
2335 struct dlm_lock *lock;
2336
2337
2338 /* purge any stale mles */
2339 dlm_clean_master_list(dlm, dead_node);
2340
2341 /* now clean up all lock resources.  there are two rules:
2342 *
2343 * 1) if the dead node was the master, move the lockres
2344 *    to the recovering list.  set the RECOVERING flag.
2345 *    this lockres needs to be cleaned up before it can
2346 *    be used further.
2347 *
2348 * 2) if this node was the master, remove all locks from
2349 *    the dead node.  if the dead node had an EX when he
2350 *    died, blank out the lvb.
2351 *
2352 * NOTE: order of list scans is important.  otherwise, the
2353 *       data handed over from remastery could get clobbered
2354 *       by the cleanup. */
2355 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
2356 bucket = dlm_lockres_hash(dlm, i);
2357 hlist_for_each_entry_safe(res, tmp, bucket, hash_node) {
2358 /* always prune any $RECOVERY entries for dead nodes,
2359 * otherwise hangs can occur during later recovery */
2360 if (dlm_is_recovery_lock(res->lockname.name,
2361 res->lockname.len)) {
2362 spin_lock(&res->spinlock);
2363 list_for_each_entry(lock, &res->granted, list) {
2364 if (lock->ml.node == dead_node) {
2365 mlog(0, "AHA! there was "
2366 "a $RECOVERY lock for dead "
2367 "node %u (%s)!\n",
2368 dead_node, dlm->name);
2369 list_del_init(&lock->list);
2370 dlm_lock_put(lock);
2371 /* Can't schedule
2372 * DLM_UNLOCK_FREE_LOCK
2373 * - do manually */
2374 dlm_lock_put(lock);
2375 break;
2376 }
2377 }
2378
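/* if this node was already dropping its ref on the $RECOVERY
 * lockres when its master died, finish the purge now instead of
 * leaving a half-dropped reference behind */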
2379 if ((res->owner == dead_node) &&
2380 (res->state & DLM_LOCK_RES_DROPPING_REF)) {
2381 dlm_lockres_get(res);
2382 __dlm_do_purge_lockres(dlm, res);
2383 spin_unlock(&res->spinlock);
2384 wake_up(&res->wq);
2385 dlm_lockres_put(res);
2386 continue;
2387 } else if (res->owner == dlm->node_num)
2388 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2389 spin_unlock(&res->spinlock);
2390 continue;
2391 }
2392 spin_lock(&res->spinlock);
2393 /* zero the lvb if necessary */
2394 dlm_revalidate_lvb(dlm, res, dead_node);
2395 if (res->owner == dead_node) {
2396 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
2397 mlog(0, "%s:%.*s: owned by "
2398 "dead node %u, this node was "
2399 "dropping its ref when master died. "
2400 "continue, purging the lockres.\n",
2401 dlm->name, res->lockname.len,
2402 res->lockname.name, dead_node);
2403 dlm_lockres_get(res);
2404 __dlm_do_purge_lockres(dlm, res);
2405 spin_unlock(&res->spinlock);
2406 wake_up(&res->wq);
2407 dlm_lockres_put(res);
2408 continue;
2409 }
2410 dlm_move_lockres_to_recovery_list(dlm, res);
2411 } else if (res->owner == dlm->node_num) {
2412 dlm_free_dead_locks(dlm, res, dead_node);
2413 __dlm_lockres_calc_usage(dlm, res);
2414 } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
2415 if (test_bit(dead_node, res->refmap)) {
2416 mlog(0, "%s:%.*s: dead node %u had a ref, but had "
2417 "no locks and had not purged before dying\n",
2418 dlm->name, res->lockname.len,
2419 res->lockname.name, dead_node);
2420 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2421 }
2422 }
2423 spin_unlock(&res->spinlock);
2424 }
2425 }
2426
2427 }
2428
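/* called with dlm->spinlock held, both from the heartbeat down
 * callback below and from dlm_begin_reco_handler() when a recovery
 * master declares a node dead before this node has noticed */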
2429 static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
2430 {
2431 assert_spin_locked(&dlm->spinlock);
2432
2433 if (dlm->reco.new_master == idx) {
2434 mlog(0, "%s: recovery master %d just died\n",
2435 dlm->name, idx);
2436 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2437 /* finalize1 was reached, so it is safe to clear
2438 * the new_master and dead_node.  that recovery
2439 * is complete. */
2440 mlog(0, "%s: dead master %d had reached "
2441 "finalize1 state, clearing\n", dlm->name, idx);
2442 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2443 __dlm_reset_recovery(dlm);
2444 }
2445 }
2446
2447 /* Clean up join state on node death. */
2448 if (dlm->joining_node == idx) {
2449 mlog(0, "Clearing join state for node %u\n", idx);
2450 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
2451 }
2452
2453 /* check to see if the node is already considered dead */
2454 if (!test_bit(idx, dlm->live_nodes_map)) {
2455 mlog(0, "for domain %s, node %d is already dead. "
2456 "another node likely did recovery already.\n",
2457 dlm->name, idx);
2458 return;
2459 }
2460
2461 /* check to see if we do not care about this node */
2462 if (!test_bit(idx, dlm->domain_map)) {
2463 /* This also catches the case that we get a node down
2464 * but haven't joined the domain yet. */
2465 mlog(0, "node %u already removed from domain!\n", idx);
2466 return;
2467 }
2468
2469 clear_bit(idx, dlm->live_nodes_map);
2470
2471 /* make sure local cleanup occurs before the heartbeat events */
2472 if (!test_bit(idx, dlm->recovery_map))
2473 dlm_do_local_recovery_cleanup(dlm, idx);
2474
2475 /* notify anything attached to the heartbeat events */
2476 dlm_hb_event_notify_attached(dlm, idx, 0);
2477
2478 mlog(0, "node %u being removed from domain map!\n", idx);
2479 clear_bit(idx, dlm->domain_map);
2480 clear_bit(idx, dlm->exit_domain_map);
2481 /* wake up migration waiters if a node goes down.
2482 * perhaps later we can genericize this for other waiters. */
2483 wake_up(&dlm->migration_wq);
2484
2485 set_bit(idx, dlm->recovery_map);
2486 }
2487
2488 void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data)
2489 {
2490 struct dlm_ctxt *dlm = data;
2491
2492 if (!dlm_grab(dlm))
2493 return;
2494
2495 /*
2496 * This will notify any dlm users that a node in our domain
2497 * went away without notifying us first.
2498 */
2499 if (test_bit(idx, dlm->domain_map))
2500 dlm_fire_domain_eviction_callbacks(dlm, idx);
2501
2502 spin_lock(&dlm->spinlock);
2503 __dlm_hb_node_down(dlm, idx);
2504 spin_unlock(&dlm->spinlock);
2505
2506 dlm_put(dlm);
2507 }
2508
2509 void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data)
2510 {
2511 struct dlm_ctxt *dlm = data;
2512
2513 if (!dlm_grab(dlm))
2514 return;
2515
2516 spin_lock(&dlm->spinlock);
2517 set_bit(idx, dlm->live_nodes_map);
2518 /* do NOT notify mle attached to the heartbeat events.
2519 * new nodes are not interesting in mastery until joined. */
2520 spin_unlock(&dlm->spinlock);
2521
2522 dlm_put(dlm);
2523 }
2524
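/* the $RECOVERY lock exists only to elect a recovery master, so its
 * ast/bast/unlock-ast handlers have nothing to do beyond logging */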
2525 static void dlm_reco_ast(void *astdata)
2526 {
2527 struct dlm_ctxt *dlm = astdata;
2528 mlog(0, "ast for recovery lock fired!, this=%u, dlm=%s\n",
2529 dlm->node_num, dlm->name);
2530 }
2531 static void dlm_reco_bast(void *astdata, int blocked_type)
2532 {
2533 struct dlm_ctxt *dlm = astdata;
2534 mlog(0, "bast for recovery lock fired!, this=%u, dlm=%s\n",
2535 dlm->node_num, dlm->name);
2536 }
2537 static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
2538 {
2539 mlog(0, "unlockast for recovery lock fired!\n");
2540 }
2541
2542 /*
2543 * dlm_pick_recovery_master will continually attempt to use
2544 * dlmlock() on the special "$RECOVERY" lockres with the
2545 * LKM_NOQUEUE flag to get an EX.  every thread that enters
2546 * this function on each node racing to become the recovery
2547 * master will not stop attempting this until either:
2548 * a) this node gets the EX (and becomes the recovery master),
2549 * or b) dlm->reco.new_master gets set to some nodenum
2550 * != O2NM_INVALID_NODE_NUM (another node will do the reco).
2551 * so each time a recovery master is needed, the entire cluster
2552 * will sync at this point.  if the new master dies, that
2553 * will be detected in dlm_do_recovery. */
2554 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
2555 {
2556 enum dlm_status ret;
2557 struct dlm_lockstatus lksb;
2558 int status = -EINVAL;
2559
2560 mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
2561 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
2562 again:
2563 memset(&lksb, 0, sizeof(lksb));
2564
2565 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
2566 DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
2567 dlm_reco_ast, dlm, dlm_reco_bast);
2568
2569 mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
2570 dlm->name, ret, lksb.status);
2571
2572 if (ret == DLM_NORMAL) {
2573 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
2574 dlm->name, dlm->node_num);
2575
2576 /* got the EX lock.  check to see if another node
2577 * just became the reco master */
2578 if (dlm_reco_master_ready(dlm)) {
2579 mlog(0, "%s: got reco EX lock, but %u will "
2580 "do the recovery\n", dlm->name,
2581 dlm->reco.new_master);
2582 status = -EEXIST;
2583 } else {
2584 status = 0;
2585
2586 /* see if recovery was already finished elsewhere */
2587 spin_lock(&dlm->spinlock);
2588 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
2589 status = -EINVAL;
2590 mlog(0, "%s: got reco EX lock, but "
2591 "node got recovered already\n", dlm->name);
2592 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
2593 mlog(ML_ERROR, "%s: new master is %u "
2594 "but no dead node!\n",
2595 dlm->name, dlm->reco.new_master);
2596 BUG();
2597 }
2598 }
2599 spin_unlock(&dlm->spinlock);
2600 }
2601
2602 /* if this node has actually become the recovery master,
2603 * set the master and send the messages to begin recovery */
2604 if (!status) {
2605 mlog(0, "%s: dead=%u, this=%u, sending "
2606 "begin_reco now\n", dlm->name,
2607 dlm->reco.dead_node, dlm->node_num);
2608 status = dlm_send_begin_reco_message(dlm,
2609 dlm->reco.dead_node);
2610 /* this always succeeds */
2611 BUG_ON(status);
2612
2613 /* set the new_master to this node */
2614 spin_lock(&dlm->spinlock);
2615 dlm_set_reco_master(dlm, dlm->node_num);
2616 spin_unlock(&dlm->spinlock);
2617 }
2618
2619 /* recovery lock is a special case.  ast will not get fired,
2620 * so just go ahead and unlock it. */
2621 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
2622 if (ret == DLM_DENIED) {
2623 mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n");
2624 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm);
2625 }
2626 if (ret != DLM_NORMAL) {
2627 /* this would really suck. this could only happen
2628 * if there was a network error during the unlock
2629 * because of node death.  this means the unlock
2630 * is actually "done" and the lock structure is
2631 * even freed.  we can continue, but only
2632 * because this specific lock name is special. */
2633 mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
2634 }
2635 } else if (ret == DLM_NOTQUEUED) {
2636 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
2637 dlm->name, dlm->node_num);
2638 /* another node is master. wait on
2639 * reco.new_master != O2NM_INVALID_NODE_NUM
2640 * for at most one second */
2641 wait_event_timeout(dlm->dlm_reco_thread_wq,
2642 dlm_reco_master_ready(dlm),
2643 msecs_to_jiffies(1000));
2644 if (!dlm_reco_master_ready(dlm)) {
2645 mlog(0, "%s: reco master taking awhile\n",
2646 dlm->name);
2647 goto again;
2648 }
2649 /* another node has informed this one that it is reco master */
2650 mlog(0, "%s: reco master %u is ready to recover %u\n",
2651 dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
2652 status = -EEXIST;
2653 } else if (ret == DLM_RECOVERING) {
2654 mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n",
2655 dlm->name, dlm->node_num);
2656 goto again;
2657 } else {
2658 struct dlm_lock_resource *res;
2659
2660 /* dlmlock returned something other than NOTQUEUED or NORMAL */
2661 mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), "
2662 "lksb.status=%s\n", dlm->name, dlm_errname(ret),
2663 dlm_errname(lksb.status));
2664 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
2665 DLM_RECOVERY_LOCK_NAME_LEN);
2666 if (res) {
2667 dlm_print_one_lock_resource(res);
2668 dlm_lockres_put(res);
2669 } else {
2670 mlog(ML_ERROR, "recovery lock not found\n");
2671 }
2672 BUG();
2673 }
2674
2675 return status;
2676 }
2677
2678 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
2679 {
2680 struct dlm_begin_reco br;
2681 int ret = 0;
2682 struct dlm_node_iter iter;
2683 int nodenum;
2684 int status;
2685
2686 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node);
2687
2688 spin_lock(&dlm->spinlock);
2689 dlm_node_iter_init(dlm->domain_map, &iter);
2690 spin_unlock(&dlm->spinlock);
2691
2692 clear_bit(dead_node, iter.node_map);
2693
2694 memset(&br, 0, sizeof(br));
2695 br.node_idx = dlm->node_num;
2696 br.dead_node = dead_node;
2697
2698 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2699 ret = 0;
2700 if (nodenum == dead_node) {
2701 mlog(0, "not sending begin reco to dead node "
2702 "%u\n", dead_node);
2703 continue;
2704 }
2705 if (nodenum == dlm->node_num) {
2706 mlog(0, "not sending begin reco to self\n");
2707 continue;
2708 }
2709 retry:
2710 ret = -EINVAL;
2711 mlog(0, "attempting to send begin reco msg to %d\n",
2712 nodenum);
2713 ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key,
2714 &br, sizeof(br), nodenum, &status);
2715 /* negative status is handled ok by caller here */
2716 if (ret >= 0)
2717 ret = status;
2718 if (dlm_is_host_down(ret)) {
2719 /* node is down.  not involved in recovery
2720 * so just keep going */
2721 mlog(ML_NOTICE, "%s: node %u was down when sending "
2722 "begin reco msg (%d)\n", dlm->name, nodenum, ret);
2723 ret = 0;
2724 }
2725
2726 /*
2727 * older versions of dlm_begin_reco_handler() returned EAGAIN
2728 * rather than -EAGAIN when asking the sender to back off;
2729 * accept both here for compatibility with such nodes.
2730 */
2731 if (ret == -EAGAIN || ret == EAGAIN) {
2732 mlog(0, "%s: trying to start recovery of node "
2733 "%u, but node %u is waiting for last recovery "
2734 "to complete, backoff for a bit\n", dlm->name,
2735 dead_node, nodenum);
2736 msleep(100);
2737 goto retry;
2738 }
2739 if (ret < 0) {
2740 struct dlm_lock_resource *res;
2741
2742 /* this is now a serious problem, possibly ENOMEM
2743 * in the network stack.  must retry */
2744 mlog_errno(ret);
2745 mlog(ML_ERROR, "begin reco of dlm %s to node %u "
2746 "returned %d\n", dlm->name, nodenum, ret);
2747 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
2748 DLM_RECOVERY_LOCK_NAME_LEN);
2749 if (res) {
2750 dlm_print_one_lock_resource(res);
2751 dlm_lockres_put(res);
2752 } else {
2753 mlog(ML_ERROR, "recovery lock not found\n");
2754 }
2755 /* sleep for a bit in hopes that we can avoid
2756 * another ENOMEM */
2757 msleep(100);
2758 goto retry;
2759 }
2760 }
2761
2762 return ret;
2763 }
2764
2765 int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
2766 void **ret_data)
2767 {
2768 struct dlm_ctxt *dlm = data;
2769 struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
2770
2771 /* ok to return 0, domain has gone away */
2772 if (!dlm_grab(dlm))
2773 return 0;
2774
2775 spin_lock(&dlm->spinlock);
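/* between finalize1 and finalize2 of a previous recovery this node
 * must not accept a new recovery master; -EAGAIN tells the sender
 * to back off and retry (see dlm_send_begin_reco_message) */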
2776 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2777 mlog(0, "%s: node %u wants to recover node %u (%u:%u) "
2778 "but this node is in finalize state, waiting on finalize2\n",
2779 dlm->name, br->node_idx, br->dead_node,
2780 dlm->reco.dead_node, dlm->reco.new_master);
2781 spin_unlock(&dlm->spinlock);
2782 dlm_put(dlm);
2783 return -EAGAIN;
2784 }
2785 spin_unlock(&dlm->spinlock);
2786
2787 mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n",
2788 dlm->name, br->node_idx, br->dead_node,
2789 dlm->reco.dead_node, dlm->reco.new_master);
2790
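/* give dlm users (e.g. ocfs2) a chance to evict the dead node
 * before this node records the new recovery master */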
2791 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
2792
2793 spin_lock(&dlm->spinlock);
2794 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
2795 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) {
2796 mlog(0, "%s: new_master %u died, changing "
2797 "to %u\n", dlm->name, dlm->reco.new_master,
2798 br->node_idx);
2799 } else {
2800 mlog(0, "%s: new_master %u NOT DEAD, changing "
2801 "to %u\n", dlm->name, dlm->reco.new_master,
2802 br->node_idx);
2803 /* may not have seen the new master as dead yet */
2804 }
2805 }
2806 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
2807 mlog(ML_NOTICE, "%s: dead_node previously set to %u, "
2808 "node %u changing it to %u\n", dlm->name,
2809 dlm->reco.dead_node, br->node_idx, br->dead_node);
2810 }
2811 dlm_set_reco_master(dlm, br->node_idx);
2812 dlm_set_reco_dead_node(dlm, br->dead_node);
2813 if (!test_bit(br->dead_node, dlm->recovery_map)) {
2814 mlog(0, "recovery master %u sees %u as dead, but this "
2815 "node has not yet. marking %u as dead\n",
2816 br->node_idx, br->dead_node, br->dead_node);
2817 if (!test_bit(br->dead_node, dlm->domain_map) ||
2818 !test_bit(br->dead_node, dlm->live_nodes_map))
2819 mlog(0, "%u not in domain/live_nodes map "
2820 "so setting it in reco map manually\n",
2821 br->dead_node);
2822 /* force the recovery cleanup in __dlm_hb_node_down
2823 * both of these will be cleared in a moment */
2824 set_bit(br->dead_node, dlm->domain_map);
2825 set_bit(br->dead_node, dlm->live_nodes_map);
2826 __dlm_hb_node_down(dlm, br->dead_node);
2827 }
2828 spin_unlock(&dlm->spinlock);
2829
2830 dlm_kick_recovery_thread(dlm);
2831
2832 mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n",
2833 dlm->name, br->node_idx, br->dead_node,
2834 dlm->reco.dead_node, dlm->reco.new_master);
2835
2836 dlm_put(dlm);
2837 return 0;
2838 }
2839
2840 #define DLM_FINALIZE_STAGE2 0x01
2841 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2842 {
2843 int ret = 0;
2844 struct dlm_finalize_reco fr;
2845 struct dlm_node_iter iter;
2846 int nodenum;
2847 int status;
2848 int stage = 1;
2849
2850 mlog(0, "finishing recovery for node %s:%u, "
2851 "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
2852
2853 spin_lock(&dlm->spinlock);
2854 dlm_node_iter_init(dlm->domain_map, &iter);
2855 spin_unlock(&dlm->spinlock);
2856
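/* finalize runs in two passes: stage 1 has every node remaster the
 * dead node's lockres and enter FINALIZE, stage 2 lets them reset
 * recovery state.  the split ensures no node resets before all nodes
 * have remastered, even if the new master itself then dies. */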
2857 stage2:
2858 memset(&fr, 0, sizeof(fr));
2859 fr.node_idx = dlm->node_num;
2860 fr.dead_node = dlm->reco.dead_node;
2861 if (stage == 2)
2862 fr.flags |= DLM_FINALIZE_STAGE2;
2863
2864 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2865 if (nodenum == dlm->node_num)
2866 continue;
2867 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
2868 &fr, sizeof(fr), nodenum, &status);
2869 if (ret >= 0)
2870 ret = status;
2871 if (ret < 0) {
2872 mlog(ML_ERROR, "Error %d when sending message %u (key "
2873 "0x%x) to node %u\n", ret, DLM_FINALIZE_RECO_MSG,
2874 dlm->key, nodenum);
2875 if (dlm_is_host_down(ret)) {
2876 /* this has no effect on this recovery
2877 * session, so set the status to zero to
2878 * finish out the last recovery */
2879 mlog(ML_ERROR, "node %u went down after this "
2880 "node finished recovery.\n", nodenum);
2881 ret = 0;
2882 continue;
2883 }
2884 break;
2885 }
2886 }
2887 if (stage == 1) {
2888 /* reset the node_iter back to the top and send finalize2 */
2889 iter.curnode = -1;
2890 stage = 2;
2891 goto stage2;
2892 }
2893
2894 return ret;
2895 }
2896
2897 int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
2898 void **ret_data)
2899 {
2900 struct dlm_ctxt *dlm = data;
2901 struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
2902 int stage = 1;
2903
2904 /* ok to return 0, domain has gone away */
2905 if (!dlm_grab(dlm))
2906 return 0;
2907
2908 if (fr->flags & DLM_FINALIZE_STAGE2)
2909 stage = 2;
2910
2911 mlog(0, "%s: node %u finalizing recovery stage%d of "
2912 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
2913 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
2914
2915 spin_lock(&dlm->spinlock);
2916
2917 if (dlm->reco.new_master != fr->node_idx) {
2918 mlog(ML_ERROR, "node %u sent recovery finalize msg, but node "
2919 "%u is supposed to be the new master, dead=%u\n",
2920 fr->node_idx, dlm->reco.new_master, fr->dead_node);
2921 BUG();
2922 }
2923 if (dlm->reco.dead_node != fr->dead_node) {
2924 mlog(ML_ERROR, "node %u sent recovery finalize msg for dead "
2925 "node %u, but node %u is supposed to be dead\n",
2926 fr->node_idx, fr->dead_node, dlm->reco.dead_node);
2927 BUG();
2928 }
2929
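/* stage 1: remaster the dead node's lockres and enter FINALIZE;
 * stage 2: leave FINALIZE and reset recovery state entirely */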
2930 switch (stage) {
2931 case 1:
2932 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
2933 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2934 mlog(ML_ERROR, "%s: received finalize1 from "
2935 "new master %u for dead node %u, but "
2936 "this node has already received it!\n",
2937 dlm->name, fr->node_idx, fr->dead_node);
2938 dlm_print_reco_node_status(dlm);
2939 BUG();
2940 }
2941 dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
2942 spin_unlock(&dlm->spinlock);
2943 break;
2944 case 2:
2945 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
2946 mlog(ML_ERROR, "%s: received finalize2 from "
2947 "new master %u for dead node %u, but "
2948 "this node did not have finalize1!\n",
2949 dlm->name, fr->node_idx, fr->dead_node);
2950 dlm_print_reco_node_status(dlm);
2951 BUG();
2952 }
2953 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2954 __dlm_reset_recovery(dlm);
2955 spin_unlock(&dlm->spinlock);
2956 dlm_kick_recovery_thread(dlm);
2957 break;
2958 }
2959
2960 mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
2961 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
2962
2963 dlm_put(dlm);
2964 return 0;
2965 }