root/fs/ocfs2/cluster/heartbeat.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. o2hb_dead_threshold_set
  2. o2hb_global_heartbeat_mode_set
  3. o2hb_write_timeout
  4. o2hb_arm_timeout
  5. o2hb_disarm_timeout
  6. o2hb_send_nego_msg
  7. o2hb_nego_timeout
  8. o2hb_nego_timeout_handler
  9. o2hb_nego_approve_handler
  10. o2hb_bio_wait_init
  11. o2hb_bio_wait_dec
  12. o2hb_wait_on_io
  13. o2hb_bio_end_io
  14. o2hb_setup_one_bio
  15. o2hb_read_slots
  16. o2hb_issue_node_write
  17. o2hb_compute_block_crc_le
  18. o2hb_dump_slot
  19. o2hb_verify_crc
  20. o2hb_check_own_slot
  21. o2hb_prepare_block
  22. o2hb_fire_callbacks
  23. o2hb_run_event_list
  24. o2hb_queue_node_event
  25. o2hb_shutdown_slot
  26. o2hb_set_quorum_device
  27. o2hb_check_slot
  28. o2hb_highest_node
  29. o2hb_lowest_node
  30. o2hb_do_disk_heartbeat
  31. o2hb_thread
  32. o2hb_debug_open
  33. o2hb_debug_release
  34. o2hb_debug_read
  35. o2hb_debug_open
  36. o2hb_debug_release
  37. o2hb_debug_read
  38. o2hb_exit
  39. o2hb_debug_create
  40. o2hb_debug_init
  41. o2hb_init
  42. o2hb_fill_node_map_from_callback
  43. o2hb_fill_node_map
  44. to_o2hb_region
  45. o2hb_region_release
  46. o2hb_read_block_input
  47. o2hb_region_block_bytes_show
  48. o2hb_region_block_bytes_store
  49. o2hb_region_start_block_show
  50. o2hb_region_start_block_store
  51. o2hb_region_blocks_show
  52. o2hb_region_blocks_store
  53. o2hb_region_dev_show
  54. o2hb_init_region_params
  55. o2hb_map_slot_data
  56. o2hb_populate_slot_data
  57. o2hb_region_dev_store
  58. o2hb_region_pid_show
  59. to_o2hb_heartbeat_group
  60. o2hb_debug_region_init
  61. o2hb_heartbeat_group_make_item
  62. o2hb_heartbeat_group_drop_item
  63. o2hb_heartbeat_group_dead_threshold_show
  64. o2hb_heartbeat_group_dead_threshold_store
  65. o2hb_heartbeat_group_mode_show
  66. o2hb_heartbeat_group_mode_store
  67. o2hb_alloc_hb_set
  68. o2hb_free_hb_set
  69. hbcall_from_type
  70. o2hb_setup_callback
  71. o2hb_region_pin
  72. o2hb_region_unpin
  73. o2hb_region_inc_user
  74. o2hb_region_dec_user
  75. o2hb_register_callback
  76. o2hb_unregister_callback
  77. o2hb_check_node_heartbeating_no_sem
  78. o2hb_check_node_heartbeating_from_callback
  79. o2hb_stop_all_regions
  80. o2hb_get_all_regions
  81. o2hb_global_heartbeat_active

   1 // SPDX-License-Identifier: GPL-2.0-or-later
   2 /* -*- mode: c; c-basic-offset: 8; -*-
   3  * vim: noexpandtab sw=8 ts=8 sts=0:
   4  *
   5  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
   6  */
   7 
   8 #include <linux/kernel.h>
   9 #include <linux/sched.h>
  10 #include <linux/jiffies.h>
  11 #include <linux/module.h>
  12 #include <linux/fs.h>
  13 #include <linux/bio.h>
  14 #include <linux/blkdev.h>
  15 #include <linux/delay.h>
  16 #include <linux/file.h>
  17 #include <linux/kthread.h>
  18 #include <linux/configfs.h>
  19 #include <linux/random.h>
  20 #include <linux/crc32.h>
  21 #include <linux/time.h>
  22 #include <linux/debugfs.h>
  23 #include <linux/slab.h>
  24 #include <linux/bitmap.h>
  25 #include <linux/ktime.h>
  26 #include "heartbeat.h"
  27 #include "tcp.h"
  28 #include "nodemanager.h"
  29 #include "quorum.h"
  30 
  31 #include "masklog.h"
  32 
  33 
   34 /*
   35  * The first heartbeat pass had one global thread that would serialize all hb
   36  * callback calls.  This global serializing sem should only be removed once
   37  * we've made sure that all callees can deal with being called concurrently
   38  * from multiple hb region threads.
   39  */
      /* Taken for write by o2hb_run_event_list() while node events are run. */
   40 static DECLARE_RWSEM(o2hb_callback_sem);
   41
   42 /*
   43  * multiple hb threads are watching multiple regions.  A node is live
   44  * whenever any of the threads sees activity from the node in its region.
   45  */
      /*
       * o2hb_live_lock is held in this file around accesses to o2hb_all_regions,
       * o2hb_node_events and the global region bitmaps below, and around the
       * updates of o2hb_dead_threshold / o2hb_heartbeat_mode.
       */
   46 static DEFINE_SPINLOCK(o2hb_live_lock);
      /* One list head and one bit per node number (both sized by O2NM_MAX_NODES). */
   47 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
   48 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
      /* Pending struct o2hb_node_event entries, linked through hn_item. */
   49 static LIST_HEAD(o2hb_node_events);
      /* NOTE(review): presumably woken when a region reaches steady state - confirm. */
   50 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
   51
   52 /*
   53  * In global heartbeat, we maintain a series of region bitmaps.
   54  *      - o2hb_region_bitmap allows us to limit the region number to max region.
   55  *      - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
   56  *      - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
   57  *              heartbeat on it.
   58  *      - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
   59  */
      /* Each bitmap is indexed by a region's hr_region_num. */
   60 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
   61 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
   62 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
   63 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
  64 
      /*
       * Values for o2hb_debug_buf.db_type.
       * NOTE(review): presumably they select which piece of state a debug
       * file reports - confirm against o2hb_debug_read().
       */
   65 #define O2HB_DB_TYPE_LIVENODES          0
   66 #define O2HB_DB_TYPE_LIVEREGIONS        1
   67 #define O2HB_DB_TYPE_QUORUMREGIONS      2
   68 #define O2HB_DB_TYPE_FAILEDREGIONS      3
   69 #define O2HB_DB_TYPE_REGION_LIVENODES   4
   70 #define O2HB_DB_TYPE_REGION_NUMBER      5
   71 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME        6
   72 #define O2HB_DB_TYPE_REGION_PINNED      7
      /*
       * Descriptor for one debug entry.
       * NOTE(review): db_size/db_len look like the byte size and element count
       * of the object behind db_data - confirm where these are filled in.
       */
   73 struct o2hb_debug_buf {
   74         int db_type;
   75         int db_size;
   76         int db_len;
   77         void *db_data;
   78 };
   79
      /* Descriptors for the global (not per-region) debug entries. */
   80 static struct o2hb_debug_buf *o2hb_db_livenodes;
   81 static struct o2hb_debug_buf *o2hb_db_liveregions;
   82 static struct o2hb_debug_buf *o2hb_db_quorumregions;
   83 static struct o2hb_debug_buf *o2hb_db_failedregions;
   84
      /* NOTE(review): presumably the debugfs directory and file names - confirm. */
   85 #define O2HB_DEBUG_DIR                  "o2hb"
   86 #define O2HB_DEBUG_LIVENODES            "livenodes"
   87 #define O2HB_DEBUG_LIVEREGIONS          "live_regions"
   88 #define O2HB_DEBUG_QUORUMREGIONS        "quorum_regions"
   89 #define O2HB_DEBUG_FAILEDREGIONS        "failed_regions"
   90 #define O2HB_DEBUG_REGION_NUMBER        "num"
   91 #define O2HB_DEBUG_REGION_ELAPSED_TIME  "elapsed_time_in_ms"
   92 #define O2HB_DEBUG_REGION_PINNED        "pinned"
   93
   94 static struct dentry *o2hb_debug_dir;
   95
      /*
       * List of heartbeat regions.  The threshold and mode setters below only
       * take effect while this list is empty; it is checked under o2hb_live_lock.
       */
   96 static LIST_HEAD(o2hb_all_regions);
  97 
      /*
       * Callback lists, one array element per callback type (O2HB_NUM_CB
       * entries).  o2hb_fire_callbacks() walks ->list and invokes every
       * registered function.
       */
   98 static struct o2hb_callback {
   99         struct list_head list;
  100 } o2hb_callbacks[O2HB_NUM_CB];
  101
  102 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
  103
      /* 9 bits, i.e. 512-byte blocks. */
  104 #define O2HB_DEFAULT_BLOCK_BITS       9
  105
      /* Heartbeat modes; O2HB_HEARTBEAT_NUM_MODES is the count, not a mode. */
  106 enum o2hb_heartbeat_modes {
  107         O2HB_HEARTBEAT_LOCAL            = 0,
  108         O2HB_HEARTBEAT_GLOBAL,
  109         O2HB_HEARTBEAT_NUM_MODES,
  110 };
  111
      /* Names of the modes, indexed by enum o2hb_heartbeat_modes. */
  112 static const char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
  113         "local",        /* O2HB_HEARTBEAT_LOCAL */
  114         "global",       /* O2HB_HEARTBEAT_GLOBAL */
  115 };
  116
      /*
       * Both values are only changed through the setters below, and only while
       * o2hb_all_regions is empty.
       */
  117 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
  118 static unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
  119
  120 /*
  121  * o2hb_dependent_users tracks the number of registered callbacks that depend
  122  * on heartbeat. o2net and o2dlm are two entities that register this callback.
  123  * However only o2dlm depends on the heartbeat. It does not want the heartbeat
  124  * to stop while a dlm domain is still active.
  125  */
  126 static unsigned int o2hb_dependent_users;
  127
  128 /*
  129  * In global heartbeat mode, all regions are pinned if there are one or more
  130  * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
  131  * regions are unpinned if the region count exceeds the cut off or the number
  132  * of dependent users falls to zero.
  133  */
  134 #define O2HB_PIN_CUT_OFF                3
  135
  136 /*
  137  * In local heartbeat mode, we assume the dlm domain name to be the same as
  138  * region uuid. This is true for domains created for the file system but not
  139  * necessarily true for userdlm domains. This is a known limitation.
  140  *
  141  * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
  142  * works for both file system and userdlm domains.
  143  */
      /* Forward declarations; the definitions are not in this part of the file. */
  144 static int o2hb_region_pin(const char *region_uuid);
  145 static void o2hb_region_unpin(const char *region_uuid);
 146 
 147 /* Only sets a new threshold if there are no active regions.
 148  *
 149  * No locking or otherwise interesting code is required for reading
 150  * o2hb_dead_threshold as it can't change once regions are active and
 151  * it's not interesting to anyone until then anyway. */
 152 static void o2hb_dead_threshold_set(unsigned int threshold)
 153 {
 154         if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
 155                 spin_lock(&o2hb_live_lock);
 156                 if (list_empty(&o2hb_all_regions))
 157                         o2hb_dead_threshold = threshold;
 158                 spin_unlock(&o2hb_live_lock);
 159         }
 160 }
 161 
 162 static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
 163 {
 164         int ret = -1;
 165 
 166         if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
 167                 spin_lock(&o2hb_live_lock);
 168                 if (list_empty(&o2hb_all_regions)) {
 169                         o2hb_heartbeat_mode = hb_mode;
 170                         ret = 0;
 171                 }
 172                 spin_unlock(&o2hb_live_lock);
 173         }
 174 
 175         return ret;
 176 }
 177 
      /*
       * One queued node event.  Events are linked on o2hb_node_events through
       * hn_item; o2hb_run_event_list() removes them with list_del_init().
       */
  178 struct o2hb_node_event {
  179         struct list_head        hn_item;
              /* which callback type this event is for */
  180         enum o2hb_callback_type hn_event_type;
              /* node the event refers to, as pointer and as node number */
  181         struct o2nm_node        *hn_node;
  182         int                     hn_node_num;
  183 };
 184 
      /*
       * In-memory state for one heartbeat slot.  A region's hr_slots array is
       * indexed by node number.
       */
  185 struct o2hb_disk_slot {
              /* raw heartbeat block for this slot */
  186         struct o2hb_disk_heartbeat_block *ds_raw_block;
              /* compared against the block's hb_node by o2hb_check_own_slot() */
  187         u8                      ds_node_num;
              /* compared against hb_seq / hb_generation (cpu byte order) */
  188         u64                     ds_last_time;
  189         u64                     ds_last_generation;
              /* NOTE(review): presumably counts of unchanged/changed samples - confirm */
  190         u16                     ds_equal_samples;
  191         u16                     ds_changed_samples;
  192         struct list_head        ds_live_item;
  193 };
 194 
  195 /* each thread owns a region.. when we're asked to tear down the region
  196  * we ask the thread to stop, who cleans up the region */
  197 struct o2hb_region {
              /* embedded config item; its name is used in log messages */
  198         struct config_item      hr_item;
  199
  200         struct list_head        hr_all_item;
  201         unsigned                hr_unclean_stop:1,
  202                                 hr_aborted_start:1,
  203                                 hr_item_pinned:1,
  204                                 hr_item_dropped:1,
  205                                 hr_node_deleted:1;
  206
  207         /* protected by the hr_callback_sem */
              /* NOTE(review): no hr_callback_sem is visible in this file;
               * o2hb_callback_sem may be meant - confirm. */
  208         struct task_struct      *hr_task;
  209
  210         unsigned int            hr_blocks;
              /* added to the slot number when computing a bio's start sector */
  211         unsigned long long      hr_start_block;
  212
              /* block size as a shift count and in bytes; hr_block_bytes is the
               * length used when zeroing and checksumming a heartbeat block */
  213         unsigned int            hr_block_bits;
  214         unsigned int            hr_block_bytes;
  215
              /* slot N lives in page hr_slot_data[N / hr_slots_per_page] */
  216         unsigned int            hr_slots_per_page;
  217         unsigned int            hr_num_pages;
  218
  219         struct page             **hr_slot_data;
              /* device the heartbeat bios are issued against */
  220         struct block_device     *hr_bdev;
              /* per-slot state, indexed by node number */
  221         struct o2hb_disk_slot   *hr_slots;
  222
  223         /* live node map of this region */
  224         unsigned long           hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
              /* bit index of this region in the global region bitmaps */
  225         unsigned int            hr_region_num;
  226
  227         struct dentry           *hr_debug_dir;
  228         struct o2hb_debug_buf   *hr_db_livenodes;
  229         struct o2hb_debug_buf   *hr_db_regnum;
  230         struct o2hb_debug_buf   *hr_db_elapsed_time;
  231         struct o2hb_debug_buf   *hr_db_pinned;
  232
  233         /* let the person setting up hb wait for it to return until it
  234          * has reached a 'steady' state.  This will be fixed when we have
  235          * a more complete api that doesn't lead to this sort of fragility. */
              /* o2hb_arm_timeout() arms the timers only once this reads 0 */
  236         atomic_t                hr_steady_iterations;
  237
  238         /* terminate o2hb thread if it does not reach steady state
  239          * (hr_steady_iterations == 0) within hr_unsteady_iterations */
  240         atomic_t                hr_unsteady_iterations;
  241
  242         char                    hr_dev_name[BDEVNAME_SIZE];
  243
  244         unsigned int            hr_timeout_ms;
  245
  246         /* randomized as the region goes up and down so that a node
  247          * recognizes a node going up and down in one iteration */
  248         u64                     hr_generation;
  249
              /* work item run by o2hb_write_timeout() */
  250         struct delayed_work     hr_write_timeout_work;
              /* jiffies value; o2hb_write_timeout() reports the time elapsed
               * since it */
  251         unsigned long           hr_last_timeout_start;
  252
  253         /* negotiate timer, used to negotiate extending hb timeout. */
  254         struct delayed_work     hr_nego_timeout_work;
              /* bits are set for nodes that sent a negotiate-timeout message
               * (and for the master itself); zeroed by o2hb_arm_timeout() */
  255         unsigned long           hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
  256
  257         /* Used during o2hb_check_slot to hold a copy of the block
  258          * being checked because we temporarily have to zero out the
  259          * crc field. */
  260         struct o2hb_disk_heartbeat_block *hr_tmp_block;
  261
  262         /* Message key for negotiate timeout message. */
  263         unsigned int            hr_key;
  264         struct list_head        hr_handler_list;
  265
  266         /* last hb status, 0 for success, other value for error. */
  267         int                     hr_last_hb_status;
  268 };
 269 
      /*
       * Tracks a batch of in-flight heartbeat bios so the submitter can wait
       * for all of them.
       */
  270 struct o2hb_bio_wait_ctxt {
              /* starts at 1 (o2hb_bio_wait_init), +1 per submitted bio; the
               * completion fires when it drops to zero */
  271         atomic_t          wc_num_reqs;
  272         struct completion wc_io_complete;
              /* errno of a failed bio, set by o2hb_bio_end_io(); 0 if none failed */
  273         int               wc_error;
  274 };
 275 
      /* Negotiation timer delay: half of the write timeout. */
  276 #define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2)
  277
      /* Message types passed to o2hb_send_nego_msg(). */
  278 enum {
              /* sent to the master node by a node whose nego timer fired */
  279         O2HB_NEGO_TIMEOUT_MSG = 1,
              /* sent by the master node to the other live nodes */
  280         O2HB_NEGO_APPROVE_MSG = 2,
  281 };
  282
      /* Payload of both negotiate messages: the sender's node number. */
  283 struct o2hb_nego_msg {
  284         u8 node_num;
  285 };
 286 
 287 static void o2hb_write_timeout(struct work_struct *work)
 288 {
 289         int failed, quorum;
 290         struct o2hb_region *reg =
 291                 container_of(work, struct o2hb_region,
 292                              hr_write_timeout_work.work);
 293 
 294         mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
 295              "milliseconds\n", reg->hr_dev_name,
 296              jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
 297 
 298         if (o2hb_global_heartbeat_active()) {
 299                 spin_lock(&o2hb_live_lock);
 300                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
 301                         set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
 302                 failed = bitmap_weight(o2hb_failed_region_bitmap,
 303                                         O2NM_MAX_REGIONS);
 304                 quorum = bitmap_weight(o2hb_quorum_region_bitmap,
 305                                         O2NM_MAX_REGIONS);
 306                 spin_unlock(&o2hb_live_lock);
 307 
 308                 mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
 309                      quorum, failed);
 310 
 311                 /*
 312                  * Fence if the number of failed regions >= half the number
 313                  * of  quorum regions
 314                  */
 315                 if ((failed << 1) < quorum)
 316                         return;
 317         }
 318 
 319         o2quo_disk_timeout();
 320 }
 321 
 322 static void o2hb_arm_timeout(struct o2hb_region *reg)
 323 {
 324         /* Arm writeout only after thread reaches steady state */
 325         if (atomic_read(&reg->hr_steady_iterations) != 0)
 326                 return;
 327 
 328         mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
 329              O2HB_MAX_WRITE_TIMEOUT_MS);
 330 
 331         if (o2hb_global_heartbeat_active()) {
 332                 spin_lock(&o2hb_live_lock);
 333                 clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
 334                 spin_unlock(&o2hb_live_lock);
 335         }
 336         cancel_delayed_work(&reg->hr_write_timeout_work);
 337         schedule_delayed_work(&reg->hr_write_timeout_work,
 338                               msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
 339 
 340         cancel_delayed_work(&reg->hr_nego_timeout_work);
 341         /* negotiate timeout must be less than write timeout. */
 342         schedule_delayed_work(&reg->hr_nego_timeout_work,
 343                               msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS));
 344         memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap));
 345 }
 346 
 347 static void o2hb_disarm_timeout(struct o2hb_region *reg)
 348 {
 349         cancel_delayed_work_sync(&reg->hr_write_timeout_work);
 350         cancel_delayed_work_sync(&reg->hr_nego_timeout_work);
 351 }
 352 
 353 static int o2hb_send_nego_msg(int key, int type, u8 target)
 354 {
 355         struct o2hb_nego_msg msg;
 356         int status, ret;
 357 
 358         msg.node_num = o2nm_this_node();
 359 again:
 360         ret = o2net_send_message(type, key, &msg, sizeof(msg),
 361                         target, &status);
 362 
 363         if (ret == -EAGAIN || ret == -ENOMEM) {
 364                 msleep(100);
 365                 goto again;
 366         }
 367 
 368         return ret;
 369 }
 370 
 371 static void o2hb_nego_timeout(struct work_struct *work)
 372 {
 373         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 374         int master_node, i, ret;
 375         struct o2hb_region *reg;
 376 
 377         reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
 378         /* don't negotiate timeout if last hb failed since it is very
 379          * possible io failed. Should let write timeout fence self.
 380          */
 381         if (reg->hr_last_hb_status)
 382                 return;
 383 
 384         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
 385         /* lowest node as master node to make negotiate decision. */
 386         master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0);
 387 
 388         if (master_node == o2nm_this_node()) {
 389                 if (!test_bit(master_node, reg->hr_nego_node_bitmap)) {
 390                         printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n",
 391                                 o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000,
 392                                 config_item_name(&reg->hr_item), reg->hr_dev_name);
 393                         set_bit(master_node, reg->hr_nego_node_bitmap);
 394                 }
 395                 if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap,
 396                                 sizeof(reg->hr_nego_node_bitmap))) {
 397                         /* check negotiate bitmap every second to do timeout
 398                          * approve decision.
 399                          */
 400                         schedule_delayed_work(&reg->hr_nego_timeout_work,
 401                                 msecs_to_jiffies(1000));
 402 
 403                         return;
 404                 }
 405 
 406                 printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n",
 407                         config_item_name(&reg->hr_item), reg->hr_dev_name);
 408                 /* approve negotiate timeout request. */
 409                 o2hb_arm_timeout(reg);
 410 
 411                 i = -1;
 412                 while ((i = find_next_bit(live_node_bitmap,
 413                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
 414                         if (i == master_node)
 415                                 continue;
 416 
 417                         mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i);
 418                         ret = o2hb_send_nego_msg(reg->hr_key,
 419                                         O2HB_NEGO_APPROVE_MSG, i);
 420                         if (ret)
 421                                 mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n",
 422                                         i, ret);
 423                 }
 424         } else {
 425                 /* negotiate timeout with master node. */
 426                 printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n",
 427                         o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(&reg->hr_item),
 428                         reg->hr_dev_name, master_node);
 429                 ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
 430                                 master_node);
 431                 if (ret)
 432                         mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n",
 433                                 master_node, ret);
 434         }
 435 }
 436 
 437 static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
 438                                 void **ret_data)
 439 {
 440         struct o2hb_region *reg = data;
 441         struct o2hb_nego_msg *nego_msg;
 442 
 443         nego_msg = (struct o2hb_nego_msg *)msg->buf;
 444         printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n",
 445                 nego_msg->node_num, config_item_name(&reg->hr_item), reg->hr_dev_name);
 446         if (nego_msg->node_num < O2NM_MAX_NODES)
 447                 set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
 448         else
 449                 mlog(ML_ERROR, "got nego timeout message from bad node.\n");
 450 
 451         return 0;
 452 }
 453 
 454 static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data,
 455                                 void **ret_data)
 456 {
 457         struct o2hb_region *reg = data;
 458 
 459         printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n",
 460                 config_item_name(&reg->hr_item), reg->hr_dev_name);
 461         o2hb_arm_timeout(reg);
 462         return 0;
 463 }
 464 
 465 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
 466 {
 467         atomic_set(&wc->wc_num_reqs, 1);
 468         init_completion(&wc->wc_io_complete);
 469         wc->wc_error = 0;
 470 }
 471 
 472 /* Used in error paths too */
 473 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
 474                                      unsigned int num)
 475 {
 476         /* sadly atomic_sub_and_test() isn't available on all platforms.  The
 477          * good news is that the fast path only completes one at a time */
 478         while(num--) {
 479                 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
 480                         BUG_ON(num > 0);
 481                         complete(&wc->wc_io_complete);
 482                 }
 483         }
 484 }
 485 
 486 static void o2hb_wait_on_io(struct o2hb_bio_wait_ctxt *wc)
 487 {
 488         o2hb_bio_wait_dec(wc, 1);
 489         wait_for_completion(&wc->wc_io_complete);
 490 }
 491 
 492 static void o2hb_bio_end_io(struct bio *bio)
 493 {
 494         struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
 495 
 496         if (bio->bi_status) {
 497                 mlog(ML_ERROR, "IO Error %d\n", bio->bi_status);
 498                 wc->wc_error = blk_status_to_errno(bio->bi_status);
 499         }
 500 
 501         o2hb_bio_wait_dec(wc, 1);
 502         bio_put(bio);
 503 }
 504 
 505 /* Setup a Bio to cover I/O against num_slots slots starting at
 506  * start_slot. */
 507 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
 508                                       struct o2hb_bio_wait_ctxt *wc,
 509                                       unsigned int *current_slot,
 510                                       unsigned int max_slots, int op,
 511                                       int op_flags)
 512 {
 513         int len, current_page;
 514         unsigned int vec_len, vec_start;
 515         unsigned int bits = reg->hr_block_bits;
 516         unsigned int spp = reg->hr_slots_per_page;
 517         unsigned int cs = *current_slot;
 518         struct bio *bio;
 519         struct page *page;
 520 
 521         /* Testing has shown this allocation to take long enough under
 522          * GFP_KERNEL that the local node can get fenced. It would be
 523          * nicest if we could pre-allocate these bios and avoid this
 524          * all together. */
 525         bio = bio_alloc(GFP_ATOMIC, 16);
 526         if (!bio) {
 527                 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
 528                 bio = ERR_PTR(-ENOMEM);
 529                 goto bail;
 530         }
 531 
 532         /* Must put everything in 512 byte sectors for the bio... */
 533         bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
 534         bio_set_dev(bio, reg->hr_bdev);
 535         bio->bi_private = wc;
 536         bio->bi_end_io = o2hb_bio_end_io;
 537         bio_set_op_attrs(bio, op, op_flags);
 538 
 539         vec_start = (cs << bits) % PAGE_SIZE;
 540         while(cs < max_slots) {
 541                 current_page = cs / spp;
 542                 page = reg->hr_slot_data[current_page];
 543 
 544                 vec_len = min(PAGE_SIZE - vec_start,
 545                               (max_slots-cs) * (PAGE_SIZE/spp) );
 546 
 547                 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
 548                      current_page, vec_len, vec_start);
 549 
 550                 len = bio_add_page(bio, page, vec_len, vec_start);
 551                 if (len != vec_len) break;
 552 
 553                 cs += vec_len / (PAGE_SIZE/spp);
 554                 vec_start = 0;
 555         }
 556 
 557 bail:
 558         *current_slot = cs;
 559         return bio;
 560 }
 561 
 562 static int o2hb_read_slots(struct o2hb_region *reg,
 563                            unsigned int begin_slot,
 564                            unsigned int max_slots)
 565 {
 566         unsigned int current_slot = begin_slot;
 567         int status;
 568         struct o2hb_bio_wait_ctxt wc;
 569         struct bio *bio;
 570 
 571         o2hb_bio_wait_init(&wc);
 572 
 573         while(current_slot < max_slots) {
 574                 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots,
 575                                          REQ_OP_READ, 0);
 576                 if (IS_ERR(bio)) {
 577                         status = PTR_ERR(bio);
 578                         mlog_errno(status);
 579                         goto bail_and_wait;
 580                 }
 581 
 582                 atomic_inc(&wc.wc_num_reqs);
 583                 submit_bio(bio);
 584         }
 585 
 586         status = 0;
 587 
 588 bail_and_wait:
 589         o2hb_wait_on_io(&wc);
 590         if (wc.wc_error && !status)
 591                 status = wc.wc_error;
 592 
 593         return status;
 594 }
 595 
 596 static int o2hb_issue_node_write(struct o2hb_region *reg,
 597                                  struct o2hb_bio_wait_ctxt *write_wc)
 598 {
 599         int status;
 600         unsigned int slot;
 601         struct bio *bio;
 602 
 603         o2hb_bio_wait_init(write_wc);
 604 
 605         slot = o2nm_this_node();
 606 
 607         bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, REQ_OP_WRITE,
 608                                  REQ_SYNC);
 609         if (IS_ERR(bio)) {
 610                 status = PTR_ERR(bio);
 611                 mlog_errno(status);
 612                 goto bail;
 613         }
 614 
 615         atomic_inc(&write_wc->wc_num_reqs);
 616         submit_bio(bio);
 617 
 618         status = 0;
 619 bail:
 620         return status;
 621 }
 622 
 623 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
 624                                      struct o2hb_disk_heartbeat_block *hb_block)
 625 {
 626         __le32 old_cksum;
 627         u32 ret;
 628 
 629         /* We want to compute the block crc with a 0 value in the
 630          * hb_cksum field. Save it off here and replace after the
 631          * crc. */
 632         old_cksum = hb_block->hb_cksum;
 633         hb_block->hb_cksum = 0;
 634 
 635         ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
 636 
 637         hb_block->hb_cksum = old_cksum;
 638 
 639         return ret;
 640 }
 641 
 642 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
 643 {
 644         mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
 645              "cksum = 0x%x, generation 0x%llx\n",
 646              (long long)le64_to_cpu(hb_block->hb_seq),
 647              hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
 648              (long long)le64_to_cpu(hb_block->hb_generation));
 649 }
 650 
 651 static int o2hb_verify_crc(struct o2hb_region *reg,
 652                            struct o2hb_disk_heartbeat_block *hb_block)
 653 {
 654         u32 read, computed;
 655 
 656         read = le32_to_cpu(hb_block->hb_cksum);
 657         computed = o2hb_compute_block_crc_le(reg, hb_block);
 658 
 659         return read == computed;
 660 }
 661 
 662 /*
 663  * Compare the slot data with what we wrote in the last iteration.
 664  * If the match fails, print an appropriate error message. This is to
 665  * detect errors like... another node hearting on the same slot,
 666  * flaky device that is losing writes, etc.
 667  * Returns 1 if check succeeds, 0 otherwise.
 668  */
 669 static int o2hb_check_own_slot(struct o2hb_region *reg)
 670 {
 671         struct o2hb_disk_slot *slot;
 672         struct o2hb_disk_heartbeat_block *hb_block;
 673         char *errstr;
 674 
 675         slot = &reg->hr_slots[o2nm_this_node()];
 676         /* Don't check on our 1st timestamp */
 677         if (!slot->ds_last_time)
 678                 return 0;
 679 
 680         hb_block = slot->ds_raw_block;
 681         if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
 682             le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
 683             hb_block->hb_node == slot->ds_node_num)
 684                 return 1;
 685 
 686 #define ERRSTR1         "Another node is heartbeating on device"
 687 #define ERRSTR2         "Heartbeat generation mismatch on device"
 688 #define ERRSTR3         "Heartbeat sequence mismatch on device"
 689 
 690         if (hb_block->hb_node != slot->ds_node_num)
 691                 errstr = ERRSTR1;
 692         else if (le64_to_cpu(hb_block->hb_generation) !=
 693                  slot->ds_last_generation)
 694                 errstr = ERRSTR2;
 695         else
 696                 errstr = ERRSTR3;
 697 
 698         mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
 699              "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
 700              slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
 701              (unsigned long long)slot->ds_last_time, hb_block->hb_node,
 702              (unsigned long long)le64_to_cpu(hb_block->hb_generation),
 703              (unsigned long long)le64_to_cpu(hb_block->hb_seq));
 704 
 705         return 0;
 706 }
 707 
 708 static inline void o2hb_prepare_block(struct o2hb_region *reg,
 709                                       u64 generation)
 710 {
 711         int node_num;
 712         u64 cputime;
 713         struct o2hb_disk_slot *slot;
 714         struct o2hb_disk_heartbeat_block *hb_block;
 715 
 716         node_num = o2nm_this_node();
 717         slot = &reg->hr_slots[node_num];
 718 
 719         hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
 720         memset(hb_block, 0, reg->hr_block_bytes);
 721         /* TODO: time stuff */
 722         cputime = ktime_get_real_seconds();
 723         if (!cputime)
 724                 cputime = 1;
 725 
 726         hb_block->hb_seq = cpu_to_le64(cputime);
 727         hb_block->hb_node = node_num;
 728         hb_block->hb_generation = cpu_to_le64(generation);
 729         hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
 730 
 731         /* This step must always happen last! */
 732         hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
 733                                                                    hb_block));
 734 
 735         mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
 736              (long long)generation,
 737              le32_to_cpu(hb_block->hb_cksum));
 738 }
 739 
 740 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
 741                                 struct o2nm_node *node,
 742                                 int idx)
 743 {
 744         struct o2hb_callback_func *f;
 745 
 746         list_for_each_entry(f, &hbcall->list, hc_item) {
 747                 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
 748                 (f->hc_func)(node, idx, f->hc_data);
 749         }
 750 }
 751 
 752 /* Will run the list in order until we process the passed event */
 753 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
 754 {
 755         struct o2hb_callback *hbcall;
 756         struct o2hb_node_event *event;
 757 
 758         /* Holding callback sem assures we don't alter the callback
 759          * lists when doing this, and serializes ourselves with other
 760          * processes wanting callbacks. */
 761         down_write(&o2hb_callback_sem);
 762 
 763         spin_lock(&o2hb_live_lock);
 764         while (!list_empty(&o2hb_node_events)
 765                && !list_empty(&queued_event->hn_item)) {
 766                 event = list_entry(o2hb_node_events.next,
 767                                    struct o2hb_node_event,
 768                                    hn_item);
 769                 list_del_init(&event->hn_item);
 770                 spin_unlock(&o2hb_live_lock);
 771 
 772                 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
 773                      event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
 774                      event->hn_node_num);
 775 
 776                 hbcall = hbcall_from_type(event->hn_event_type);
 777 
 778                 /* We should *never* have gotten on to the list with a
 779                  * bad type... This isn't something that we should try
 780                  * to recover from. */
 781                 BUG_ON(IS_ERR(hbcall));
 782 
 783                 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
 784 
 785                 spin_lock(&o2hb_live_lock);
 786         }
 787         spin_unlock(&o2hb_live_lock);
 788 
 789         up_write(&o2hb_callback_sem);
 790 }
 791 
 792 static void o2hb_queue_node_event(struct o2hb_node_event *event,
 793                                   enum o2hb_callback_type type,
 794                                   struct o2nm_node *node,
 795                                   int node_num)
 796 {
 797         assert_spin_locked(&o2hb_live_lock);
 798 
 799         BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
 800 
 801         event->hn_event_type = type;
 802         event->hn_node = node;
 803         event->hn_node_num = node_num;
 804 
 805         mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
 806              type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
 807 
 808         list_add_tail(&event->hn_item, &o2hb_node_events);
 809 }
 810 
 811 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
 812 {
 813         struct o2hb_node_event event =
 814                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
 815         struct o2nm_node *node;
 816         int queued = 0;
 817 
 818         node = o2nm_get_node_by_num(slot->ds_node_num);
 819         if (!node)
 820                 return;
 821 
 822         spin_lock(&o2hb_live_lock);
 823         if (!list_empty(&slot->ds_live_item)) {
 824                 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
 825                      slot->ds_node_num);
 826 
 827                 list_del_init(&slot->ds_live_item);
 828 
 829                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
 830                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 831 
 832                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
 833                                               slot->ds_node_num);
 834                         queued = 1;
 835                 }
 836         }
 837         spin_unlock(&o2hb_live_lock);
 838 
 839         if (queued)
 840                 o2hb_run_event_list(&event);
 841 
 842         o2nm_node_put(node);
 843 }
 844 
 845 static void o2hb_set_quorum_device(struct o2hb_region *reg)
 846 {
 847         if (!o2hb_global_heartbeat_active())
 848                 return;
 849 
 850         /* Prevent race with o2hb_heartbeat_group_drop_item() */
 851         if (kthread_should_stop())
 852                 return;
 853 
 854         /* Tag region as quorum only after thread reaches steady state */
 855         if (atomic_read(&reg->hr_steady_iterations) != 0)
 856                 return;
 857 
 858         spin_lock(&o2hb_live_lock);
 859 
 860         if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
 861                 goto unlock;
 862 
 863         /*
 864          * A region can be added to the quorum only when it sees all
 865          * live nodes heartbeat on it. In other words, the region has been
 866          * added to all nodes.
 867          */
 868         if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
 869                    sizeof(o2hb_live_node_bitmap)))
 870                 goto unlock;
 871 
 872         printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
 873                config_item_name(&reg->hr_item), reg->hr_dev_name);
 874 
 875         set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
 876 
 877         /*
 878          * If global heartbeat active, unpin all regions if the
 879          * region count > CUT_OFF
 880          */
 881         if (bitmap_weight(o2hb_quorum_region_bitmap,
 882                            O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
 883                 o2hb_region_unpin(NULL);
 884 unlock:
 885         spin_unlock(&o2hb_live_lock);
 886 }
 887 
/*
 * Examine one node's heartbeat slot as last read from disk and update the
 * liveness state kept for it.
 *
 * The raw slot is copied into reg->hr_tmp_block and its crc verified.  A
 * changed hb_seq bumps ds_changed_samples, an unchanged one (or a bad crc
 * from a live node) bumps ds_equal_samples.  A slot that is not on a live
 * list joins once ds_changed_samples reaches O2HB_LIVE_THRESHOLD; a live
 * slot leaves once ds_equal_samples reaches o2hb_dead_threshold or its
 * generation changed.  The first slot to go live for a node queues an UP
 * event and the last one to leave queues a DOWN event; queued events are
 * run after o2hb_live_lock has been dropped.
 *
 * Returns 1 if a node UP or DOWN event was queued, 0 otherwise.
 */
static int o2hb_check_slot(struct o2hb_region *reg,
                           struct o2hb_disk_slot *slot)
{
        int changed = 0, gen_changed = 0;
        struct o2hb_node_event event =
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
        struct o2nm_node *node;
        struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
        u64 cputime;
        unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
        unsigned int slot_dead_ms;
        int tmp;
        int queued = 0;

        /* work on a private copy of the slot contents */
        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

        /*
         * If a node is no longer configured but is still in the livemap, we
         * may need to clear that bit from the livemap.
         */
        node = o2nm_get_node_by_num(slot->ds_node_num);
        if (!node) {
                spin_lock(&o2hb_live_lock);
                tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
                spin_unlock(&o2hb_live_lock);
                /* unconfigured and not live: nothing to track */
                if (!tmp)
                        return 0;
        }

        if (!o2hb_verify_crc(reg, hb_block)) {
                /* all paths from here will drop o2hb_live_lock for
                 * us. */
                spin_lock(&o2hb_live_lock);

                /* Don't print an error on the console in this case -
                 * a freshly formatted heartbeat area will not have a
                 * crc set on it. */
                if (list_empty(&slot->ds_live_item))
                        goto out;

                /* The node is live but pushed out a bad crc. We
                 * consider it a transient miss but don't populate any
                 * other values as they may be junk. */
                mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
                     slot->ds_node_num, reg->hr_dev_name);
                o2hb_dump_slot(hb_block);

                slot->ds_equal_samples++;
                goto fire_callbacks;
        }

        /* we don't care if these wrap.. the state transitions below
         * clear at the right places */
        cputime = le64_to_cpu(hb_block->hb_seq);
        if (slot->ds_last_time != cputime)
                slot->ds_changed_samples++;
        else
                slot->ds_equal_samples++;
        slot->ds_last_time = cputime;

        /* The node changed heartbeat generations. We assume this to
         * mean it dropped off but came back before we timed out. We
         * want to consider it down for the time being but don't want
         * to lose any changed_samples state we might build up to
         * considering it live again. */
        if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
                gen_changed = 1;
                slot->ds_equal_samples = 0;
                mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
                     "to 0x%llx)\n", slot->ds_node_num,
                     (long long)slot->ds_last_generation,
                     (long long)le64_to_cpu(hb_block->hb_generation));
        }

        slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

        mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
             "seq %llu last %llu changed %u equal %u\n",
             slot->ds_node_num, (long long)slot->ds_last_generation,
             le32_to_cpu(hb_block->hb_cksum),
             (unsigned long long)le64_to_cpu(hb_block->hb_seq),
             (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
             slot->ds_equal_samples);

        spin_lock(&o2hb_live_lock);

