net/ceph/mon_client.c

DEFINITIONS

This source file includes the following definitions.
  1. ceph_monmap_decode
  2. ceph_monmap_contains
  3. __send_prepared_auth_request
  4. __close_session
  5. pick_new_mon
  6. __open_session
  7. reopen_session
  8. ceph_monc_reopen_session
  9. un_backoff
  10. __schedule_delayed
  11. __send_subscribe
  12. handle_subscribe_ack
  13. __ceph_monc_want_map
  14. ceph_monc_want_map
  15. __ceph_monc_got_map
  16. ceph_monc_got_map
  17. ceph_monc_renew_subs
  18. ceph_monc_wait_osdmap
  19. ceph_monc_open_session
  20. ceph_monc_handle_map
  21. DEFINE_RB_FUNCS
  22. release_generic_request
  23. put_generic_request
  24. get_generic_request
  25. alloc_generic_request
  26. register_generic_request
  27. send_generic_request
  28. __finish_generic_request
  29. finish_generic_request
  30. complete_generic_request
  31. cancel_generic_request
  32. wait_generic_request
  33. get_generic_reply
  34. handle_statfs_reply
  35. ceph_monc_do_statfs
  36. handle_get_version_reply
  37. __ceph_monc_get_version
  38. ceph_monc_get_version
  39. ceph_monc_get_version_async
  40. handle_command_ack
  41. ceph_monc_blacklist_add
  42. __resend_generic_request
  43. delayed_work
  44. build_initial_monmap
  45. ceph_monc_init
  46. ceph_monc_stop
  47. finish_hunting
  48. handle_auth_reply
  49. __validate_auth
  50. ceph_monc_validate_auth
  51. dispatch
  52. mon_alloc_msg
  53. mon_fault
  54. con_get
  55. con_put

// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" is responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of ceph-mon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */
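
/*
 * Illustrative sketch added for this write-up, not part of the original
 * file: the typical bring-up sequence a libceph consumer goes through,
 * collapsed into one hypothetical helper.  Real callers reach this code
 * via ceph_create_client() and __ceph_open_session(); error handling is
 * abbreviated.
 */
static int __maybe_unused example_monc_bringup(struct ceph_client *client)
{
        int err;

        /* build the initial monmap and allocate the session messages */
        err = ceph_monc_init(&client->monc, client);
        if (err)
                return err;

        /* pick a monitor at random, authenticate, subscribe to maps */
        err = ceph_monc_open_session(&client->monc);
        if (err) {
                ceph_monc_stop(&client->monc);
                return err;
        }

        return 0;
}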

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
static struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
        struct ceph_monmap *m = NULL;
        int i, err = -EINVAL;
        struct ceph_fsid fsid;
        u32 epoch, num_mon;
        u32 len;

        ceph_decode_32_safe(&p, end, len, bad);
        ceph_decode_need(&p, end, len, bad);

        dout("monmap_decode %p %p len %d (%d)\n", p, end, len, (int)(end-p));
        p += sizeof(u16);  /* skip version */

        ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        epoch = ceph_decode_32(&p);

        num_mon = ceph_decode_32(&p);

        if (num_mon > CEPH_MAX_MON)
                goto bad;
        m = kmalloc(struct_size(m, mon_inst, num_mon), GFP_NOFS);
        if (m == NULL)
                return ERR_PTR(-ENOMEM);
        m->fsid = fsid;
        m->epoch = epoch;
        m->num_mon = num_mon;
        for (i = 0; i < num_mon; ++i) {
                struct ceph_entity_inst *inst = &m->mon_inst[i];

                /* copy name portion */
                ceph_decode_copy_safe(&p, end, &inst->name,
                                        sizeof(inst->name), bad);
                err = ceph_decode_entity_addr(&p, end, &inst->addr);
                if (err)
                        goto bad;
        }
        dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
             m->num_mon);
        for (i = 0; i < m->num_mon; i++)
                dout("monmap_decode  mon%d is %s\n", i,
                     ceph_pr_addr(&m->mon_inst[i].addr));
        return m;
bad:
        dout("monmap_decode failed with %d\n", err);
        kfree(m);
        return ERR_PTR(err);
}

/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
        int i;

        for (i = 0; i < m->num_mon; i++)
                if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
                        return 1;
        return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
        monc->pending_auth = 1;
        monc->m_auth->front.iov_len = len;
        monc->m_auth->hdr.front_len = cpu_to_le32(len);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_get(monc->m_auth);  /* keep our ref */
        ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
        dout("__close_session closing mon%d\n", monc->cur_mon);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_revoke_incoming(monc->m_auth_reply);
        ceph_msg_revoke(monc->m_subscribe);
        ceph_msg_revoke_incoming(monc->m_subscribe_ack);
        ceph_con_close(&monc->con);

        monc->pending_auth = 0;
        ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
        int old_mon = monc->cur_mon;

        BUG_ON(monc->monmap->num_mon < 1);

        if (monc->monmap->num_mon == 1) {
                monc->cur_mon = 0;
        } else {
                int max = monc->monmap->num_mon;
                int o = -1;
                int n;

                if (monc->cur_mon >= 0) {
                        if (monc->cur_mon < monc->monmap->num_mon)
                                o = monc->cur_mon;
                        if (o >= 0)
                                max--;
                }

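                /*
                 * Comment added for clarity: n is drawn uniformly from
                 * the remaining monitors -- max has already been
                 * decremented to exclude the current one -- and bumping
                 * n past o skips over it without re-rolling.
                 */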
                n = prandom_u32() % max;
                if (o >= 0 && n >= o)
                        n++;

                monc->cur_mon = n;
        }

        dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
             monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
        int ret;

        pick_new_mon(monc);

        monc->hunting = true;
        if (monc->had_a_connection) {
                monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
                if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
                        monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
        }

        monc->sub_renew_after = jiffies; /* i.e., expired */
        monc->sub_renew_sent = 0;

        dout("%s opening mon%d\n", __func__, monc->cur_mon);
        ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
                      &monc->monmap->mon_inst[monc->cur_mon].addr);

        /*
         * send an initial keepalive to ensure our timestamp is valid
         * by the time we are in an OPENED state
         */
        ceph_con_keepalive(&monc->con);

        /* initiate authentication handshake */
        ret = ceph_auth_build_hello(monc->auth,
                                    monc->m_auth->front.iov_base,
                                    monc->m_auth->front_alloc_len);
        BUG_ON(ret <= 0);
        __send_prepared_auth_request(monc, ret);
}

static void reopen_session(struct ceph_mon_client *monc)
{
        if (!monc->hunting)
                pr_info("mon%d %s session lost, hunting for new mon\n",
                    monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));

        __close_session(monc);
        __open_session(monc);
}

void ceph_monc_reopen_session(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        reopen_session(monc);
        mutex_unlock(&monc->mutex);
}

static void un_backoff(struct ceph_mon_client *monc)
{
        monc->hunt_mult /= 2; /* reduce by 50% */
        if (monc->hunt_mult < 1)
                monc->hunt_mult = 1;
        dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
        unsigned long delay;

        if (monc->hunting)
                delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
        else
                delay = CEPH_MONC_PING_INTERVAL;

        dout("__schedule_delayed after %lu\n", delay);
        mod_delayed_work(system_wq, &monc->delayed_work,
                         round_jiffies_relative(delay));
}

const char *ceph_sub_str[] = {
        [CEPH_SUB_MONMAP] = "monmap",
        [CEPH_SUB_OSDMAP] = "osdmap",
        [CEPH_SUB_FSMAP]  = "fsmap.user",
        [CEPH_SUB_MDSMAP] = "mdsmap",
};

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
        struct ceph_msg *msg = monc->m_subscribe;
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        int num = 0;
        int i;

        dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

        BUG_ON(monc->cur_mon < 0);

        if (!monc->sub_renew_sent)
                monc->sub_renew_sent = jiffies | 1; /* never 0 */

        msg->hdr.version = cpu_to_le16(2);

        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                if (monc->subs[i].want)
                        num++;
        }
        BUG_ON(num < 1); /* monmap sub is always there */
        ceph_encode_32(&p, num);
        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                char buf[32];
                int len;

                if (!monc->subs[i].want)
                        continue;

                len = sprintf(buf, "%s", ceph_sub_str[i]);
                if (i == CEPH_SUB_MDSMAP &&
                    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
                        len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

                dout("%s %s start %llu flags 0x%x\n", __func__, buf,
                     le64_to_cpu(monc->subs[i].item.start),
                     monc->subs[i].item.flags);
                ceph_encode_string(&p, end, buf, len);
                memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
                p += sizeof(monc->subs[i].item);
        }

        BUG_ON(p > end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
        ceph_msg_revoke(msg);
        ceph_con_send(&monc->con, ceph_msg_get(msg));
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        unsigned int seconds;
        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        seconds = le32_to_cpu(h->duration);

        mutex_lock(&monc->mutex);
        if (monc->sub_renew_sent) {
                /*
                 * This is only needed for legacy (infernalis or older)
                 * MONs -- see delayed_work().
                 */
                monc->sub_renew_after = monc->sub_renew_sent +
                                            (seconds >> 1) * HZ - 1;
                dout("%s sent %lu duration %d renew after %lu\n", __func__,
                     monc->sub_renew_sent, seconds, monc->sub_renew_after);
                monc->sub_renew_sent = 0;
        } else {
                dout("%s sent %lu renew after %lu, ignoring\n", __func__,
                     monc->sub_renew_sent, monc->sub_renew_after);
        }
        mutex_unlock(&monc->mutex);
        return;
bad:
        pr_err("got corrupt subscribe-ack msg\n");
        ceph_msg_dump(msg);
}

/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
                                 u32 epoch, bool continuous)
{
        __le64 start = cpu_to_le64(epoch);
        u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

        dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
             epoch, continuous);

        if (monc->subs[sub].want &&
            monc->subs[sub].item.start == start &&
            monc->subs[sub].item.flags == flags)
                return false;

        monc->subs[sub].item.start = start;
        monc->subs[sub].item.flags = flags;
        monc->subs[sub].want = true;

        return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
                        bool continuous)
{
        bool need_request;

        mutex_lock(&monc->mutex);
        need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
        mutex_unlock(&monc->mutex);

        return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);
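
/*
 * Illustrative sketch added for this write-up, not part of the original
 * file: how a map consumer such as the OSD client asks for a newer
 * osdmap.  ceph_monc_want_map() only updates the want set; if it
 * returns true, the subscribe message still has to be (re)sent.  The
 * example_* name is ours.
 */
static void __maybe_unused example_request_newer_osdmap(
                        struct ceph_client *client, u32 have_epoch)
{
        /* one-shot subscription: "every osdmap starting at have_epoch + 1" */
        if (ceph_monc_want_map(&client->monc, CEPH_SUB_OSDMAP,
                               have_epoch + 1, false))
                ceph_monc_renew_subs(&client->monc);
}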

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
                                u32 epoch)
{
        dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

        if (monc->subs[sub].want) {
                if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
                        monc->subs[sub].want = false;
                else
                        monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
        }

        monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_got_map(monc, sub, epoch);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        __send_subscribe(monc);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
                          unsigned long timeout)
{
        unsigned long started = jiffies;
        long ret;

        mutex_lock(&monc->mutex);
        while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
                mutex_unlock(&monc->mutex);

                if (timeout && time_after_eq(jiffies, started + timeout))
                        return -ETIMEDOUT;

                ret = wait_event_interruptible_timeout(monc->client->auth_wq,
                                     monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
                                     ceph_timeout_jiffies(timeout));
                if (ret < 0)
                        return ret;

                mutex_lock(&monc->mutex);
        }

        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
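
/*
 * Illustrative sketch added for this write-up, not part of the original
 * file: pairing ceph_monc_get_version() with ceph_monc_wait_osdmap() to
 * block until the local osdmap catches up with the cluster, roughly
 * what ceph_wait_for_latest_osdmap() does.  Callers must tolerate
 * -ETIMEDOUT and interruption.  The example_* name is ours.
 */
static int __maybe_unused example_wait_latest_osdmap(
                        struct ceph_client *client, unsigned long timeout)
{
        u64 newest;
        int ret;

        ret = ceph_monc_get_version(&client->monc, "osdmap", &newest);
        if (ret)
                return ret;

        /* nudge the osd_client so the newer map actually gets requested */
        ceph_osdc_maybe_request_map(&client->osdc);

        return ceph_monc_wait_osdmap(&client->monc, newest, timeout);
}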

/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
        __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
        __open_session(monc);
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        struct ceph_client *client = monc->client;
        struct ceph_monmap *monmap = NULL, *old = monc->monmap;
        void *p, *end;

        mutex_lock(&monc->mutex);

        dout("handle_monmap\n");
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        monmap = ceph_monmap_decode(p, end);
        if (IS_ERR(monmap)) {
                pr_err("problem decoding monmap, %d\n",
                       (int)PTR_ERR(monmap));
                ceph_msg_dump(msg);
                goto out;
        }

        if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
                kfree(monmap);
                goto out;
        }

        client->monc.monmap = monmap;
        kfree(old);

        __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
        client->have_fsid = true;

out:
        mutex_unlock(&monc->mutex);
        wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
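
/*
 * Comment added for clarity: a generic request carries two references
 * while in flight.  alloc_generic_request() returns it holding the
 * caller's ref; register_generic_request() takes a second ref for the
 * tid tree.  __finish_generic_request() drops it from the tree, and the
 * matching put happens in finish_generic_request(),
 * complete_generic_request() or cancel_generic_request().  The caller
 * releases its own ref with put_generic_request() when done.
 */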

static void release_generic_request(struct kref *kref)
{
        struct ceph_mon_generic_request *req =
                container_of(kref, struct ceph_mon_generic_request, kref);

        dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
             req->reply);
        WARN_ON(!RB_EMPTY_NODE(&req->node));

        if (req->reply)
                ceph_msg_put(req->reply);
        if (req->request)
                ceph_msg_put(req->request);

        kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
        if (req)
                kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
        kref_get(&req->kref);
}

static struct ceph_mon_generic_request *
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
{
        struct ceph_mon_generic_request *req;

        req = kzalloc(sizeof(*req), gfp);
        if (!req)
                return NULL;

        req->monc = monc;
        kref_init(&req->kref);
        RB_CLEAR_NODE(&req->node);
        init_completion(&req->completion);

        dout("%s greq %p\n", __func__, req);
        return req;
}

static void register_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;

        WARN_ON(req->tid);

        get_generic_request(req);
        req->tid = ++monc->last_tid;
        insert_generic_request(&monc->generic_request_tree, req);
}

static void send_generic_request(struct ceph_mon_client *monc,
                                 struct ceph_mon_generic_request *req)
{
        WARN_ON(!req->tid);

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        req->request->hdr.tid = cpu_to_le64(req->tid);
        ceph_con_send(&monc->con, ceph_msg_get(req->request));
}

static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        erase_generic_request(&monc->generic_request_tree, req);

        ceph_msg_revoke(req->request);
        ceph_msg_revoke_incoming(req->reply);
}

static void finish_generic_request(struct ceph_mon_generic_request *req)
{
        __finish_generic_request(req);
        put_generic_request(req);
}

static void complete_generic_request(struct ceph_mon_generic_request *req)
{
        if (req->complete_cb)
                req->complete_cb(req);
        else
                complete_all(&req->completion);
        put_generic_request(req);
}

static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;
        struct ceph_mon_generic_request *lookup_req;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);

        mutex_lock(&monc->mutex);
        lookup_req = lookup_generic_request(&monc->generic_request_tree,
                                            req->tid);
        if (lookup_req) {
                WARN_ON(lookup_req != req);
                finish_generic_request(req);
        }

        mutex_unlock(&monc->mutex);
}

static int wait_generic_request(struct ceph_mon_generic_request *req)
{
        int ret;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        ret = wait_for_completion_interruptible(&req->completion);
        if (ret)
                cancel_generic_request(req);
        else
                ret = req->result; /* completed */

        return ret;
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
                                         struct ceph_msg_header *hdr,
                                         int *skip)
{
        struct ceph_mon_client *monc = con->private;
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(hdr->tid);
        struct ceph_msg *m;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                dout("get_generic_reply %lld dne\n", tid);
                *skip = 1;
                m = NULL;
        } else {
                dout("get_generic_reply %lld got %p\n", tid, req->reply);
                *skip = 0;
                m = ceph_msg_get(req->reply);
                /*
                 * we don't need to track the connection reading into
                 * this reply because we only have one open connection
                 * at a time, ever.
                 */
        }
        mutex_unlock(&monc->mutex);
        return m;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        if (msg->front.iov_len != sizeof(*reply))
                goto bad;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = 0;
        *req->u.st = reply->st; /* struct */
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt statfs reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
                        struct ceph_statfs *buf)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs *h;
        int ret = -ENOMEM;

        req = alloc_generic_request(monc, GFP_NOFS);
        if (!req)
                goto out;

        req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
                                    true);
        if (!req->request)
                goto out;

        req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
        if (!req->reply)
                goto out;

        req->u.st = buf;
        req->request->hdr.version = cpu_to_le16(2);

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        /* fill out request */
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
        h->contains_data_pool = (data_pool != CEPH_NOPOOL);
        h->data_pool = cpu_to_le64(data_pool);
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        ret = wait_generic_request(req);
out:
        put_generic_request(req);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
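
/*
 * Illustrative sketch added for this write-up, not part of the original
 * file: filling a statfs buffer for the whole cluster, as a
 * filesystem's ->statfs method would.  Passing CEPH_NOPOOL asks for
 * cluster-wide rather than per-pool figures.  The example_* name is
 * ours.
 */
static int __maybe_unused example_statfs(struct ceph_client *client,
                                         struct ceph_statfs *st)
{
        return ceph_monc_do_statfs(&client->monc, CEPH_NOPOOL, st);
}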

static void handle_get_version_reply(struct ceph_mon_client *monc,
                                     struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(msg->hdr.tid);
        void *p = msg->front.iov_base;
        void *end = p + msg->front_alloc_len;
        u64 handle;

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        ceph_decode_need(&p, end, 2*sizeof(u64), bad);
        handle = ceph_decode_64(&p);
        if (tid != 0 && tid != handle)
                goto bad;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, handle);
        if (!req) {
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = 0;
        req->u.newest = ceph_decode_64(&p);
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
                        ceph_monc_callback_t cb, u64 private_data)
{
        struct ceph_mon_generic_request *req;

        req = alloc_generic_request(monc, GFP_NOIO);
        if (!req)
                goto err_put_req;

        req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
                                    sizeof(u64) + sizeof(u32) + strlen(what),
                                    GFP_NOIO, true);
        if (!req->request)
                goto err_put_req;

        req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
                                  true);
        if (!req->reply)
                goto err_put_req;

        req->complete_cb = cb;
        req->private_data = private_data;

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        {
                void *p = req->request->front.iov_base;
                void *const end = p + req->request->front_alloc_len;

                ceph_encode_64(&p, req->tid); /* handle */
                ceph_encode_string(&p, end, what, strlen(what));
                WARN_ON(p != end);
        }
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        return req;

err_put_req:
        put_generic_request(req);
        return ERR_PTR(-ENOMEM);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
                          u64 *newest)
{
        struct ceph_mon_generic_request *req;
        int ret;

        req = __ceph_monc_get_version(monc, what, NULL, 0);
        if (IS_ERR(req))
                return PTR_ERR(req);

        ret = wait_generic_request(req);
        if (!ret)
                *newest = req->u.newest;

        put_generic_request(req);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_get_version);

/*
 * Send MMonGetVersion without waiting for the reply; @cb is invoked
 * with the request once the reply arrives.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
                                ceph_monc_callback_t cb, u64 private_data)
{
        struct ceph_mon_generic_request *req;

        req = __ceph_monc_get_version(monc, what, cb, private_data);
        if (IS_ERR(req))
                return PTR_ERR(req);

        put_generic_request(req);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);
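
/*
 * Illustrative sketch added for this write-up, not part of the original
 * file: an asynchronous version query.  The callback runs from the
 * monitor dispatch path once the reply arrives, with the outcome in
 * req->result and the version in req->u.newest.  Both example_* names
 * are ours.
 */
static void __maybe_unused example_version_cb(
                        struct ceph_mon_generic_request *req)
{
        if (!req->result)
                dout("newest osdmap epoch %llu\n", req->u.newest);
}

static int __maybe_unused example_get_version_async(struct ceph_client *client)
{
        return ceph_monc_get_version_async(&client->monc, "osdmap",
                                           example_version_cb, 0);
}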

static void handle_command_ack(struct ceph_mon_client *monc,
                               struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
                                                            sizeof(u32), bad);
        p += sizeof(struct ceph_mon_request_header);

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = ceph_decode_32(&p);
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt mon_command ack, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
                            struct ceph_entity_addr *client_addr)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_command *h;
        int ret = -ENOMEM;
        int len;

        req = alloc_generic_request(monc, GFP_NOIO);
        if (!req)
                goto out;

        req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
        if (!req->request)
                goto out;

        req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
                                  true);
        if (!req->reply)
                goto out;

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
        h->num_strs = cpu_to_le32(1);
        len = sprintf(h->str,
                      "{ \"prefix\": \"osd blacklist\", "
                      "\"blacklistop\": \"add\", "
                      "\"addr\": \"%pISpc/%u\" }",
                      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
        h->str_len = cpu_to_le32(len);
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        ret = wait_generic_request(req);
        if (!ret)
                /*
                 * Make sure we have the osdmap that includes the blacklist
                 * entry.  This is needed to ensure that the OSDs pick up the
                 * new blacklist before processing any future requests from
                 * this client.
                 */
                ret = ceph_wait_for_latest_osdmap(monc->client, 0);

out:
        put_generic_request(req);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_blacklist_add);
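
/*
 * Illustrative sketch added for this write-up, not part of the original
 * file: a client blacklisting its own address as a fencing step; rbd
 * uses the same call with the address of a dead lock holder instead.
 * The example_* name is ours.
 */
static int __maybe_unused example_blacklist_self(struct ceph_client *client)
{
        return ceph_monc_blacklist_add(&client->monc,
                                       &client->msgr.inst.addr);
}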

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
        struct ceph_mon_generic_request *req;
        struct rb_node *p;

        for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_mon_generic_request, node);
                ceph_msg_revoke(req->request);
                ceph_msg_revoke_incoming(req->reply);
                ceph_con_send(&monc->con, ceph_msg_get(req->request));
        }
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
        struct ceph_mon_client *monc =
                container_of(work, struct ceph_mon_client, delayed_work.work);

        dout("monc delayed_work\n");
        mutex_lock(&monc->mutex);
        if (monc->hunting) {
                dout("%s continuing hunt\n", __func__);
                reopen_session(monc);
        } else {
                int is_auth = ceph_auth_is_authenticated(monc->auth);
                if (ceph_con_keepalive_expired(&monc->con,
                                               CEPH_MONC_PING_TIMEOUT)) {
                        dout("monc keepalive timeout\n");
                        is_auth = 0;
                        reopen_session(monc);
                }

                if (!monc->hunting) {
                        ceph_con_keepalive(&monc->con);
                        __validate_auth(monc);
                        un_backoff(monc);
                }

                if (is_auth &&
                    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
                        unsigned long now = jiffies;

                        dout("%s renew subs? now %lu renew after %lu\n",
                             __func__, now, monc->sub_renew_after);
                        if (time_after_eq(now, monc->sub_renew_after))
                                __send_subscribe(monc);
                }
        }
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
        struct ceph_options *opt = monc->client->options;
        struct ceph_entity_addr *mon_addr = opt->mon_addr;
        int num_mon = opt->num_mon;
        int i;

        /* build initial monmap */
        monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
                               GFP_KERNEL);
        if (!monc->monmap)
                return -ENOMEM;
        for (i = 0; i < num_mon; i++) {
                monc->monmap->mon_inst[i].addr = mon_addr[i];
                monc->monmap->mon_inst[i].addr.nonce = 0;
                monc->monmap->mon_inst[i].name.type =
                        CEPH_ENTITY_TYPE_MON;
                monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
        }
        monc->monmap->num_mon = num_mon;
        return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
        int err = 0;

        dout("init\n");
        memset(monc, 0, sizeof(*monc));
        monc->client = cl;
        monc->monmap = NULL;
        mutex_init(&monc->mutex);

        err = build_initial_monmap(monc);
        if (err)
                goto out;

        /* connection */
        /* authentication */
        monc->auth = ceph_auth_init(cl->options->name,
                                    cl->options->key);
        if (IS_ERR(monc->auth)) {
                err = PTR_ERR(monc->auth);
                goto out_monmap;
        }
        monc->auth->want_keys =
                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
                CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

        /* msgs */
        err = -ENOMEM;
        monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
                                     sizeof(struct ceph_mon_subscribe_ack),
                                     GFP_KERNEL, true);
        if (!monc->m_subscribe_ack)
                goto out_auth;

        monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
                                         GFP_KERNEL, true);
        if (!monc->m_subscribe)
                goto out_subscribe_ack;

        monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
                                          GFP_KERNEL, true);
        if (!monc->m_auth_reply)
                goto out_subscribe;

        monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
        monc->pending_auth = 0;
        if (!monc->m_auth)
                goto out_auth_reply;

        ceph_con_init(&monc->con, monc, &mon_con_ops,
                      &monc->client->msgr);

        monc->cur_mon = -1;
        monc->had_a_connection = false;
        monc->hunt_mult = 1;

        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
        monc->generic_request_tree = RB_ROOT;
        monc->last_tid = 0;

        monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

        return 0;

out_auth_reply:
        ceph_msg_put(monc->m_auth_reply);
out_subscribe:
        ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
        ceph_msg_put(monc->m_subscribe_ack);
out_auth:
        ceph_auth_destroy(monc->auth);
out_monmap:
        kfree(monc->monmap);
out:
        return err;
}
EXPORT_SYMBOL(ceph_monc_init);

void ceph_monc_stop(struct ceph_mon_client *monc)
{
        dout("stop\n");
        cancel_delayed_work_sync(&monc->delayed_work);

        mutex_lock(&monc->mutex);
        __close_session(monc);
        monc->cur_mon = -1;
        mutex_unlock(&monc->mutex);

        /*
         * flush msgr queue before we destroy ourselves to ensure that:
         *  - any work that references our embedded con is finished.
         *  - any osd_client or other work that may reference an authorizer
         *    finishes before we shut down the auth subsystem.
         */
        ceph_msgr_flush();

        ceph_auth_destroy(monc->auth);

        WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

        ceph_msg_put(monc->m_auth);
        ceph_msg_put(monc->m_auth_reply);
        ceph_msg_put(monc->m_subscribe);
        ceph_msg_put(monc->m_subscribe_ack);

        kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void finish_hunting(struct ceph_mon_client *monc)
{
        if (monc->hunting) {
                dout("%s found mon%d\n", __func__, monc->cur_mon);
                monc->hunting = false;
                monc->had_a_connection = true;
                un_backoff(monc);
                __schedule_delayed(monc);
        }
}

static void handle_auth_reply(struct ceph_mon_client *monc,
                              struct ceph_msg *msg)
{
        int ret;
        int was_auth = 0;

        mutex_lock(&monc->mutex);
        was_auth = ceph_auth_is_authenticated(monc->auth);
        monc->pending_auth = 0;
        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
                                     msg->front.iov_len,
                                     monc->m_auth->front.iov_base,
                                     monc->m_auth->front_alloc_len);
        if (ret > 0) {
                __send_prepared_auth_request(monc, ret);
                goto out;
        }

        finish_hunting(monc);

        if (ret < 0) {
                monc->client->auth_err = ret;
        } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
                dout("authenticated, starting session\n");

                monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
                monc->client->msgr.inst.name.num =
                                        cpu_to_le64(monc->auth->global_id);

                __send_subscribe(monc);
                __resend_generic_request(monc);

                pr_info("mon%d %s session established\n", monc->cur_mon,
                        ceph_pr_addr(&monc->con.peer_addr));
        }

out:
        mutex_unlock(&monc->mutex);
        if (monc->client->auth_err < 0)
                wake_up_all(&monc->client->auth_wq);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        if (monc->pending_auth)
                return 0;

        ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
                              monc->m_auth->front_alloc_len);
        if (ret <= 0)
                return ret; /* either an error, or no need to authenticate */
        __send_prepared_auth_request(monc, ret);
        return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        mutex_lock(&monc->mutex);
        ret = __validate_auth(monc);
        mutex_unlock(&monc->mutex);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(msg->hdr.type);

        if (!monc)
                return;

        switch (type) {
        case CEPH_MSG_AUTH_REPLY:
                handle_auth_reply(monc, msg);
                break;

        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                handle_subscribe_ack(monc, msg);
                break;

        case CEPH_MSG_STATFS_REPLY:
                handle_statfs_reply(monc, msg);
                break;

        case CEPH_MSG_MON_GET_VERSION_REPLY:
                handle_get_version_reply(monc, msg);
                break;

        case CEPH_MSG_MON_COMMAND_ACK:
                handle_command_ack(monc, msg);
                break;

        case CEPH_MSG_MON_MAP:
                ceph_monc_handle_map(monc, msg);
                break;

        case CEPH_MSG_OSD_MAP:
                ceph_osdc_handle_map(&monc->client->osdc, msg);
                break;

        default:
                /* can the chained handler handle it? */
                if (monc->client->extra_mon_dispatch &&
                    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
                        break;

                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
        ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
                                      struct ceph_msg_header *hdr,
                                      int *skip)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(hdr->type);
        int front_len = le32_to_cpu(hdr->front_len);
        struct ceph_msg *m = NULL;

        *skip = 0;

        switch (type) {
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                m = ceph_msg_get(monc->m_subscribe_ack);
                break;
        case CEPH_MSG_STATFS_REPLY:
        case CEPH_MSG_MON_COMMAND_ACK:
                return get_generic_reply(con, hdr, skip);
        case CEPH_MSG_AUTH_REPLY:
                m = ceph_msg_get(monc->m_auth_reply);
                break;
        case CEPH_MSG_MON_GET_VERSION_REPLY:
                if (le64_to_cpu(hdr->tid) != 0)
                        return get_generic_reply(con, hdr, skip);

                /*
                 * Older OSDs don't set reply tid even if the original
                 * request had a non-zero tid.  Work around this weirdness
                 * by allocating a new message.
                 */
                /* fall through */
        case CEPH_MSG_MON_MAP:
        case CEPH_MSG_MDS_MAP:
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_FS_MAP_USER:
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
                if (!m)
                        return NULL;    /* ENOMEM--return skip == 0 */
                break;
        }

        if (!m) {
                pr_info("alloc_msg unknown type %d\n", type);
                *skip = 1;
        } else if (front_len > m->front_alloc_len) {
                pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
                        front_len, m->front_alloc_len,
                        (unsigned int)con->peer_name.type,
                        le64_to_cpu(con->peer_name.num));
                ceph_msg_put(m);
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
        }

        return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
        struct ceph_mon_client *monc = con->private;

        mutex_lock(&monc->mutex);
        dout("%s mon%d\n", __func__, monc->cur_mon);
        if (monc->cur_mon >= 0) {
                if (!monc->hunting) {
                        dout("%s hunting for new mon\n", __func__);
                        reopen_session(monc);
                        __schedule_delayed(monc);
                } else {
                        dout("%s already hunting\n", __func__);
                }
        }
        mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
        return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
        .get = con_get,
        .put = con_put,
        .dispatch = dispatch,
        .fault = mon_fault,
        .alloc_msg = mon_alloc_msg,
};