root/fs/dlm/rcom.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. rcom_response
  2. create_rcom
  3. send_rcom
  4. set_rcom_status
  5. set_rcom_config
  6. check_rcom_config
  7. allow_sync_reply
  8. disallow_sync_reply
  9. dlm_rcom_status
  10. receive_rcom_status
  11. receive_sync_reply
  12. dlm_rcom_names
  13. receive_rcom_names
  14. dlm_send_rcom_lookup
  15. receive_rcom_lookup
  16. receive_rcom_lookup_reply
  17. pack_rcom_lock
  18. dlm_send_rcom_lock
  19. receive_rcom_lock
  20. dlm_send_ls_not_ready
  21. dlm_receive_rcom

   1 // SPDX-License-Identifier: GPL-2.0-only
   2 /******************************************************************************
   3 *******************************************************************************
   4 **
   5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
   7 **
   8 **
   9 *******************************************************************************
  10 ******************************************************************************/
  11 
  12 #include "dlm_internal.h"
  13 #include "lockspace.h"
  14 #include "member.h"
  15 #include "lowcomms.h"
  16 #include "midcomms.h"
  17 #include "rcom.h"
  18 #include "recover.h"
  19 #include "dir.h"
  20 #include "config.h"
  21 #include "memory.h"
  22 #include "lock.h"
  23 #include "util.h"
  24 
  25 static int rcom_response(struct dlm_ls *ls)
  26 {
  27         return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
  28 }
  29 
  30 static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
  31                        struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
  32 {
  33         struct dlm_rcom *rc;
  34         struct dlm_mhandle *mh;
  35         char *mb;
  36         int mb_len = sizeof(struct dlm_rcom) + len;
  37 
  38         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
  39         if (!mh) {
  40                 log_print("create_rcom to %d type %d len %d ENOBUFS",
  41                           to_nodeid, type, len);
  42                 return -ENOBUFS;
  43         }
  44         memset(mb, 0, mb_len);
  45 
  46         rc = (struct dlm_rcom *) mb;
  47 
  48         rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
  49         rc->rc_header.h_lockspace = ls->ls_global_id;
  50         rc->rc_header.h_nodeid = dlm_our_nodeid();
  51         rc->rc_header.h_length = mb_len;
  52         rc->rc_header.h_cmd = DLM_RCOM;
  53 
  54         rc->rc_type = type;
  55 
  56         spin_lock(&ls->ls_recover_lock);
  57         rc->rc_seq = ls->ls_recover_seq;
  58         spin_unlock(&ls->ls_recover_lock);
  59 
  60         *mh_ret = mh;
  61         *rc_ret = rc;
  62         return 0;
  63 }
  64 
  65 static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
  66                       struct dlm_rcom *rc)
  67 {
  68         dlm_rcom_out(rc);
  69         dlm_lowcomms_commit_buffer(mh);
  70 }
  71 
  72 static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
  73                             uint32_t flags)
  74 {
  75         rs->rs_flags = cpu_to_le32(flags);
  76 }
  77 
  78 /* When replying to a status request, a node also sends back its
  79    configuration values.  The requesting node then checks that the remote
  80    node is configured the same way as itself. */
  81 
  82 static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
  83                             uint32_t num_slots)
  84 {
  85         rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
  86         rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
  87 
  88         rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
  89         rf->rf_num_slots = cpu_to_le16(num_slots);
  90         rf->rf_generation =  cpu_to_le32(ls->ls_generation);
  91 }
  92 
  93 static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
  94 {
  95         struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
  96 
  97         if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
  98                 log_error(ls, "version mismatch: %x nodeid %d: %x",
  99                           DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
 100                           rc->rc_header.h_version);
 101                 return -EPROTO;
 102         }
 103 
 104         if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
 105             le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
 106                 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
 107                           ls->ls_lvblen, ls->ls_exflags, nodeid,
 108                           le32_to_cpu(rf->rf_lvblen),
 109                           le32_to_cpu(rf->rf_lsflags));
 110                 return -EPROTO;
 111         }
 112         return 0;
 113 }
 114 
 115 static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
 116 {
 117         spin_lock(&ls->ls_rcom_spin);
 118         *new_seq = ++ls->ls_rcom_seq;
 119         set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 120         spin_unlock(&ls->ls_rcom_spin);
 121 }
 122 
 123 static void disallow_sync_reply(struct dlm_ls *ls)
 124 {
 125         spin_lock(&ls->ls_rcom_spin);
 126         clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 127         clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
 128         spin_unlock(&ls->ls_rcom_spin);
 129 }
 130 
 131 /*
 132  * low nodeid gathers one slot value at a time from each node.
 133  * it sets need_slots=0, and saves rf_our_slot returned from each
 134  * rcom_config.
 135  *
 136  * other nodes gather all slot values at once from the low nodeid.
 137  * they set need_slots=1, and ignore the rf_our_slot returned from each
 138  * rcom_config.  they use the rf_num_slots returned from the low
 139  * node's rcom_config.
 140  */
 141 
 142 int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 143 {
 144         struct dlm_rcom *rc;
 145         struct dlm_mhandle *mh;
 146         int error = 0;
 147 
 148         ls->ls_recover_nodeid = nodeid;
 149 
 150         if (nodeid == dlm_our_nodeid()) {
 151                 rc = ls->ls_recover_buf;
 152                 rc->rc_result = dlm_recover_status(ls);
 153                 goto out;
 154         }
 155 
 156 retry:
 157         error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
 158                             sizeof(struct rcom_status), &rc, &mh);
 159         if (error)
 160                 goto out;
 161 
 162         set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
 163 
 164         allow_sync_reply(ls, &rc->rc_id);
 165         memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 166 
 167         send_rcom(ls, mh, rc);
 168 
 169         error = dlm_wait_function(ls, &rcom_response);
 170         disallow_sync_reply(ls);
 171         if (error == -ETIMEDOUT)
 172                 goto retry;
 173         if (error)
 174                 goto out;
 175 
 176         rc = ls->ls_recover_buf;
 177 
 178         if (rc->rc_result == -ESRCH) {
 179                 /* we pretend the remote lockspace exists with 0 status */
 180                 log_debug(ls, "remote node %d not ready", nodeid);
 181                 rc->rc_result = 0;
 182                 error = 0;
 183         } else {
 184                 error = check_rcom_config(ls, rc, nodeid);
 185         }
 186 
 187         /* the caller looks at rc_result for the remote recovery status */
 188  out:
 189         return error;
 190 }
 191 
 192 static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 193 {
 194         struct dlm_rcom *rc;
 195         struct dlm_mhandle *mh;
 196         struct rcom_status *rs;
 197         uint32_t status;
 198         int nodeid = rc_in->rc_header.h_nodeid;
 199         int len = sizeof(struct rcom_config);
 200         int num_slots = 0;
 201         int error;
 202 
 203         if (!dlm_slots_version(&rc_in->rc_header)) {
 204                 status = dlm_recover_status(ls);
 205                 goto do_create;
 206         }
 207 
 208         rs = (struct rcom_status *)rc_in->rc_buf;
 209 
 210         if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
 211                 status = dlm_recover_status(ls);
 212                 goto do_create;
 213         }
 214 
 215         spin_lock(&ls->ls_recover_lock);
 216         status = ls->ls_recover_status;
 217         num_slots = ls->ls_num_slots;
 218         spin_unlock(&ls->ls_recover_lock);
 219         len += num_slots * sizeof(struct rcom_slot);
 220 
 221  do_create:
 222         error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
 223                             len, &rc, &mh);
 224         if (error)
 225                 return;
 226 
 227         rc->rc_id = rc_in->rc_id;
 228         rc->rc_seq_reply = rc_in->rc_seq;
 229         rc->rc_result = status;
 230 
 231         set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
 232 
 233         if (!num_slots)
 234                 goto do_send;
 235 
 236         spin_lock(&ls->ls_recover_lock);
 237         if (ls->ls_num_slots != num_slots) {
 238                 spin_unlock(&ls->ls_recover_lock);
 239                 log_debug(ls, "receive_rcom_status num_slots %d to %d",
 240                           num_slots, ls->ls_num_slots);
 241                 rc->rc_result = 0;
 242                 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
 243                 goto do_send;
 244         }
 245 
 246         dlm_slots_copy_out(ls, rc);
 247         spin_unlock(&ls->ls_recover_lock);
 248 
 249  do_send:
 250         send_rcom(ls, mh, rc);
 251 }
 252 
 253 static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 254 {
 255         spin_lock(&ls->ls_rcom_spin);
 256         if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
 257             rc_in->rc_id != ls->ls_rcom_seq) {
 258                 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
 259                           rc_in->rc_type, rc_in->rc_header.h_nodeid,
 260                           (unsigned long long)rc_in->rc_id,
 261                           (unsigned long long)ls->ls_rcom_seq);
 262                 goto out;
 263         }
 264         memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
 265         set_bit(LSFL_RCOM_READY, &ls->ls_flags);
 266         clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
 267         wake_up(&ls->ls_wait_general);
 268  out:
 269         spin_unlock(&ls->ls_rcom_spin);
 270 }
 271 
 272 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 273 {
 274         struct dlm_rcom *rc;
 275         struct dlm_mhandle *mh;
 276         int error = 0;
 277 
 278         ls->ls_recover_nodeid = nodeid;
 279 
 280 retry:
 281         error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
 282         if (error)
 283                 goto out;
 284         memcpy(rc->rc_buf, last_name, last_len);
 285 
 286         allow_sync_reply(ls, &rc->rc_id);
 287         memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 288 
 289         send_rcom(ls, mh, rc);
 290 
 291         error = dlm_wait_function(ls, &rcom_response);
 292         disallow_sync_reply(ls);
 293         if (error == -ETIMEDOUT)
 294                 goto retry;
 295  out:
 296         return error;
 297 }
 298 
 299 static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 300 {
 301         struct dlm_rcom *rc;
 302         struct dlm_mhandle *mh;
 303         int error, inlen, outlen, nodeid;
 304 
 305         nodeid = rc_in->rc_header.h_nodeid;
 306         inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 307         outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
 308 
 309         error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
 310         if (error)
 311                 return;
 312         rc->rc_id = rc_in->rc_id;
 313         rc->rc_seq_reply = rc_in->rc_seq;
 314 
 315         dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
 316                               nodeid);
 317         send_rcom(ls, mh, rc);
 318 }
 319 
 320 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
 321 {
 322         struct dlm_rcom *rc;
 323         struct dlm_mhandle *mh;
 324         struct dlm_ls *ls = r->res_ls;
 325         int error;
 326 
 327         error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
 328                             &rc, &mh);
 329         if (error)
 330                 goto out;
 331         memcpy(rc->rc_buf, r->res_name, r->res_length);
 332         rc->rc_id = (unsigned long) r->res_id;
 333 
 334         send_rcom(ls, mh, rc);
 335  out:
 336         return error;
 337 }
 338 
 339 static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 340 {
 341         struct dlm_rcom *rc;
 342         struct dlm_mhandle *mh;
 343         int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
 344         int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
 345 
 346         error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
 347         if (error)
 348                 return;
 349 
 350         /* Old code would send this special id to trigger a debug dump. */
 351         if (rc_in->rc_id == 0xFFFFFFFF) {
 352                 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
 353                 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
 354                 return;
 355         }
 356 
 357         error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
 358                                   DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
 359         if (error)
 360                 ret_nodeid = error;
 361         rc->rc_result = ret_nodeid;
 362         rc->rc_id = rc_in->rc_id;
 363         rc->rc_seq_reply = rc_in->rc_seq;
 364 
 365         send_rcom(ls, mh, rc);
 366 }
 367 
 368 static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 369 {
 370         dlm_recover_master_reply(ls, rc_in);
 371 }
 372 
 373 static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
 374                            struct rcom_lock *rl)
 375 {
 376         memset(rl, 0, sizeof(*rl));
 377 
 378         rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
 379         rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
 380         rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
 381         rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
 382         rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
 383         rl->rl_rqmode = lkb->lkb_rqmode;
 384         rl->rl_grmode = lkb->lkb_grmode;
 385         rl->rl_status = lkb->lkb_status;
 386         rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
 387 
 388         if (lkb->lkb_bastfn)
 389                 rl->rl_asts |= DLM_CB_BAST;
 390         if (lkb->lkb_astfn)
 391                 rl->rl_asts |= DLM_CB_CAST;
 392 
 393         rl->rl_namelen = cpu_to_le16(r->res_length);
 394         memcpy(rl->rl_name, r->res_name, r->res_length);
 395 
 396         /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
 397            If so, receive_rcom_lock_args() won't take this copy. */
 398 
 399         if (lkb->lkb_lvbptr)
 400                 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
 401 }
 402 
 403 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 404 {
 405         struct dlm_ls *ls = r->res_ls;
 406         struct dlm_rcom *rc;
 407         struct dlm_mhandle *mh;
 408         struct rcom_lock *rl;
 409         int error, len = sizeof(struct rcom_lock);
 410 
 411         if (lkb->lkb_lvbptr)
 412                 len += ls->ls_lvblen;
 413 
 414         error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
 415         if (error)
 416                 goto out;
 417 
 418         rl = (struct rcom_lock *) rc->rc_buf;
 419         pack_rcom_lock(r, lkb, rl);
 420         rc->rc_id = (unsigned long) r;
 421 
 422         send_rcom(ls, mh, rc);
 423  out:
 424         return error;
 425 }
 426 
 427 /* needs at least dlm_rcom + rcom_lock */
 428 static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 429 {
 430         struct dlm_rcom *rc;
 431         struct dlm_mhandle *mh;
 432         int error, nodeid = rc_in->rc_header.h_nodeid;
 433 
 434         dlm_recover_master_copy(ls, rc_in);
 435 
 436         error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
 437                             sizeof(struct rcom_lock), &rc, &mh);
 438         if (error)
 439                 return;
 440 
 441         /* We send back the same rcom_lock struct we received, but
 442            dlm_recover_master_copy() has filled in rl_remid and rl_result */
 443 
 444         memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
 445         rc->rc_id = rc_in->rc_id;
 446         rc->rc_seq_reply = rc_in->rc_seq;
 447 
 448         send_rcom(ls, mh, rc);
 449 }
 450 
 451 /* If the lockspace doesn't exist then still send a status message
 452    back; it's possible that it just doesn't have its global_id yet. */
 453 
 454 int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 455 {
 456         struct dlm_rcom *rc;
 457         struct rcom_config *rf;
 458         struct dlm_mhandle *mh;
 459         char *mb;
 460         int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 461 
 462         mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
 463         if (!mh)
 464                 return -ENOBUFS;
 465         memset(mb, 0, mb_len);
 466 
 467         rc = (struct dlm_rcom *) mb;
 468 
 469         rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 470         rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
 471         rc->rc_header.h_nodeid = dlm_our_nodeid();
 472         rc->rc_header.h_length = mb_len;
 473         rc->rc_header.h_cmd = DLM_RCOM;
 474 
 475         rc->rc_type = DLM_RCOM_STATUS_REPLY;
 476         rc->rc_id = rc_in->rc_id;
 477         rc->rc_seq_reply = rc_in->rc_seq;
 478         rc->rc_result = -ESRCH;
 479 
 480         rf = (struct rcom_config *) rc->rc_buf;
 481         rf->rf_lvblen = cpu_to_le32(~0U);
 482 
 483         dlm_rcom_out(rc);
 484         dlm_lowcomms_commit_buffer(mh);
 485 
 486         return 0;
 487 }
 488 
 489 /*
 490  * Ignore messages for stage Y before we set
 491  * recover_status bit for stage X:
 492  *
 493  * recover_status = 0
 494  *
 495  * dlm_recover_members()
 496  * - send nothing
 497  * - recv nothing
 498  * - ignore NAMES, NAMES_REPLY
 499  * - ignore LOOKUP, LOOKUP_REPLY
 500  * - ignore LOCK, LOCK_REPLY
 501  *
 502  * recover_status |= NODES
 503  *
 504  * dlm_recover_members_wait()
 505  *
 506  * dlm_recover_directory()
 507  * - send NAMES
 508  * - recv NAMES_REPLY
 509  * - ignore LOOKUP, LOOKUP_REPLY
 510  * - ignore LOCK, LOCK_REPLY
 511  *
 512  * recover_status |= DIR
 513  *
 514  * dlm_recover_directory_wait()
 515  *
 516  * dlm_recover_masters()
 517  * - send LOOKUP
 518  * - recv LOOKUP_REPLY
 519  *
 520  * dlm_recover_locks()
 521  * - send LOCKS
 522  * - recv LOCKS_REPLY
 523  *
 524  * recover_status |= LOCKS
 525  *
 526  * dlm_recover_locks_wait()
 527  *
 528  * recover_status |= DONE
 529  */
 530 
 531 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
 532    recovery-only comms are sent through here. */
 533 
 534 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 535 {
 536         int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
 537         int stop, reply = 0, names = 0, lookup = 0, lock = 0;
 538         uint32_t status;
 539         uint64_t seq;
 540 
 541         switch (rc->rc_type) {
 542         case DLM_RCOM_STATUS_REPLY:
 543                 reply = 1;
 544                 break;
 545         case DLM_RCOM_NAMES:
 546                 names = 1;
 547                 break;
 548         case DLM_RCOM_NAMES_REPLY:
 549                 names = 1;
 550                 reply = 1;
 551                 break;
 552         case DLM_RCOM_LOOKUP:
 553                 lookup = 1;
 554                 break;
 555         case DLM_RCOM_LOOKUP_REPLY:
 556                 lookup = 1;
 557                 reply = 1;
 558                 break;
 559         case DLM_RCOM_LOCK:
 560                 lock = 1;
 561                 break;
 562         case DLM_RCOM_LOCK_REPLY:
 563                 lock = 1;
 564                 reply = 1;
 565                 break;
 566         };
 567 
 568         spin_lock(&ls->ls_recover_lock);
 569         status = ls->ls_recover_status;
 570         stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 571         seq = ls->ls_recover_seq;
 572         spin_unlock(&ls->ls_recover_lock);
 573 
 574         if (stop && (rc->rc_type != DLM_RCOM_STATUS))
 575                 goto ignore;
 576 
 577         if (reply && (rc->rc_seq_reply != seq))
 578                 goto ignore;
 579 
 580         if (!(status & DLM_RS_NODES) && (names || lookup || lock))
 581                 goto ignore;
 582 
 583         if (!(status & DLM_RS_DIR) && (lookup || lock))
 584                 goto ignore;
 585 
 586         switch (rc->rc_type) {
 587         case DLM_RCOM_STATUS:
 588                 receive_rcom_status(ls, rc);
 589                 break;
 590 
 591         case DLM_RCOM_NAMES:
 592                 receive_rcom_names(ls, rc);
 593                 break;
 594 
 595         case DLM_RCOM_LOOKUP:
 596                 receive_rcom_lookup(ls, rc);
 597                 break;
 598 
 599         case DLM_RCOM_LOCK:
 600                 if (rc->rc_header.h_length < lock_size)
 601                         goto Eshort;
 602                 receive_rcom_lock(ls, rc);
 603                 break;
 604 
 605         case DLM_RCOM_STATUS_REPLY:
 606                 receive_sync_reply(ls, rc);
 607                 break;
 608 
 609         case DLM_RCOM_NAMES_REPLY:
 610                 receive_sync_reply(ls, rc);
 611                 break;
 612 
 613         case DLM_RCOM_LOOKUP_REPLY:
 614                 receive_rcom_lookup_reply(ls, rc);
 615                 break;
 616 
 617         case DLM_RCOM_LOCK_REPLY:
 618                 if (rc->rc_header.h_length < lock_size)
 619                         goto Eshort;
 620                 dlm_recover_process_copy(ls, rc);
 621                 break;
 622 
 623         default:
 624                 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
 625         }
 626         return;
 627 
 628 ignore:
 629         log_limit(ls, "dlm_receive_rcom ignore msg %d "
 630                   "from %d %llu %llu recover seq %llu sts %x gen %u",
 631                    rc->rc_type,
 632                    nodeid,
 633                    (unsigned long long)rc->rc_seq,
 634                    (unsigned long long)rc->rc_seq_reply,
 635                    (unsigned long long)seq,
 636                    status, ls->ls_generation);
 637         return;
 638 Eshort:
 639         log_error(ls, "recovery message %d from %d is too short",
 640                   rc->rc_type, nodeid);
 641 }
 642 

/* [<][>][^][v][top][bottom][index][help] */