fire_callbacks:
        /* o2hb_live_lock is held from here until the out label */

        /* dead nodes only come to life after some number of
         * changes at any time during their dead time */
        if (list_empty(&slot->ds_live_item) &&
            slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
                mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
                     slot->ds_node_num, (long long)slot->ds_last_generation);

                set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);

                /* first on the list generates a callback */
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
                             "bitmap\n", slot->ds_node_num);
                        set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                        queued = 1;
                }

                list_add_tail(&slot->ds_live_item,
                              &o2hb_live_slots[slot->ds_node_num]);

                slot->ds_equal_samples = 0;

                /* We want to be sure that all nodes agree on the
                 * number of milliseconds before a node will be
                 * considered dead. The self-fencing timeout is
                 * computed from this value, and a discrepancy might
                 * result in heartbeat calling a node dead when it
                 * hasn't self-fenced yet. */
                slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
                if (slot_dead_ms && slot_dead_ms != dead_ms) {
                        /* TODO: Perhaps we can fail the region here. */
                        mlog(ML_ERROR, "Node %d on device %s has a dead count "
                             "of %u ms, but our count is %u ms.\n"
                             "Please double check your configuration values "
                             "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
                             slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
                             dead_ms);
                }
                goto out;
        }

        /* if the list is dead, we're done.. */
        if (list_empty(&slot->ds_live_item))
                goto out;

        /* live nodes only go dead after enough consecutive missed
         * samples..  reset the missed counter whenever we see
         * activity */
        if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
                mlog(ML_HEARTBEAT, "Node %d left my region\n",
                     slot->ds_node_num);

                clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);

                /* last off the live_slot generates a callback */
                list_del_init(&slot->ds_live_item);
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
                             "nodes bitmap\n", slot->ds_node_num);
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        /* node can be null */
                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
                                              node, slot->ds_node_num);

                        changed = 1;
                        queued = 1;
                }

                /* We don't clear this because the node is still
                 * actually writing new blocks. */
                if (!gen_changed)
                        slot->ds_changed_samples = 0;
                goto out;
        }
        /* still live and we saw activity: restart both sample counters */
        if (slot->ds_changed_samples) {
                slot->ds_changed_samples = 0;
                slot->ds_equal_samples = 0;
        }
out:
        spin_unlock(&o2hb_live_lock);

        /* callbacks are fired only after the live lock is released */
        if (queued)
                o2hb_run_event_list(&event);

        if (node)
                o2nm_node_put(node);
        return changed;
}
1069 
/*
 * Index of the highest set bit in @nodes, as reported by find_last_bit()
 * over the first @numbits bits.
 */
static int o2hb_highest_node(unsigned long *nodes, int numbits)
{
        int highest = find_last_bit(nodes, numbits);

        return highest;
}
1074 
/*
 * Index of the lowest set bit in @nodes, as reported by find_first_bit()
 * over the first @numbits bits.
 */
static int o2hb_lowest_node(unsigned long *nodes, int numbits)
{
        int lowest = find_first_bit(nodes, numbits);

        return lowest;
}
1079 
/*
 * Run one heartbeat iteration on @reg.
 *
 * Reads the slots spanning the lowest to highest configured-or-live node,
 * checks our own slot, issues the write of our next block, evaluates every
 * slot in the node map while that write is in flight, then waits for the
 * write to complete.  The write timeout is re-armed only when the write
 * succeeded and our own slot checked out.
 *
 * While reg->hr_steady_iterations is non-zero the function also does the
 * start-up accounting: a clean iteration with no membership change counts
 * toward steady state, and running out of hr_unsteady_iterations aborts
 * the start with -EIO.
 *
 * Returns 0 on success or an error code.
 */
static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
        int i, ret, highest_node, lowest_node;
        int membership_change = 0, own_slot_ok = 0;
        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
        unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
        struct o2hb_bio_wait_ctxt write_wc;

        ret = o2nm_configured_node_map(configured_nodes,
                                       sizeof(configured_nodes));
        if (ret) {
                mlog_errno(ret);
                goto bail;
        }

        /*
         * If a node is not configured but is in the livemap, we still need
         * to read the slot so as to be able to remove it from the livemap.
         */
        o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
        i = -1;
        while ((i = find_next_bit(live_node_bitmap,
                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
                set_bit(i, configured_nodes);
        }

        /* both helpers yield O2NM_MAX_NODES or more when no bit is set */
        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
        lowest_node = o2hb_lowest_node(configured_nodes, O2NM_MAX_NODES);
        if (highest_node >= O2NM_MAX_NODES || lowest_node >= O2NM_MAX_NODES) {
                mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
                ret = -EINVAL;
                goto bail;
        }

        /* No sense in reading the slots of nodes that don't exist
         * yet. Of course, if the node definitions have holes in them
         * then we're reading an empty slot anyway... Consider this
         * best-effort. */
        ret = o2hb_read_slots(reg, lowest_node, highest_node + 1);
        if (ret < 0) {
                mlog_errno(ret);
                goto bail;
        }

        /* With an up to date view of the slots, we can check that no
         * other node has been improperly configured to heartbeat in
         * our slot. */
        own_slot_ok = o2hb_check_own_slot(reg);

        /* fill in the proper info for our next heartbeat */
        o2hb_prepare_block(reg, reg->hr_generation);

        ret = o2hb_issue_node_write(reg, &write_wc);
        if (ret < 0) {
                mlog_errno(ret);
                goto bail;
        }

        /* evaluate every slot we read while our own write is in flight */
        i = -1;
        while((i = find_next_bit(configured_nodes,
                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
                membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
        }

        /*
         * We have to be sure we've advertised ourselves on disk
         * before we can go to steady state.  This ensures that
         * people we find in our steady state have seen us.
         */
        o2hb_wait_on_io(&write_wc);
        if (write_wc.wc_error) {
                /* Do not re-arm the write timeout on I/O error - we
                 * can't be sure that the new block ever made it to
                 * disk */
                mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
                     write_wc.wc_error, reg->hr_dev_name);
                ret = write_wc.wc_error;
                goto bail;
        }

        /* Skip re-arming the write timeout if own slot has stale/bad data.
         * NOTE(review): the original comment said "disarming"; presumably
         * o2hb_arm_timeout() replaces the pending timeout - confirm. */
        if (own_slot_ok) {
                o2hb_set_quorum_device(reg);
                o2hb_arm_timeout(reg);
                reg->hr_last_timeout_start = jiffies;
        }

bail:
        /* let the person who launched us know when things are steady */
        if (atomic_read(&reg->hr_steady_iterations) != 0) {
                if (!ret && own_slot_ok && !membership_change) {
                        if (atomic_dec_and_test(&reg->hr_steady_iterations))
                                wake_up(&o2hb_steady_queue);
                }
        }

        /* still not steady: burn one of the allowed unsteady iterations and
         * abort the region start once they are used up */
        if (atomic_read(&reg->hr_steady_iterations) != 0) {
                if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
                        printk(KERN_NOTICE "o2hb: Unable to stabilize "
                               "heartbeat on region %s (%s)\n",
                               config_item_name(&reg->hr_item),
                               reg->hr_dev_name);
                        atomic_set(&reg->hr_steady_iterations, 0);
                        reg->hr_aborted_start = 1;
                        wake_up(&o2hb_steady_queue);
                        ret = -EIO;
                }
        }

        return ret;
}
1191 
1192 /*
1193  * we ride the region ref that the region dir holds.  before the region
1194  * dir is removed and drops it ref it will wait to tear down this
1195  * thread.
1196  */
1197 static int o2hb_thread(void *data)
1198 {
1199         int i, ret;
1200         struct o2hb_region *reg = data;
1201         struct o2hb_bio_wait_ctxt write_wc;
1202         ktime_t before_hb, after_hb;
1203         unsigned int elapsed_msec;
1204 
1205         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1206 
1207         set_user_nice(current, MIN_NICE);
1208 
1209         /* Pin node */
1210         ret = o2nm_depend_this_node();
1211         if (ret) {
1212                 mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
1213                 reg->hr_node_deleted = 1;
1214                 wake_up(&o2hb_steady_queue);
1215                 return 0;
1216         }
1217 
1218         while (!kthread_should_stop() &&
1219                !reg->hr_unclean_stop && !reg->hr_aborted_start) {
1220                 /* We track the time spent inside
1221                  * o2hb_do_disk_heartbeat so that we avoid more than
1222                  * hr_timeout_ms between disk writes. On busy systems
1223                  * this should result in a heartbeat which is less
1224                  * likely to time itself out. */
1225                 before_hb = ktime_get_real();
1226 
1227                 ret = o2hb_do_disk_heartbeat(reg);
1228                 reg->hr_last_hb_status = ret;
1229 
1230                 after_hb = ktime_get_real();
1231 
1232                 elapsed_msec = (unsigned int)
1233                                 ktime_ms_delta(after_hb, before_hb);
1234 
1235                 mlog(ML_HEARTBEAT,
1236                      "start = %lld, end = %lld, msec = %u, ret = %d\n",
1237                      before_hb, after_hb, elapsed_msec, ret);
1238 
1239                 if (!kthread_should_stop() &&
1240                     elapsed_msec < reg->hr_timeout_ms) {
1241                         /* the kthread api has blocked signals for us so no
1242                          * need to record the return value. */
1243                         msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1244                 }
1245         }
1246 
1247         o2hb_disarm_timeout(reg);
1248 
1249         /* unclean stop is only used in very bad situation */
1250         for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1251                 o2hb_shutdown_slot(&reg->hr_slots[i]);
1252 
1253         /* Explicit down notification - avoid forcing the other nodes
1254          * to timeout on this region when we could just as easily
1255          * write a clear generation - thus indicating to them that
1256          * this node has left this region.
1257          */
1258         if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
1259                 o2hb_prepare_block(reg, 0);
1260                 ret = o2hb_issue_node_write(reg, &write_wc);
1261                 if (ret == 0)
1262                         o2hb_wait_on_io(&write_wc);
1263                 else
1264                         mlog_errno(ret);
1265         }
1266 
1267         /* Unpin node */
1268         o2nm_undepend_this_node();
1269 
1270         mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
1271 
1272         return 0;
1273 }
1274 
1275 #ifdef CONFIG_DEBUG_FS
1276 static int o2hb_debug_open(struct inode *inode, struct file *file)
1277 {
1278         struct o2hb_debug_buf *db = inode->i_private;
1279         struct o2hb_region *reg;
1280         unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1281         unsigned long lts;
1282         char *buf = NULL;
1283         int i = -1;
1284         int out = 0;
1285 
1286         /* max_nodes should be the largest bitmap we pass here */
1287         BUG_ON(sizeof(map) < db->db_size);
1288 
1289         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1290         if (!buf)
1291                 goto bail;
1292 
1293         switch (db->db_type) {
1294         case O2HB_DB_TYPE_LIVENODES:
1295         case O2HB_DB_TYPE_LIVEREGIONS:
1296         case O2HB_DB_TYPE_QUORUMREGIONS:
1297         case O2HB_DB_TYPE_FAILEDREGIONS:
1298                 spin_lock(&o2hb_live_lock);
1299                 memcpy(map, db->db_data, db->db_size);
1300                 spin_unlock(&o2hb_live_lock);
1301                 break;
1302 
1303         case O2HB_DB_TYPE_REGION_LIVENODES:
1304                 spin_lock(&o2hb_live_lock);
1305                 reg = (struct o2hb_region *)db->db_data;
1306                 memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1307                 spin_unlock(&o2hb_live_lock);
1308                 break;
1309 
1310         case O2HB_DB_TYPE_REGION_NUMBER:
1311                 reg = (struct o2hb_region *)db->db_data;
1312                 out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1313                                 reg->hr_region_num);
1314                 goto done;
1315 
1316         case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1317                 reg = (struct o2hb_region *)db->db_data;
1318                 lts = reg->hr_last_timeout_start;
1319                 /* If 0, it has never been set before */
1320                 if (lts)
1321                         lts = jiffies_to_msecs(jiffies - lts);
1322                 out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1323                 goto done;
1324 
1325         case O2HB_DB_TYPE_REGION_PINNED:
1326                 reg = (struct o2hb_region *)db->db_data;
1327                 out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1328                                 !!reg->hr_item_pinned);
1329                 goto done;
1330 
1331         default:
1332                 goto done;
1333         }
1334 
1335         while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1336                 out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1337         out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1338 
1339 done:
1340         i_size_write(inode, out);
1341 
1342         file->private_data = buf;
1343 
1344         return 0;
1345 bail:
1346         return -ENOMEM;
1347 }
1348 
1349 static int o2hb_debug_release(struct inode *inode, struct file *file)
1350 {
1351         kfree(file->private_data);
1352         return 0;
1353 }
1354 
1355 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1356                                  size_t nbytes, loff_t *ppos)
1357 {
1358         return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1359                                        i_size_read(file->f_mapping->host));
1360 }
1361 #else
/* !CONFIG_DEBUG_FS stub: nothing to set up, always reports success. */
static int o2hb_debug_open(struct inode *inode, struct file *file)
{
        return 0;
}
/* !CONFIG_DEBUG_FS stub: nothing was allocated at open, nothing to free. */
static int o2hb_debug_release(struct inode *inode, struct file *file)
{
        return 0;
}
/* !CONFIG_DEBUG_FS stub: returns 0 bytes, i.e. immediate end of file. */
static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
                               size_t nbytes, loff_t *ppos)
{
        return 0;
}
1375 #endif  /* CONFIG_DEBUG_FS */
1376 
1377 static const struct file_operations o2hb_debug_fops = {
1378         .open =         o2hb_debug_open,
1379         .release =      o2hb_debug_release,
1380         .read =         o2hb_debug_read,
1381         .llseek =       generic_file_llseek,
1382 };
1383 
1384 void o2hb_exit(void)
1385 {
1386         debugfs_remove_recursive(o2hb_debug_dir);
1387         kfree(o2hb_db_livenodes);
1388         kfree(o2hb_db_liveregions);
1389         kfree(o2hb_db_quorumregions);
1390         kfree(o2hb_db_failedregions);
1391 }
1392 
1393 static void o2hb_debug_create(const char *name, struct dentry *dir,
1394                               struct o2hb_debug_buf **db, int db_len, int type,
1395                               int size, int len, void *data)
1396 {
1397         *db = kmalloc(db_len, GFP_KERNEL);
1398         if (!*db)
1399                 return;
1400 
1401         (*db)->db_type = type;
1402         (*db)->db_size = size;
1403         (*db)->db_len = len;
1404         (*db)->db_data = data;
1405 
1406         debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db, &o2hb_debug_fops);
1407 }
1408 
1409 static void o2hb_debug_init(void)
1410 {
1411         o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1412 
1413         o2hb_debug_create(O2HB_DEBUG_LIVENODES, o2hb_debug_dir,
1414                           &o2hb_db_livenodes, sizeof(*o2hb_db_livenodes),
1415                           O2HB_DB_TYPE_LIVENODES, sizeof(o2hb_live_node_bitmap),
1416                           O2NM_MAX_NODES, o2hb_live_node_bitmap);
1417 
1418         o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS, o2hb_debug_dir,
1419                           &o2hb_db_liveregions, sizeof(*o2hb_db_liveregions),
1420                           O2HB_DB_TYPE_LIVEREGIONS,
1421                           sizeof(o2hb_live_region_bitmap), O2NM_MAX_REGIONS,
1422                           o2hb_live_region_bitmap);
1423 
1424         o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS, o2hb_debug_dir,
1425                           &o2hb_db_quorumregions,
1426                           sizeof(*o2hb_db_quorumregions),
1427                           O2HB_DB_TYPE_QUORUMREGIONS,
1428                           sizeof(o2hb_quorum_region_bitmap), O2NM_MAX_REGIONS,
1429                           o2hb_quorum_region_bitmap);
1430 
1431         o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS, o2hb_debug_dir,
1432                           &o2hb_db_failedregions,
1433                           sizeof(*o2hb_db_failedregions),
1434                           O2HB_DB_TYPE_FAILEDREGIONS,
1435                           sizeof(o2hb_failed_region_bitmap), O2NM_MAX_REGIONS,
1436                           o2hb_failed_region_bitmap);
1437 }
1438 
1439 void o2hb_init(void)
1440 {
1441         int i;
1442 
1443         for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1444                 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1445 
1446         for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1447                 INIT_LIST_HEAD(&o2hb_live_slots[i]);
1448 
1449         INIT_LIST_HEAD(&o2hb_node_events);
1450 
1451         memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1452         memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1453         memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1454         memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1455         memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1456 
1457         o2hb_dependent_users = 0;
1458 
1459         o2hb_debug_init();
1460 }
1461 
1462 /* if we're already in a callback then we're already serialized by the sem */
1463 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1464                                              unsigned bytes)
1465 {
1466         BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1467 
1468         memcpy(map, &o2hb_live_node_bitmap, bytes);
1469 }
1470 
1471 /*
1472  * get a map of all nodes that are heartbeating in any regions
1473  */
1474 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1475 {
1476         /* callers want to serialize this map and callbacks so that they
1477          * can trust that they don't miss nodes coming to the party */
1478         down_read(&o2hb_callback_sem);
1479         spin_lock(&o2hb_live_lock);
1480         o2hb_fill_node_map_from_callback(map, bytes);
1481         spin_unlock(&o2hb_live_lock);
1482         up_read(&o2hb_callback_sem);
1483 }
1484 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1485 
1486 /*
1487  * heartbeat configfs bits.  The heartbeat set is a default set under
1488  * the cluster set in nodemanager.c.
1489  */
1490 
1491 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1492 {
1493         return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1494 }
1495 
1496 /* drop_item only drops its ref after killing the thread, nothing should
1497  * be using the region anymore.  this has to clean up any state that
1498  * attributes might have built up. */
1499 static void o2hb_region_release(struct config_item *item)
1500 {
1501         int i;
1502         struct page *page;
1503         struct o2hb_region *reg = to_o2hb_region(item);
1504 
1505         mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
1506 
1507         kfree(reg->hr_tmp_block);
1508 
1509         if (reg->hr_slot_data) {
1510                 for (i = 0; i < reg->hr_num_pages; i++) {
1511                         page = reg->hr_slot_data[i];
1512                         if (page)
1513                                 __free_page(page);
1514                 }
1515                 kfree(reg->hr_slot_data);
1516         }
1517 
1518         if (reg->hr_bdev)
1519                 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1520 
1521         kfree(reg->hr_slots);
1522 
1523         debugfs_remove_recursive(reg->hr_debug_dir);
1524         kfree(reg->hr_db_livenodes);
1525         kfree(reg->hr_db_regnum);
1526         kfree(reg->hr_db_elapsed_time);
1527         kfree(reg->hr_db_pinned);
1528 
1529         spin_lock(&o2hb_live_lock);
1530         list_del(&reg->hr_all_item);
1531         spin_unlock(&o2hb_live_lock);
1532 
1533         o2net_unregister_handler_list(&reg->hr_handler_list);
1534         kfree(reg);
1535 }
1536 
1537 static int o2hb_read_block_input(struct o2hb_region *reg,
1538                                  const char *page,
1539                                  unsigned long *ret_bytes,
1540                                  unsigned int *ret_bits)
1541 {
1542         unsigned long bytes;
1543         char *p = (char *)page;
1544 
1545         bytes = simple_strtoul(p, &p, 0);
1546         if (!p || (*p && (*p != '\n')))
1547                 return -EINVAL;
1548 
1549         /* Heartbeat and fs min / max block sizes are the same. */
1550         if (bytes > 4096 || bytes < 512)
1551                 return -ERANGE;
1552         if (hweight16(bytes) != 1)
1553                 return -EINVAL;
1554 
1555         if (ret_bytes)
1556                 *ret_bytes = bytes;
1557         if (ret_bits)
1558                 *ret_bits = ffs(bytes) - 1;
1559 
1560         return 0;
1561 }
1562 
1563 static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
1564                                             char *page)
1565 {
1566         return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
1567 }
1568 
1569 static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
1570                                              const char *page,
1571                                              size_t count)
1572 {
1573         struct o2hb_region *reg = to_o2hb_region(item);
1574         int status;
1575         unsigned long block_bytes;
1576         unsigned int block_bits;
1577 
1578         if (reg->hr_bdev)
1579                 return -EINVAL;
1580 
1581         status = o2hb_read_block_input(reg, page, &block_bytes,
1582                                        &block_bits);
1583         if (status)
1584                 return status;
1585 
1586         reg->hr_block_bytes = (unsigned int)block_bytes;
1587         reg->hr_block_bits = block_bits;
1588 
1589         return count;
1590 }
1591 
1592 static ssize_t o2hb_region_start_block_show(struct config_item *item,
1593                                             char *page)
1594 {
1595         return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
1596 }
1597 
1598 static ssize_t o2hb_region_start_block_store(struct config_item *item,
1599                                              const char *page,
1600                                              size_t count)
1601 {
1602         struct o2hb_region *reg = to_o2hb_region(item);
1603         unsigned long long tmp;
1604         char *p = (char *)page;
1605 
1606         if (reg->hr_bdev)
1607                 return -EINVAL;
1608 
1609         tmp = simple_strtoull(p, &p, 0);
1610         if (!p || (*p && (*p != '\n')))
1611                 return -EINVAL;
1612 
1613         reg->hr_start_block = tmp;
1614 
1615         return count;
1616 }
1617 
1618 static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
1619 {
1620         return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
1621 }
1622 
1623 static ssize_t o2hb_region_blocks_store(struct config_item *item,
1624                                         const char *page,
1625                                         size_t count)
1626 {
1627         struct o2hb_region *reg = to_o2hb_region(item);
1628         unsigned long tmp;
1629         char *p = (char *)page;
1630 
1631         if (reg->hr_bdev)
1632                 return -EINVAL;
1633 
1634         tmp = simple_strtoul(p, &p, 0);
1635         if (!p || (*p && (*p != '\n')))
1636                 return -EINVAL;
1637 
1638         if (tmp > O2NM_MAX_NODES || tmp == 0)
1639                 return -ERANGE;
1640 
1641         reg->hr_blocks = (unsigned int)tmp;
1642 
1643         return count;
1644 }
1645 
1646 static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
1647 {
1648         unsigned int ret = 0;
1649 
1650         if (to_o2hb_region(item)->hr_bdev)
1651                 ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name);
1652 
1653         return ret;
1654 }
1655 
1656 static void o2hb_init_region_params(struct o2hb_region *reg)
1657 {
1658         reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
1659         reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1660 
1661         mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1662              reg->hr_start_block, reg->hr_blocks);
1663         mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1664              reg->hr_block_bytes, reg->hr_block_bits);
1665         mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1666         mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1667 }
1668 
1669 static int o2hb_map_slot_data(struct o2hb_region *reg)
1670 {
1671         int i, j;
1672         unsigned int last_slot;
1673         unsigned int spp = reg->hr_slots_per_page;
1674         struct page *page;
1675         char *raw;
1676         struct o2hb_disk_slot *slot;
1677 
1678         reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1679         if (reg->hr_tmp_block == NULL)
1680                 return -ENOMEM;
1681 
1682         reg->hr_slots = kcalloc(reg->hr_blocks,
1683                                 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1684         if (reg->hr_slots == NULL)
1685                 return -ENOMEM;
1686 
1687         for(i = 0; i < reg->hr_blocks; i++) {
1688                 slot = &reg->hr_slots[i];
1689                 slot->ds_node_num = i;
1690                 INIT_LIST_HEAD(&slot->ds_live_item);
1691                 slot->ds_raw_block = NULL;
1692         }
1693 
1694         reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1695         mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1696                            "at %u blocks per page\n",
1697              reg->hr_num_pages, reg->hr_blocks, spp);
1698 
1699         reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1700                                     GFP_KERNEL);
1701         if (!reg->hr_slot_data)
1702                 return -ENOMEM;
1703 
1704         for(i = 0; i < reg->hr_num_pages; i++) {
1705                 page = alloc_page(GFP_KERNEL);
1706                 if (!page)
1707                         return -ENOMEM;
1708 
1709                 reg->hr_slot_data[i] = page;
1710 
1711                 last_slot = i * spp;
1712                 raw = page_address(page);
1713                 for (j = 0;
1714                      (j < spp) && ((j + last_slot) < reg->hr_blocks);
1715                      j++) {
1716                         BUG_ON((j + last_slot) >= reg->hr_blocks);
1717 
1718                         slot = &reg->hr_slots[j + last_slot];
1719                         slot->ds_raw_block =
1720                                 (struct o2hb_disk_heartbeat_block *) raw;
1721 
1722                         raw += reg->hr_block_bytes;
1723                 }
1724         }
1725 
1726         return 0;
1727 }
1728 
1729 /* Read in all the slots available and populate the tracking
1730  * structures so that we can start with a baseline idea of what's
1731  * there. */
1732 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1733 {
1734         int ret, i;
1735         struct o2hb_disk_slot *slot;
1736         struct o2hb_disk_heartbeat_block *hb_block;
1737 
1738         ret = o2hb_read_slots(reg, 0, reg->hr_blocks);
1739         if (ret)
1740                 goto out;
1741 
1742         /* We only want to get an idea of the values initially in each
1743          * slot, so we do no verification - o2hb_check_slot will
1744          * actually determine if each configured slot is valid and
1745          * whether any values have changed. */
1746         for(i = 0; i < reg->hr_blocks; i++) {
1747                 slot = &reg->hr_slots[i];
1748                 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1749 
1750                 /* Only fill the values that o2hb_check_slot uses to
1751                  * determine changing slots */
1752                 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1753                 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1754         }
1755 
1756 out:
1757         return ret;
1758 }
1759 
1760 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1761 static ssize_t o2hb_region_dev_store(struct config_item *item,
1762                                      const char *page,
1763                                      size_t count)
1764 {
1765         struct o2hb_region *reg = to_o2hb_region(item);
1766         struct task_struct *hb_task;
1767         long fd;
1768         int sectsize;
1769         char *p = (char *)page;
1770         struct fd f;
1771         struct inode *inode;
1772         ssize_t ret = -EINVAL;
1773         int live_threshold;
1774 
1775         if (reg->hr_bdev)
1776                 goto out;
1777 
1778         /* We can't heartbeat without having had our node number
1779          * configured yet. */
1780         if (o2nm_this_node() == O2NM_MAX_NODES)
1781                 goto out;
1782 
1783         fd = simple_strtol(p, &p, 0);
1784         if (!p || (*p && (*p != '\n')))
1785                 goto out;
1786 
1787         if (fd < 0 || fd >= INT_MAX)
1788                 goto out;
1789 
1790         f = fdget(fd);
1791         if (f.file == NULL)
1792                 goto out;
1793 
1794         if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1795             reg->hr_block_bytes == 0)
1796                 goto out2;
1797 
1798         inode = igrab(f.file->f_mapping->host);
1799         if (inode == NULL)
1800                 goto out2;
1801 
1802         if (!S_ISBLK(inode->i_mode))
1803                 goto out3;
1804 
1805         reg->hr_bdev = I_BDEV(f.file->f_mapping->host);
1806         ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1807         if (ret) {
1808                 reg->hr_bdev = NULL;
1809                 goto out3;
1810         }
1811         inode = NULL;
1812 
1813         bdevname(reg->hr_bdev, reg->hr_dev_name);
1814 
1815         sectsize = bdev_logical_block_size(reg->hr_bdev);
1816         if (sectsize != reg->hr_block_bytes) {
1817                 mlog(ML_ERROR,
1818                      "blocksize %u incorrect for device, expected %d",
1819                      reg->hr_block_bytes, sectsize);
1820                 ret = -EINVAL;
1821                 goto out3;
1822         }
1823 
1824         o2hb_init_region_params(reg);
1825 
1826         /* Generation of zero is invalid */
1827         do {
1828                 get_random_bytes(&reg->hr_generation,
1829                                  sizeof(reg->hr_generation));
1830         } while (reg->hr_generation == 0);
1831 
1832         ret = o2hb_map_slot_data(reg);
1833         if (ret) {
1834                 mlog_errno(ret);
1835                 goto out3;
1836         }
1837 
1838         ret = o2hb_populate_slot_data(reg);
1839         if (ret) {
1840                 mlog_errno(ret);
1841                 goto out3;
1842         }
1843 
1844         INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1845         INIT_DELAYED_WORK(&reg->hr_nego_timeout_work, o2hb_nego_timeout);
1846 
1847         /*
1848          * A node is considered live after it has beat LIVE_THRESHOLD
1849          * times.  We're not steady until we've given them a chance
1850          * _after_ our first read.
1851          * The default threshold is bare minimum so as to limit the delay
1852          * during mounts. For global heartbeat, the threshold doubled for the
1853          * first region.
1854          */
1855         live_threshold = O2HB_LIVE_THRESHOLD;
1856         if (o2hb_global_heartbeat_active()) {
1857                 spin_lock(&o2hb_live_lock);
1858                 if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1859                         live_threshold <<= 1;
1860                 spin_unlock(&o2hb_live_lock);
1861         }
1862         ++live_threshold;
1863         atomic_set(&reg->hr_steady_iterations, live_threshold);
1864         /* unsteady_iterations is triple the steady_iterations */
1865         atomic_set(&reg->hr_unsteady_iterations, (live_threshold * 3));
1866 
1867         hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1868                               reg->hr_item.ci_name);
1869         if (IS_ERR(hb_task)) {
1870                 ret = PTR_ERR(hb_task);
1871                 mlog_errno(ret);
1872                 goto out3;
1873         }
1874 
1875         spin_lock(&o2hb_live_lock);
1876         reg->hr_task = hb_task;
1877         spin_unlock(&o2hb_live_lock);
1878 
1879         ret = wait_event_interruptible(o2hb_steady_queue,
1880                                 atomic_read(&reg->hr_steady_iterations) == 0 ||
1881                                 reg->hr_node_deleted);
1882         if (ret) {
1883                 atomic_set(&reg->hr_steady_iterations, 0);
1884                 reg->hr_aborted_start = 1;
1885         }
1886 
1887         if (reg->hr_aborted_start) {
1888                 ret = -EIO;
1889                 goto out3;
1890         }
1891 
1892         if (reg->hr_node_deleted) {
1893                 ret = -EINVAL;
1894                 goto out3;
1895         }
1896 
1897         /* Ok, we were woken.  Make sure it wasn't by drop_item() */
1898         spin_lock(&o2hb_live_lock);
1899         hb_task = reg->hr_task;
1900         if (o2hb_global_heartbeat_active())
1901                 set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1902         spin_unlock(&o2hb_live_lock);
1903 
1904         if (hb_task)
1905                 ret = count;
1906         else
1907                 ret = -EIO;
1908 
1909         if (hb_task && o2hb_global_heartbeat_active())
1910                 printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
1911                        config_item_name(&reg->hr_item), reg->hr_dev_name);
1912 
1913 out3:
1914         iput(inode);
1915 out2:
1916         fdput(f);
1917 out:
1918         if (ret < 0) {
1919                 if (reg->hr_bdev) {
1920                         blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1921                         reg->hr_bdev = NULL;
1922                 }
1923         }
1924         return ret;
1925 }
1926 
1927 static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
1928 {
1929         struct o2hb_region *reg = to_o2hb_region(item);
1930         pid_t pid = 0;
1931 
1932         spin_lock(&o2hb_live_lock);
1933         if (reg->hr_task)
1934                 pid = task_pid_nr(reg->hr_task);
1935         spin_unlock(&o2hb_live_lock);
1936 
1937         if (!pid)
1938                 return 0;
1939 
1940         return sprintf(page, "%u\n", pid);
1941 }
1942 
1943 CONFIGFS_ATTR(o2hb_region_, block_bytes);
1944 CONFIGFS_ATTR(o2hb_region_, start_block);
1945 CONFIGFS_ATTR(o2hb_region_, blocks);
1946 CONFIGFS_ATTR(o2hb_region_, dev);
1947 CONFIGFS_ATTR_RO(o2hb_region_, pid);
1948 
1949 static struct configfs_attribute *o2hb_region_attrs[] = {
1950         &o2hb_region_attr_block_bytes,
1951         &o2hb_region_attr_start_block,
1952         &o2hb_region_attr_blocks,
1953         &o2hb_region_attr_dev,
1954         &o2hb_region_attr_pid,
1955         NULL,
1956 };
1957 
1958 static struct configfs_item_operations o2hb_region_item_ops = {
1959         .release                = o2hb_region_release,
1960 };
1961 
1962 static const struct config_item_type o2hb_region_type = {
1963         .ct_item_ops    = &o2hb_region_item_ops,
1964         .ct_attrs       = o2hb_region_attrs,
1965         .ct_owner       = THIS_MODULE,
1966 };
1967 
1968 /* heartbeat set */
1969 
1970 struct o2hb_heartbeat_group {
1971         struct config_group hs_group;
1972         /* some stuff? */
1973 };
1974 
1975 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1976 {
1977         return group ?
1978                 container_of(group, struct o2hb_heartbeat_group, hs_group)
1979                 : NULL;
1980 }
1981 
1982 static void o2hb_debug_region_init(struct o2hb_region *reg,
1983                                    struct dentry *parent)
1984 {
1985         struct dentry *dir;
1986 
1987         dir = debugfs_create_dir(config_item_name(&reg->hr_item), parent);
1988         reg->hr_debug_dir = dir;
1989 
1990         o2hb_debug_create(O2HB_DEBUG_LIVENODES, dir, &(reg->hr_db_livenodes),
1991                           sizeof(*(reg->hr_db_livenodes)),
1992                           O2HB_DB_TYPE_REGION_LIVENODES,
1993                           sizeof(reg->hr_live_node_bitmap), O2NM_MAX_NODES,
1994                           reg);
1995 
1996         o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER, dir, &(reg->hr_db_regnum),
1997                           sizeof(*(reg->hr_db_regnum)),
1998                           O2HB_DB_TYPE_REGION_NUMBER, 0, O2NM_MAX_NODES, reg);
1999 
2000         o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME, dir,
2001                           &(reg->hr_db_elapsed_time),
2002                           sizeof(*(reg->hr_db_elapsed_time)),
2003                           O2HB_DB_TYPE_REGION_ELAPSED_TIME, 0, 0, reg);
2004 
2005         o2hb_debug_create(O2HB_DEBUG_REGION_PINNED, dir, &(reg->hr_db_pinned),
2006                           sizeof(*(reg->hr_db_pinned)),
2007                           O2HB_DB_TYPE_REGION_PINNED, 0, 0, reg);
2008 
2009 }
2010 
2011 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2012                                                           const char *name)
2013 {
2014         struct o2hb_region *reg = NULL;
2015         int ret;
2016 
2017         reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2018         if (reg == NULL)
2019                 return ERR_PTR(-ENOMEM);
2020 
2021         if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2022                 ret = -ENAMETOOLONG;
2023                 goto free;
2024         }
2025 
2026         spin_lock(&o2hb_live_lock);
2027         reg->hr_region_num = 0;
2028         if (o2hb_global_heartbeat_active()) {
2029                 reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2030                                                          O2NM_MAX_REGIONS);
2031                 if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2032                         spin_unlock(&o2hb_live_lock);
2033                         ret = -EFBIG;
2034                         goto free;
2035                 }
2036                 set_bit(reg->hr_region_num, o2hb_region_bitmap);
2037         }
2038         list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
2039         spin_unlock(&o2hb_live_lock);
2040 
2041         config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
2042 
2043         /* this is the same way to generate msg key as dlm, for local heartbeat,
2044          * name is also the same, so make initial crc value different to avoid
2045          * message key conflict.
2046          */
2047         reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
2048                 name, strlen(name));
2049         INIT_LIST_HEAD(&reg->hr_handler_list);
2050         ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
2051                         sizeof(struct o2hb_nego_msg),
2052                         o2hb_nego_timeout_handler,
2053                         reg, NULL, &reg->hr_handler_list);
2054         if (ret)
2055                 goto free;
2056 
2057         ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key,
2058                         sizeof(struct o2hb_nego_msg),
2059                         o2hb_nego_approve_handler,
2060                         reg, NULL, &reg->hr_handler_list);
2061         if (ret)
2062                 goto unregister_handler;
2063 
2064         o2hb_debug_region_init(reg, o2hb_debug_dir);
2065 
2066         return &reg->hr_item;
2067 
2068 unregister_handler:
2069         o2net_unregister_handler_list(&reg->hr_handler_list);
2070 free:
2071         kfree(reg);
2072         return ERR_PTR(ret);
2073 }
2074 
2075 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2076                                            struct config_item *item)
2077 {
2078         struct task_struct *hb_task;
2079         struct o2hb_region *reg = to_o2hb_region(item);
2080         int quorum_region = 0;
2081 
2082         /* stop the thread when the user removes the region dir */
2083         spin_lock(&o2hb_live_lock);
2084         hb_task = reg->hr_task;
2085         reg->hr_task = NULL;
2086         reg->hr_item_dropped = 1;
2087         spin_unlock(&o2hb_live_lock);
2088 
2089         if (hb_task)
2090                 kthread_stop(hb_task);
2091 
2092         if (o2hb_global_heartbeat_active()) {
2093                 spin_lock(&o2hb_live_lock);
2094                 clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2095                 clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2096                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2097                         quorum_region = 1;
2098                 clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2099                 spin_unlock(&o2hb_live_lock);
2100                 printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
2101                        ((atomic_read(&reg->hr_steady_iterations) == 0) ?
2102                         "stopped" : "start aborted"), config_item_name(item),
2103                        reg->hr_dev_name);
2104         }
2105 
2106         /*
2107          * If we're racing a dev_write(), we need to wake them.  They will
2108          * check reg->hr_task
2109          */
2110         if (atomic_read(&reg->hr_steady_iterations) != 0) {
2111                 reg->hr_aborted_start = 1;
2112                 atomic_set(&reg->hr_steady_iterations, 0);
2113                 wake_up(&o2hb_steady_queue);
2114         }
2115 
2116         config_item_put(item);
2117 
2118         if (!o2hb_global_heartbeat_active() || !quorum_region)
2119                 return;
2120 
2121         /*
2122          * If global heartbeat active and there are dependent users,
2123          * pin all regions if quorum region count <= CUT_OFF
2124          */
2125         spin_lock(&o2hb_live_lock);
2126 
2127         if (!o2hb_dependent_users)
2128                 goto unlock;
2129 
2130         if (bitmap_weight(o2hb_quorum_region_bitmap,
2131                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2132                 o2hb_region_pin(NULL);
2133 
2134 unlock:
2135         spin_unlock(&o2hb_live_lock);
2136 }
2137 
2138 static ssize_t o2hb_heartbeat_group_dead_threshold_show(struct config_item *item,
2139                 char *page)
2140 {
2141         return sprintf(page, "%u\n", o2hb_dead_threshold);
2142 }
2143 
2144 static ssize_t o2hb_heartbeat_group_dead_threshold_store(struct config_item *item,
2145                 const char *page, size_t count)
2146 {
2147         unsigned long tmp;
2148         char *p = (char *)page;
2149 
2150         tmp = simple_strtoul(p, &p, 10);
2151         if (!p || (*p && (*p != '\n')))
2152                 return -EINVAL;
2153 
2154         /* this will validate ranges for us. */
2155         o2hb_dead_threshold_set((unsigned int) tmp);
2156 
2157         return count;
2158 }
2159 
2160 static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
2161                 char *page)
2162 {
2163         return sprintf(page, "%s\n",
2164                        o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2165 }
2166 
2167 static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
2168                 const char *page, size_t count)
2169 {
2170         unsigned int i;
2171         int ret;
2172         size_t len;
2173 
2174         len = (page[count - 1] == '\n') ? count - 1 : count;
2175         if (!len)
2176                 return -EINVAL;
2177 
2178         for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2179                 if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
2180                         continue;
2181 
2182                 ret = o2hb_global_heartbeat_mode_set(i);
2183                 if (!ret)
2184                         printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2185                                o2hb_heartbeat_mode_desc[i]);
2186                 return count;
2187         }
2188 
2189         return -EINVAL;
2190 
2191 }
2192 
2193 CONFIGFS_ATTR(o2hb_heartbeat_group_, dead_threshold);
2194 CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);
2195 
2196 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
2197         &o2hb_heartbeat_group_attr_dead_threshold,
2198         &o2hb_heartbeat_group_attr_mode,
2199         NULL,
2200 };
2201 
2202 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
2203         .make_item      = o2hb_heartbeat_group_make_item,
2204         .drop_item      = o2hb_heartbeat_group_drop_item,
2205 };
2206 
2207 static const struct config_item_type o2hb_heartbeat_group_type = {
2208         .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
2209         .ct_attrs       = o2hb_heartbeat_group_attrs,
2210         .ct_owner       = THIS_MODULE,
2211 };
2212 
2213 /* this is just here to avoid touching group in heartbeat.h which the
2214  * entire damn world #includes */
2215 struct config_group *o2hb_alloc_hb_set(void)
2216 {
2217         struct o2hb_heartbeat_group *hs = NULL;
2218         struct config_group *ret = NULL;
2219 
2220         hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2221         if (hs == NULL)
2222                 goto out;
2223 
2224         config_group_init_type_name(&hs->hs_group, "heartbeat",
2225                                     &o2hb_heartbeat_group_type);
2226 
2227         ret = &hs->hs_group;
2228 out:
2229         if (ret == NULL)
2230                 kfree(hs);
2231         return ret;
2232 }
2233 
/* Free a heartbeat group obtained from o2hb_alloc_hb_set(); NULL is a no-op. */
void o2hb_free_hb_set(struct config_group *group)
{
	kfree(to_o2hb_heartbeat_group(group));
}
2239 
2240 /* hb callback registration and issuing */
2241 
2242 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2243 {
2244         if (type == O2HB_NUM_CB)
2245                 return ERR_PTR(-EINVAL);
2246 
2247         return &o2hb_callbacks[type];
2248 }
2249 
2250 void o2hb_setup_callback(struct o2hb_callback_func *hc,
2251                          enum o2hb_callback_type type,
2252                          o2hb_cb_func *func,
2253                          void *data,
2254                          int priority)
2255 {
2256         INIT_LIST_HEAD(&hc->hc_item);
2257         hc->hc_func = func;
2258         hc->hc_data = data;
2259         hc->hc_priority = priority;
2260         hc->hc_type = type;
2261         hc->hc_magic = O2HB_CB_MAGIC;
2262 }
2263 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2264 
2265 /*
2266  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2267  * In global heartbeat mode, region_uuid passed is NULL.
2268  *
2269  * In local, we only pin the matching region. In global we pin all the active
2270  * regions.
2271  */
2272 static int o2hb_region_pin(const char *region_uuid)
2273 {
2274         int ret = 0, found = 0;
2275         struct o2hb_region *reg;
2276         char *uuid;
2277 
2278         assert_spin_locked(&o2hb_live_lock);
2279 
2280         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2281                 if (reg->hr_item_dropped)
2282                         continue;
2283 
2284                 uuid = config_item_name(&reg->hr_item);
2285 
2286                 /* local heartbeat */
2287                 if (region_uuid) {
2288                         if (strcmp(region_uuid, uuid))
2289                                 continue;
2290                         found = 1;
2291                 }
2292 
2293                 if (reg->hr_item_pinned || reg->hr_item_dropped)
2294                         goto skip_pin;
2295 
2296                 /* Ignore ENOENT only for local hb (userdlm domain) */
2297                 ret = o2nm_depend_item(&reg->hr_item);
2298                 if (!ret) {
2299                         mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2300                         reg->hr_item_pinned = 1;
2301                 } else {
2302                         if (ret == -ENOENT && found)
2303                                 ret = 0;
2304                         else {
2305                                 mlog(ML_ERROR, "Pin region %s fails with %d\n",
2306                                      uuid, ret);
2307                                 break;
2308                         }
2309                 }
2310 skip_pin:
2311                 if (found)
2312                         break;
2313         }
2314 
2315         return ret;
2316 }
2317 
2318 /*
2319  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2320  * In global heartbeat mode, region_uuid passed is NULL.
2321  *
2322  * In local, we only unpin the matching region. In global we unpin all the
2323  * active regions.
2324  */
2325 static void o2hb_region_unpin(const char *region_uuid)
2326 {
2327         struct o2hb_region *reg;
2328         char *uuid;
2329         int found = 0;
2330 
2331         assert_spin_locked(&o2hb_live_lock);
2332 
2333         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2334                 if (reg->hr_item_dropped)
2335                         continue;
2336 
2337                 uuid = config_item_name(&reg->hr_item);
2338                 if (region_uuid) {
2339                         if (strcmp(region_uuid, uuid))
2340                                 continue;
2341                         found = 1;
2342                 }
2343 
2344                 if (reg->hr_item_pinned) {
2345                         mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2346                         o2nm_undepend_item(&reg->hr_item);
2347                         reg->hr_item_pinned = 0;
2348                 }
2349                 if (found)
2350                         break;
2351         }
2352 }
2353 
2354 static int o2hb_region_inc_user(const char *region_uuid)
2355 {
2356         int ret = 0;
2357 
2358         spin_lock(&o2hb_live_lock);
2359 
2360         /* local heartbeat */
2361         if (!o2hb_global_heartbeat_active()) {
2362             ret = o2hb_region_pin(region_uuid);
2363             goto unlock;
2364         }
2365 
2366         /*
2367          * if global heartbeat active and this is the first dependent user,
2368          * pin all regions if quorum region count <= CUT_OFF
2369          */
2370         o2hb_dependent_users++;
2371         if (o2hb_dependent_users > 1)
2372                 goto unlock;
2373 
2374         if (bitmap_weight(o2hb_quorum_region_bitmap,
2375                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2376                 ret = o2hb_region_pin(NULL);
2377 
2378 unlock:
2379         spin_unlock(&o2hb_live_lock);
2380         return ret;
2381 }
2382 
2383 static void o2hb_region_dec_user(const char *region_uuid)
2384 {
2385         spin_lock(&o2hb_live_lock);
2386 
2387         /* local heartbeat */
2388         if (!o2hb_global_heartbeat_active()) {
2389             o2hb_region_unpin(region_uuid);
2390             goto unlock;
2391         }
2392 
2393         /*
2394          * if global heartbeat active and there are no dependent users,
2395          * unpin all quorum regions
2396          */
2397         o2hb_dependent_users--;
2398         if (!o2hb_dependent_users)
2399                 o2hb_region_unpin(NULL);
2400 
2401 unlock:
2402         spin_unlock(&o2hb_live_lock);
2403 }
2404 
2405 int o2hb_register_callback(const char *region_uuid,
2406                            struct o2hb_callback_func *hc)
2407 {
2408         struct o2hb_callback_func *f;
2409         struct o2hb_callback *hbcall;
2410         int ret;
2411 
2412         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2413         BUG_ON(!list_empty(&hc->hc_item));
2414 
2415         hbcall = hbcall_from_type(hc->hc_type);
2416         if (IS_ERR(hbcall)) {
2417                 ret = PTR_ERR(hbcall);
2418                 goto out;
2419         }
2420 
2421         if (region_uuid) {
2422                 ret = o2hb_region_inc_user(region_uuid);
2423                 if (ret) {
2424                         mlog_errno(ret);
2425                         goto out;
2426                 }
2427         }
2428 
2429         down_write(&o2hb_callback_sem);
2430 
2431         list_for_each_entry(f, &hbcall->list, hc_item) {
2432                 if (hc->hc_priority < f->hc_priority) {
2433                         list_add_tail(&hc->hc_item, &f->hc_item);
2434                         break;
2435                 }
2436         }
2437         if (list_empty(&hc->hc_item))
2438                 list_add_tail(&hc->hc_item, &hbcall->list);
2439 
2440         up_write(&o2hb_callback_sem);
2441         ret = 0;
2442 out:
2443         mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2444              ret, __builtin_return_address(0), hc);
2445         return ret;
2446 }
2447 EXPORT_SYMBOL_GPL(o2hb_register_callback);
2448 
2449 void o2hb_unregister_callback(const char *region_uuid,
2450                               struct o2hb_callback_func *hc)
2451 {
2452         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2453 
2454         mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2455              __builtin_return_address(0), hc);
2456 
2457         /* XXX Can this happen _with_ a region reference? */
2458         if (list_empty(&hc->hc_item))
2459                 return;
2460 
2461         if (region_uuid)
2462                 o2hb_region_dec_user(region_uuid);
2463 
2464         down_write(&o2hb_callback_sem);
2465 
2466         list_del_init(&hc->hc_item);
2467 
2468         up_write(&o2hb_callback_sem);
2469 }
2470 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
2471 
2472 int o2hb_check_node_heartbeating_no_sem(u8 node_num)
2473 {
2474         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2475 
2476         spin_lock(&o2hb_live_lock);
2477         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2478         spin_unlock(&o2hb_live_lock);
2479         if (!test_bit(node_num, testing_map)) {
2480                 mlog(ML_HEARTBEAT,
2481                      "node (%u) does not have heartbeating enabled.\n",
2482                      node_num);
2483                 return 0;
2484         }
2485 
2486         return 1;
2487 }
2488 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
2489 
2490 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2491 {
2492         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2493 
2494         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2495         if (!test_bit(node_num, testing_map)) {
2496                 mlog(ML_HEARTBEAT,
2497                      "node (%u) does not have heartbeating enabled.\n",
2498                      node_num);
2499                 return 0;
2500         }
2501 
2502         return 1;
2503 }
2504 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2505 
2506 /*
2507  * this is just a hack until we get the plumbing which flips file systems
2508  * read only and drops the hb ref instead of killing the node dead.
2509  */
2510 void o2hb_stop_all_regions(void)
2511 {
2512         struct o2hb_region *reg;
2513 
2514         mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2515 
2516         spin_lock(&o2hb_live_lock);
2517 
2518         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2519                 reg->hr_unclean_stop = 1;
2520 
2521         spin_unlock(&o2hb_live_lock);
2522 }
2523 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2524 
2525 int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2526 {
2527         struct o2hb_region *reg;
2528         int numregs = 0;
2529         char *p;
2530 
2531         spin_lock(&o2hb_live_lock);
2532 
2533         p = region_uuids;
2534         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2535                 if (reg->hr_item_dropped)
2536                         continue;
2537 
2538                 mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2539                 if (numregs < max_regions) {
2540                         memcpy(p, config_item_name(&reg->hr_item),
2541                                O2HB_MAX_REGION_NAME_LEN);
2542                         p += O2HB_MAX_REGION_NAME_LEN;
2543                 }
2544                 numregs++;
2545         }
2546 
2547         spin_unlock(&o2hb_live_lock);
2548 
2549         return numregs;
2550 }
2551 EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
2552 
2553 int o2hb_global_heartbeat_active(void)
2554 {
2555         return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2556 }
2557 EXPORT_SYMBOL(o2hb_global_heartbeat_active);

/* [<][>][^][v][top][bottom][index][help] */