This source file includes the following definitions:
- parse_reply_info_quota
- parse_reply_info_in
- parse_reply_info_dir
- parse_reply_info_lease
- parse_reply_info_trace
- parse_reply_info_readdir
- parse_reply_info_filelock
- parse_reply_info_create
- parse_reply_info_extra
- parse_reply_info
- destroy_reply_info
- ceph_session_state_name
- get_session
- ceph_put_mds_session
- __ceph_lookup_mds_session
- __have_session
- __verify_registered_session
- register_session
- __unregister_session
- put_request_session
- ceph_mdsc_release_request
- DEFINE_RB_FUNCS
- __register_request
- __unregister_request
- get_nonsnap_parent
- __choose_mds
- create_session_msg
- encode_supported_features
- create_session_open_msg
- __open_session
- __open_export_target_session
- ceph_mdsc_open_export_target_session
- __open_export_target_sessions
- ceph_mdsc_open_export_target_sessions
- detach_cap_releases
- dispose_cap_releases
- cleanup_session_requests
- ceph_iterate_session_caps
- remove_session_caps_cb
- remove_session_caps
- wake_up_session_cb
- wake_up_session_caps
- send_renew_caps
- send_flushmsg_ack
- renewed_caps
- request_close_session
- __close_session
- drop_negative_children
- trim_caps_cb
- ceph_trim_caps
- check_caps_flush
- wait_caps_flush
- ceph_send_cap_releases
- ceph_cap_release_work
- ceph_flush_cap_releases
- __ceph_queue_cap_release
- ceph_cap_reclaim_work
- ceph_queue_cap_reclaim_work
- ceph_reclaim_caps_nr
- ceph_alloc_readdir_reply_buffer
- ceph_mdsc_create_request
- __get_oldest_req
- __get_oldest_tid
- ceph_mdsc_build_path
- build_dentry_path
- build_inode_path
- set_request_path_attr
- create_request_message
- complete_request
- __prepare_send_request
- __do_request
- __wake_requests
- kick_requests
- ceph_mdsc_submit_request
- ceph_mdsc_wait_request
- ceph_mdsc_do_request
- ceph_invalidate_dir_request
- handle_reply
- handle_forward
- __decode_session_metadata
- handle_session
- replay_unsafe_requests
- send_reconnect_partial
- encode_caps_cb
- encode_snap_realms
- send_mds_reconnect
- check_new_map
- __ceph_mdsc_drop_dentry_lease
- handle_lease
- ceph_mdsc_lease_send_msg
- lock_unlock_sessions
- maybe_recover_session
- schedule_delayed
- delayed_work
- ceph_mdsc_init
- wait_requests
- ceph_mdsc_pre_umount
- wait_unsafe_requests
- ceph_mdsc_sync
- done_closing_sessions
- ceph_mdsc_close_sessions
- ceph_mdsc_force_umount
- ceph_mdsc_stop
- ceph_mdsc_destroy
- ceph_mdsc_handle_fsmap
- ceph_mdsc_handle_mdsmap
- con_get
- con_put
- peer_reset
- dispatch
- get_authorizer
- add_authorizer_challenge
- verify_authorizer_reply
- invalidate_authorizer
- mds_alloc_msg
- mds_sign_message
- mds_check_message_signature
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

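/*
 * mds client: manages sessions with the MDS cluster and drives the
 * metadata request state machine.  Replies are parsed into
 * ceph_mds_reply_info_parsed; per-MDS sessions are registered here and
 * their caps tracked, renewed, and released.
 */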
struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;

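/*
 * mds reply parsing
 */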
static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);

	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

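/*
 * parse individual inode info
 */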
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;

		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		ceph_decode_64_safe(p, end, info->change_attr, bad);

		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		*p = end;
	} else {
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**lease), bad);
	*lease = *p;
	*p += sizeof(**lease);
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

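/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */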
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

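/*
 * parse readdir results
 */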
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_reply_info_parsed *info,
				    u64 features)
{
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err("dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

		ceph_decode_32_safe(p, end, rde->name_len, bad);
		ceph_decode_need(p, end, rde->name_len, bad);
		rde->name = *p;
		*p += rde->name_len;
		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

		err = parse_reply_info_lease(p, end, &rde->lease, features);
		if (err)
			goto out_bad;

		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;

		rde->offset = 0;
		i++;
		num--;
	}

done:
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}

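/*
 * parse fcntl F_GETLK results
 */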
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	*p = end;
	return 0;
bad:
	return -EIO;
}

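/*
 * parse create results
 */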
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features)
{
	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			info->has_create_ino = false;
		} else {
			info->has_create_ino = true;
			ceph_decode_64_safe(p, end, info->ino, bad);
		}
	} else {
		if (*p != end)
			goto bad;
	}

	*p = end;
	return 0;
bad:
	return -EIO;
}

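/*
 * parse extra results
 */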
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, info, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features);
	else
		return -EIO;
}

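/*
 * parse entire mds reply
 */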
static int parse_reply_info(struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info,
			    u64 features)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	if (!info->dir_entries)
		return;
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

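/*
 * sessions
 */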
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref)) {
		dout("mdsc get_session %p %d -> %d\n", s,
		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
		return s;
	} else {
		dout("mdsc get_session %p 0 -- FAIL\n", s);
		return NULL;
	}
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	dout("mdsc put_session %p %d -> %d\n", s,
	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		kfree(s);
	}
}

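/*
 * called under mdsc->mutex
 */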
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return get_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

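/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */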
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_mds_session *s;

	if (mds >= mdsc->mdsmap->m_num_mds)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;

		dout("%s: realloc to %d\n", __func__, newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	dout("%s: mds%d\n", __func__, mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	s->s_ttl = 0;
	s->s_seq = 0;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	spin_lock_init(&s->s_gen_ttl_lock);
	s->s_cap_gen = 1;
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	s->s_renew_requested = 0;
	s->s_renew_seq = 0;
	INIT_LIST_HEAD(&s->s_caps);
	s->s_nr_caps = 0;
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	s->s_num_cap_releases = 0;
	s->s_cap_reconnect = 0;
	s->s_cap_iterator = NULL;
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}

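/*
 * called under mdsc->mutex
 */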
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	s->s_state = 0;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

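/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */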
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		ceph_async_iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ceph_async_iput(req->r_parent);
	}
	ceph_async_iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		ceph_async_iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kfree(req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

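/*
 * lookup request by tid, bump ref if found.
 *
 * called under mdsc->mutex.
 */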
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

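/*
 * Register an in-flight request, and assign a tid.  Link it to the
 * directory being modified (if any).
 *
 * Called under mdsc->mutex.
 */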
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err("__register_request %p "
			       "failed to reserve caps: %d\n", req, ret);
			req->r_err = ret;
			return;
		}
	}
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_uid = current_fsuid();
	req->r_gid = current_fsgid();

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		ihold(dir);
		req->r_unsafe_dir = dir;
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);

	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		ceph_async_iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}

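/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode.  The caller must hold the rcu_read_lock.
 * Returns the inode with an extra reference, or NULL.
 */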
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}

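/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */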
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("choose_mds using resend_mds mds%d\n",
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			dout("__choose_mds using snapdir's parent %p\n", inode);
		}
	} else if (req->r_dentry) {
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			inode = get_nonsnap_parent(parent);
			dout("__choose_mds using nonsnap parent %p\n", inode);
		} else {
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
	     (int)hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (%d/%d)\n",
				     inode, ceph_vinop(inode),
				     frag.frag, mds,
				     (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE)
					goto out;
			}

			mode = USE_AUTH_MDS;
			if (frag.mds >= 0) {
				mds = frag.mds;
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (auth)\n",
				     inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE)
					goto out;
			}
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		ceph_async_iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	ceph_async_iput(inode);
	return mds;

random:
	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("choose_mds chose random mds%d\n", mds);
	return mds;
}

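/*
 * session messages
 */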
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

static void encode_supported_features(void **p, void *end)
{
	static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
	static const size_t count = ARRAY_SIZE(bits);

	if (count > 0) {
		size_t i;
		size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8;

		BUG_ON(*p + 4 + size > end);
		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++)
			((unsigned char*)(*p))[i / 8] |= 1 << (bits[i] % 8);
		*p += size;
	} else {
		BUG_ON(*p + 4 > end);
		ceph_encode_32(p, 0);
	}
}

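/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN;
 * also encodes the client metadata map and supported feature bits.
 */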
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i = -1;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	void *p, *end;

	const char* metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	extra_bytes = 4;
	for (i = 0; metadata[i][0]; ++i) {
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	extra_bytes += 4 + 8;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
	h->seq = cpu_to_le64(seq);

	msg->hdr.version = cpu_to_le16(3);
	msg->hdr.compat_version = cpu_to_le16(1);

	p += sizeof(*h);

	ceph_encode_32(&p, metadata_key_count);

	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	encode_supported_features(&p, end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}

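/*
 * send session open request.
 *
 * called under mdsc->mutex
 */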
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	dout("open_session to mds%d (%s)\n", mds,
	     ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	msg = create_session_open_msg(mdsc, session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}

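/*
 * open sessions for possible export targets; the caller must hold
 * mdsc->mutex.
 */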
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING)
		__open_session(mdsc, session);

	return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;

	dout("open_export_target_session to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;

	if (mds >= mdsc->mdsmap->m_num_mds)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	dout("open_export_target_sessions for mds%d (%d targets)\n",
	     session->s_mds, mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		if (!IS_ERR(ts))
			ceph_put_mds_session(ts);
	}
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
					   struct ceph_mds_session *session)
{
	mutex_lock(&mdsc->mutex);
	__open_export_target_sessions(mdsc, session);
	mutex_unlock(&mdsc->mutex);
}

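/*
 * session caps
 */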
static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	dout("dispose_cap_releases mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
				 struct list_head *dispose)
{
	while (!list_empty(dispose)) {
		struct ceph_cap *cap;

		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_mds_request *req;
	struct rb_node *p;
	struct ceph_inode_info *ci;

	dout("cleanup_session_requests mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited(" dropping unsafe request %llu\n",
				    req->r_tid);
		if (req->r_target_inode) {
			ci = ceph_inode(req->r_target_inode);
			errseq_set(&ci->i_meta_err, -EIO);
		}
		if (req->r_unsafe_dir) {
			ci = ceph_inode(req->r_unsafe_dir);
			errseq_set(&ci->i_meta_err, -EIO);
		}
		__unregister_request(mdsc, req);
	}

	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}

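/*
 * Helper to safely iterate over all caps associated with a session,
 * with special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */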
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, struct ceph_cap *,
					void *), void *arg)
{
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->vfs_inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		session->s_cap_iterator = cap;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			ceph_async_iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, cap, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			dout("iterate_session_caps finishing cap %p removal\n",
			     cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap;
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	ceph_async_iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
				  void *arg)
{
	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	LIST_HEAD(to_remove);
	bool dirty_dropped = false;
	bool invalidate = false;

	dout("removing cap %p, ci is %p, inode is %p\n",
	     cap, ci, &ci->vfs_inode);
	spin_lock(&ci->i_ceph_lock);
	if (cap->mds_wanted | cap->issued)
		ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
	__ceph_remove_cap(cap, false);
	if (!ci->i_auth_cap) {
		struct ceph_cap_flush *cf;
		struct ceph_mds_client *mdsc = fsc->mdsc;

		if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
			if (inode->i_data.nrpages > 0)
				invalidate = true;
			if (ci->i_wrbuffer_ref > 0)
				mapping_set_error(&inode->i_data, -EIO);
		}

		while (!list_empty(&ci->i_cap_flush_list)) {
			cf = list_first_entry(&ci->i_cap_flush_list,
					      struct ceph_cap_flush, i_list);
			list_move(&cf->i_list, &to_remove);
		}

		spin_lock(&mdsc->cap_dirty_lock);

		list_for_each_entry(cf, &to_remove, i_list)
			list_del(&cf->g_list);

		if (!list_empty(&ci->i_dirty_item)) {
			pr_warn_ratelimited(
				" dropping dirty %s state for %p %lld\n",
				ceph_cap_string(ci->i_dirty_caps),
				inode, ceph_ino(inode));
			ci->i_dirty_caps = 0;
			list_del_init(&ci->i_dirty_item);
			dirty_dropped = true;
		}
		if (!list_empty(&ci->i_flushing_item)) {
			pr_warn_ratelimited(
				" dropping dirty+flushing %s state for %p %lld\n",
				ceph_cap_string(ci->i_flushing_caps),
				inode, ceph_ino(inode));
			ci->i_flushing_caps = 0;
			list_del_init(&ci->i_flushing_item);
			mdsc->num_cap_flushing--;
			dirty_dropped = true;
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		if (dirty_dropped) {
			errseq_set(&ci->i_meta_err, -EIO);

			if (ci->i_wrbuffer_ref_head == 0 &&
			    ci->i_wr_ref == 0 &&
			    ci->i_dirty_caps == 0 &&
			    ci->i_flushing_caps == 0) {
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		}

		if (atomic_read(&ci->i_filelock_ref) > 0) {
			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
					    inode, ceph_ino(inode));
		}

		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
			ci->i_prealloc_cap_flush = NULL;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	while (!list_empty(&to_remove)) {
		struct ceph_cap_flush *cf;
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, i_list);
		list_del(&cf->i_list);
		ceph_free_cap_flush(cf);
	}

	wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	if (dirty_dropped)
		iput(inode);
	return 0;
}

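/*
 * caller must hold session s_mutex
 */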
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	dout("remove_session_caps on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;

		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			ceph_async_iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}

enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};

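/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), invalidate its
 * issued bits (leaving just the pin).
 */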
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
			      void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		if (cap->cap_gen < cap->session->s_cap_gen) {
			spin_lock(&ci->i_ceph_lock);
			cap->issued = cap->implemented = CEPH_CAP_PIN;
			if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
				ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
			spin_unlock(&ci->i_ceph_lock);
		}
	} else if (ev == FORCE_RO) {
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}

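/*
 * Send a RENEWCAPS message to keep this session's caps from going
 * stale; skipped while the mds is still recovering.
 */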
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int state;

	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info("mds%d caps stale\n", session->s_mds);
	session->s_renew_requested = jiffies;

	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		dout("send_renew_caps ignoring mds%d (%s)\n",
		     session->s_mds, ceph_mds_state_name(state));
		return 0;
	}

	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
	     ceph_mds_state_name(state));
	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
				 ++session->s_renew_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session, u64 seq)
{
	struct ceph_msg *msg;

	dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state), seq);
	msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}

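/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */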
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info("mds%d caps renewed\n", session->s_mds);
			wake = 1;
		} else {
			pr_info("mds%d caps still stale\n", session->s_mds);
		}
	}
	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, RENEWCAPS);
}

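/*
 * send a session close request
 */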
static int request_close_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	dout("request_close_session mds%d state %s seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state),
	     session->s_seq);
	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}

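/*
 * Called with s_mutex held.
 */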
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(mdsc, session);
}

static bool drop_negative_children(struct dentry *dentry)
{
	struct dentry *child;
	bool all_negative = true;

	if (!d_is_dir(dentry))
		goto out;

	spin_lock(&dentry->d_lock);
	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
		if (d_really_is_positive(child)) {
			all_negative = false;
			break;
		}
	}
	spin_unlock(&dentry->d_lock);

	if (all_negative)
		shrink_dcache_parent(dentry);
out:
	return all_negative;
}

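/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond
 * to memory pressure from the MDS, though, so it needn't be perfect.
 */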
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}

	if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;

	if (oissued) {
		__ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;

		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = atomic_read(&inode->i_count);
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

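/*
 * Trim session cap count down to some max number.
 */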
int ceph_trim_caps(struct ceph_mds_client *mdsc,
		   struct ceph_mds_session *session,
		   int max_caps)
{
	int trim_caps = session->s_nr_caps - max_caps;

	dout("trim_caps mds%d start: %d / %d, trim %d\n",
	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
	if (trim_caps > 0) {
		int remaining = trim_caps;

		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
		     session->s_mds, session->s_nr_caps, max_caps,
		     trim_caps - remaining);
	}

	ceph_flush_cap_releases(mdsc, session);
	return 0;
}

static int check_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	int ret = 1;

	spin_lock(&mdsc->cap_dirty_lock);
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		if (cf->tid <= want_flush_tid) {
			dout("check_caps_flush still flushing tid "
			     "%llu <= %llu\n", cf->tid, want_flush_tid);
			ret = 0;
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	return ret;
}

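/*
 * flush all dirty inode data to disk.
 *
 * returns true if we've flushed through want_flush_tid
 */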
static void wait_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	dout("check_caps_flush want %llu\n", want_flush_tid);

	wait_event(mdsc->cap_flushing_wq,
		   check_caps_flush(mdsc, want_flush_tid));

	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
}

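/*
 * called under s_mutex
 */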
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32 barrier, *cap_barrier;

	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					   PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
				       session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
				   &head->num);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	BUG_ON(num_cap_releases != 0);

	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	if (msg) {
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	pr_err("send_cap_releases mds%d, failed to allocate message\n",
	       session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}

1950
static void ceph_cap_release_work(struct work_struct *work)
{
	struct ceph_mds_session *session =
		container_of(work, struct ceph_mds_session, s_cap_release_work);

	mutex_lock(&session->s_mutex);
	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
	    session->s_state == CEPH_MDS_SESSION_HUNG)
		ceph_send_cap_releases(session->s_mdsc, session);
	mutex_unlock(&session->s_mutex);
	ceph_put_mds_session(session);
}

void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session)
{
	if (mdsc->stopping)
		return;

	get_session(session);
	if (queue_work(mdsc->fsc->cap_wq,
		       &session->s_cap_release_work)) {
		dout("cap release work queued\n");
	} else {
		ceph_put_mds_session(session);
		dout("failed to queue cap release work\n");
	}
}

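/*
 * caller holds session->s_cap_lock
 */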
void __ceph_queue_cap_release(struct ceph_mds_session *session,
			      struct ceph_cap *cap)
{
	list_add_tail(&cap->session_caps, &session->s_cap_releases);
	session->s_num_cap_releases++;

	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
		ceph_flush_cap_releases(session->s_mdsc, session);
}

static void ceph_cap_reclaim_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, cap_reclaim_work);
	int ret = ceph_trim_dentries(mdsc);
	if (ret == -EAGAIN)
		ceph_queue_cap_reclaim_work(mdsc);
}

void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
{
	if (mdsc->stopping)
		return;

	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
		dout("caps reclaim work queued\n");
	} else {
		dout("failed to queue caps release work\n");
	}
}

void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
	int val;
	if (!nr)
		return;
	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
	if (!(val % CEPH_CAPS_PER_RELEASE)) {
		atomic_set(&mdsc->cap_reclaim_pending, 0);
		ceph_queue_cap_reclaim_work(mdsc);
	}
}

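/*
 * requests
 */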
int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
				    struct inode *dir)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
	int order, num_entries;

	spin_lock(&ci->i_ceph_lock);
	num_entries = ci->i_files + ci->i_subdirs;
	spin_unlock(&ci->i_ceph_lock);
	num_entries = max(num_entries, 1);
	num_entries = min(num_entries, opt->max_readdir);

	order = get_order(size * num_entries);
	while (order >= 0) {
		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
							     __GFP_NOWARN,
							     order);
		if (rinfo->dir_entries)
			break;
		order--;
	}
	if (!rinfo->dir_entries)
		return -ENOMEM;

	num_entries = (PAGE_SIZE << order) / size;
	num_entries = min(num_entries, opt->max_readdir);

	rinfo->dir_buf_size = PAGE_SIZE << order;
	req->r_num_caps = num_entries + 1;
	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
	return 0;
}

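/*
 * Create an mds request.
 */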
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
	struct timespec64 ts;

	if (!req)
		return ERR_PTR(-ENOMEM);

	mutex_init(&req->r_fill_mutex);
	req->r_mdsc = mdsc;
	req->r_started = jiffies;
	req->r_resend_mds = -1;
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
	INIT_LIST_HEAD(&req->r_unsafe_target_item);
	req->r_fmode = -1;
	kref_init(&req->r_kref);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_wait);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	ktime_get_coarse_real_ts64(&ts);
	req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran);

	req->r_op = op;
	req->r_direct_mode = mode;
	return req;
}

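/*
 * return oldest (lowest) request, tid in request tree, 0 if none.
 *
 * called under mdsc->mutex.
 */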
static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
{
	if (RB_EMPTY_ROOT(&mdsc->request_tree))
		return NULL;
	return rb_entry(rb_first(&mdsc->request_tree),
			struct ceph_mds_request, r_node);
}

static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
	return mdsc->oldest_tid;
}

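/*
 * Build a dentry's path.  The result is allocated from the names
 * cache (__getname()) and must be released by the caller.  Based on
 * @stop_on_nosnap, generate path relative to the nearest non-snapped
 * inode.  Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 */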
2129 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2130 int stop_on_nosnap)
2131 {
2132 struct dentry *temp;
2133 char *path;
2134 int pos;
2135 unsigned seq;
2136 u64 base;
2137
2138 if (!dentry)
2139 return ERR_PTR(-EINVAL);
2140
2141 path = __getname();
2142 if (!path)
2143 return ERR_PTR(-ENOMEM);
2144 retry:
2145 pos = PATH_MAX - 1;
2146 path[pos] = '\0';
2147
2148 seq = read_seqbegin(&rename_lock);
2149 rcu_read_lock();
2150 temp = dentry;
2151 for (;;) {
2152 struct inode *inode;
2153
2154 spin_lock(&temp->d_lock);
2155 inode = d_inode(temp);
2156 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2157 dout("build_path path+%d: %p SNAPDIR\n",
2158 pos, temp);
2159 } else if (stop_on_nosnap && inode && dentry != temp &&
2160 ceph_snap(inode) == CEPH_NOSNAP) {
2161 spin_unlock(&temp->d_lock);
2162 pos++;
2163 break;
2164 } else {
2165 pos -= temp->d_name.len;
2166 if (pos < 0) {
2167 spin_unlock(&temp->d_lock);
2168 break;
2169 }
2170 memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2171 }
2172 spin_unlock(&temp->d_lock);
2173 temp = READ_ONCE(temp->d_parent);
2174
2175 /* Are we at the root? */
2176 if (IS_ROOT(temp))
2177 break;
2178
2179 /* Are we out of buffer? */
2180 if (--pos < 0)
2181 break;
2182
2183 path[pos] = '/';
2184 }
2185 base = ceph_ino(d_inode(temp));
2186 rcu_read_unlock();
2187 if (pos < 0 || read_seqretry(&rename_lock, seq)) {
2188 pr_err("build_path did not end path lookup where expected, pos is %d\n",
2189 pos);
2190 /* presumably this is only possible if racing with a
2191    rename of one of the parent directories (we can not
2192    lock the dentries above us to prevent this, but
2193    retrying should be harmless) */
2194 goto retry;
2195 }
2196
2197 *pbase = base;
2198 *plen = PATH_MAX - 1 - pos;
2199 dout("build_path on %p %d built %llx '%.*s'\n",
2200 dentry, d_count(dentry), base, *plen, path + pos);
2201 return path + pos;
2202 }
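/*
 * The function above fills the buffer from the tail: pos starts at
 * PATH_MAX - 1 and moves left as each component is prepended while
 * walking d_parent toward the root, so the finished string begins at
 * path + pos, not path.  A standalone sketch of the same technique in
 * plain C (have_component() is a hypothetical iterator; bounds checks
 * omitted for brevity):
 *
 *	char buf[PATH_MAX];
 *	int pos = PATH_MAX - 1;
 *
 *	buf[pos] = '\0';
 *	while (have_component(&name, &len)) {	// leaf towards root
 *		pos -= len;
 *		memcpy(buf + pos, name, len);
 *		buf[--pos] = '/';
 *	}
 *	printf("%s\n", buf + pos + 1);		// + 1 skips the leading '/'
 */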
2203
2204 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2205 const char **ppath, int *ppathlen, u64 *pino,
2206 bool *pfreepath, bool parent_locked)
2207 {
2208 char *path;
2209
2210 rcu_read_lock();
2211 if (!dir)
2212 dir = d_inode_rcu(dentry->d_parent);
2213 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2214 *pino = ceph_ino(dir);
2215 rcu_read_unlock();
2216 *ppath = dentry->d_name.name;
2217 *ppathlen = dentry->d_name.len;
2218 return 0;
2219 }
2220 rcu_read_unlock();
2221 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2222 if (IS_ERR(path))
2223 return PTR_ERR(path);
2224 *ppath = path;
2225 *pfreepath = true;
2226 return 0;
2227 }
2228
2229 static int build_inode_path(struct inode *inode,
2230 const char **ppath, int *ppathlen, u64 *pino,
2231 bool *pfreepath)
2232 {
2233 struct dentry *dentry;
2234 char *path;
2235
2236 if (ceph_snap(inode) == CEPH_NOSNAP) {
2237 *pino = ceph_ino(inode);
2238 *ppathlen = 0;
2239 return 0;
2240 }
2241 dentry = d_find_alias(inode);
2242 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2243 dput(dentry);
2244 if (IS_ERR(path))
2245 return PTR_ERR(path);
2246 *ppath = path;
2247 *pfreepath = true;
2248 return 0;
2249 }
2250
2251 /*
2252  * request arguments may be specified via an inode *, a dentry *, or
2253  * an explicit ino+path.
2254  */
2255 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2256 struct inode *rdiri, const char *rpath,
2257 u64 rino, const char **ppath, int *pathlen,
2258 u64 *ino, bool *freepath, bool parent_locked)
2259 {
2260 int r = 0;
2261
2262 if (rinode) {
2263 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2264 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2265 ceph_snap(rinode));
2266 } else if (rdentry) {
2267 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2268 freepath, parent_locked);
2269 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2270 *ppath);
2271 } else if (rpath || rino) {
2272 *ino = rino;
2273 *ppath = rpath;
2274 *pathlen = rpath ? strlen(rpath) : 0;
2275 dout(" path %.*s\n", *pathlen, rpath);
2276 }
2277
2278 return r;
2279 }
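/*
 * For reference, the three mutually exclusive ways an endpoint reaches
 * set_request_path_attr(), with invented example values:
 *
 *	rinode set:      *ino = ceph_ino(rinode), *pathlen = 0
 *	rdentry set:     fast path *ino = parent dir ino, *ppath = dentry
 *	                 name; otherwise a full path built by
 *	                 ceph_mdsc_build_path()
 *	rpath/rino set:  passed through, e.g. *ino = 1, *ppath = "a/b/c"
 */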
2280
2281 /*
2282  * called under mdsc->mutex
2283  */
2284 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2285 struct ceph_mds_request *req,
2286 int mds, bool drop_cap_releases)
2287 {
2288 struct ceph_msg *msg;
2289 struct ceph_mds_request_head *head;
2290 const char *path1 = NULL;
2291 const char *path2 = NULL;
2292 u64 ino1 = 0, ino2 = 0;
2293 int pathlen1 = 0, pathlen2 = 0;
2294 bool freepath1 = false, freepath2 = false;
2295 int len;
2296 u16 releases;
2297 void *p, *end;
2298 int ret;
2299
2300 ret = set_request_path_attr(req->r_inode, req->r_dentry,
2301 req->r_parent, req->r_path1, req->r_ino1.ino,
2302 &path1, &pathlen1, &ino1, &freepath1,
2303 test_bit(CEPH_MDS_R_PARENT_LOCKED,
2304 &req->r_req_flags));
2305 if (ret < 0) {
2306 msg = ERR_PTR(ret);
2307 goto out;
2308 }
2309
2310 /* If r_old_dentry is set, then assume that its parent is locked */
2311 ret = set_request_path_attr(NULL, req->r_old_dentry,
2312 req->r_old_dentry_dir,
2313 req->r_path2, req->r_ino2.ino,
2314 &path2, &pathlen2, &ino2, &freepath2, true);
2315 if (ret < 0) {
2316 msg = ERR_PTR(ret);
2317 goto out_free1;
2318 }
2319
2320 len = sizeof(*head) +
2321 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2322 sizeof(struct ceph_timespec);
2323
2324 /* calculate (max) length for cap releases */
2325 len += sizeof(struct ceph_mds_request_release) *
2326 (!!req->r_inode_drop + !!req->r_dentry_drop +
2327 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2328 if (req->r_dentry_drop)
2329 len += pathlen1;
2330 if (req->r_old_dentry_drop)
2331 len += pathlen2;
2332
2333 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2334 if (!msg) {
2335 msg = ERR_PTR(-ENOMEM);
2336 goto out_free2;
2337 }
2338
2339 msg->hdr.version = cpu_to_le16(2);
2340 msg->hdr.tid = cpu_to_le64(req->r_tid);
2341
2342 head = msg->front.iov_base;
2343 p = msg->front.iov_base + sizeof(*head);
2344 end = msg->front.iov_base + msg->front.iov_len;
2345
2346 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2347 head->op = cpu_to_le32(req->r_op);
2348 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2349 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2350 head->args = req->r_args;
2351
2352 ceph_encode_filepath(&p, end, ino1, path1);
2353 ceph_encode_filepath(&p, end, ino2, path2);
2354
2355 /* make note of release offset, in case we need to replay */
2356 req->r_request_release_offset = p - msg->front.iov_base;
2357
2358 /* cap releases */
2359 releases = 0;
2360 if (req->r_inode_drop)
2361 releases += ceph_encode_inode_release(&p,
2362 req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2363 mds, req->r_inode_drop, req->r_inode_unless, 0);
2364 if (req->r_dentry_drop)
2365 releases += ceph_encode_dentry_release(&p, req->r_dentry,
2366 req->r_parent, mds, req->r_dentry_drop,
2367 req->r_dentry_unless);
2368 if (req->r_old_dentry_drop)
2369 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2370 req->r_old_dentry_dir, mds,
2371 req->r_old_dentry_drop,
2372 req->r_old_dentry_unless);
2373 if (req->r_old_inode_drop)
2374 releases += ceph_encode_inode_release(&p,
2375 d_inode(req->r_old_dentry),
2376 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2377
2378 if (drop_cap_releases) {
2379 releases = 0;
2380 p = msg->front.iov_base + req->r_request_release_offset;
2381 }
2382
2383 head->num_releases = cpu_to_le16(releases);
2384
2385 /* time stamp */
2386 {
2387 struct ceph_timespec ts;
2388 ceph_encode_timespec64(&ts, &req->r_stamp);
2389 ceph_encode_copy(&p, &ts, sizeof(ts));
2390 }
2391
2392 BUG_ON(p > end);
2393 msg->front.iov_len = p - msg->front.iov_base;
2394 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2395
2396 if (req->r_pagelist) {
2397 struct ceph_pagelist *pagelist = req->r_pagelist;
2398 ceph_msg_data_add_pagelist(msg, pagelist);
2399 msg->hdr.data_len = cpu_to_le32(pagelist->length);
2400 } else {
2401 msg->hdr.data_len = 0;
2402 }
2403
2404 msg->hdr.data_off = cpu_to_le16(0);
2405
2406 out_free2:
2407 if (freepath2)
2408 ceph_mdsc_free_path((char *)path2, pathlen2);
2409 out_free1:
2410 if (freepath1)
2411 ceph_mdsc_free_path((char *)path1, pathlen1);
2412 out:
2413 return msg;
2414 }
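/*
 * Sketch of the resulting front section, matching the encoding order
 * above (a version 2 request head):
 *
 *	struct ceph_mds_request_head
 *	filepath1:  u8 1, u64 ino1, u32 len, path1 bytes
 *	filepath2:  u8 1, u64 ino2, u32 len, path2 bytes
 *	            <- r_request_release_offset points here
 *	num_releases cap/dentry release records
 *	struct ceph_timespec (r_stamp)
 */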
2415
2416
2417 /*
2418  * called under mdsc->mutex if error, under no mutex if success.
2419  */
2420 static void complete_request(struct ceph_mds_client *mdsc,
2421 struct ceph_mds_request *req)
2422 {
2423 if (req->r_callback)
2424 req->r_callback(mdsc, req);
2425 complete_all(&req->r_completion);
2426 }
2427
2428 /*
2429  * called under mdsc->mutex
2430  */
2431 static int __prepare_send_request(struct ceph_mds_client *mdsc,
2432 struct ceph_mds_request *req,
2433 int mds, bool drop_cap_releases)
2434 {
2435 struct ceph_mds_request_head *rhead;
2436 struct ceph_msg *msg;
2437 int flags = 0;
2438
2439 req->r_attempts++;
2440 if (req->r_inode) {
2441 struct ceph_cap *cap =
2442 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2443
2444 if (cap)
2445 req->r_sent_on_mseq = cap->mseq;
2446 else
2447 req->r_sent_on_mseq = -1;
2448 }
2449 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2450 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2451
2452 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2453 void *p;
2454
2455 /*
2456  * Replay.  Do not regenerate the message (and rebuild paths, etc.);
2457  * just use the original message.  Rebuilding paths will break for
2458  * renames because d_move mangles the src name.
2459  */
2460 msg = req->r_request;
2461 rhead = msg->front.iov_base;
2462
2463 flags = le32_to_cpu(rhead->flags);
2464 flags |= CEPH_MDS_FLAG_REPLAY;
2465 rhead->flags = cpu_to_le32(flags);
2466
2467 if (req->r_target_inode)
2468 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2469
2470 rhead->num_retry = req->r_attempts - 1;
2471
2472 /* remove cap/dentry releases from message */
2473 rhead->num_releases = 0;
2474
2475 /* time stamp */
2476 p = msg->front.iov_base + req->r_request_release_offset;
2477 {
2478 struct ceph_timespec ts;
2479 ceph_encode_timespec64(&ts, &req->r_stamp);
2480 ceph_encode_copy(&p, &ts, sizeof(ts));
2481 }
2482
2483 msg->front.iov_len = p - msg->front.iov_base;
2484 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2485 return 0;
2486 }
2487
2488 if (req->r_request) {
2489 ceph_msg_put(req->r_request);
2490 req->r_request = NULL;
2491 }
2492 msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2493 if (IS_ERR(msg)) {
2494 req->r_err = PTR_ERR(msg);
2495 return PTR_ERR(msg);
2496 }
2497 req->r_request = msg;
2498
2499 rhead = msg->front.iov_base;
2500 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2501 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2502 flags |= CEPH_MDS_FLAG_REPLAY;
2503 if (req->r_parent)
2504 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2505 rhead->flags = cpu_to_le32(flags);
2506 rhead->num_fwd = req->r_num_fwd;
2507 rhead->num_retry = req->r_attempts - 1;
2508 rhead->ino = 0;
2509
2510 dout(" r_parent = %p\n", req->r_parent);
2511 return 0;
2512 }
2513
2514 /*
2515  * send request, or put it on the appropriate wait list.
2516  */
2517 static void __do_request(struct ceph_mds_client *mdsc,
2518 struct ceph_mds_request *req)
2519 {
2520 struct ceph_mds_session *session = NULL;
2521 int mds = -1;
2522 int err = 0;
2523
2524 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2525 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2526 __unregister_request(mdsc, req);
2527 return;
2528 }
2529
2530 if (req->r_timeout &&
2531 time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2532 dout("do_request timed out\n");
2533 err = -EIO;
2534 goto finish;
2535 }
2536 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2537 dout("do_request forced umount\n");
2538 err = -EIO;
2539 goto finish;
2540 }
2541 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2542 if (mdsc->mdsmap_err) {
2543 err = mdsc->mdsmap_err;
2544 dout("do_request mdsmap err %d\n", err);
2545 goto finish;
2546 }
2547 if (mdsc->mdsmap->m_epoch == 0) {
2548 dout("do_request no mdsmap, waiting for map\n");
2549 list_add(&req->r_wait, &mdsc->waiting_for_map);
2550 return;
2551 }
2552 if (!(mdsc->fsc->mount_options->flags &
2553 CEPH_MOUNT_OPT_MOUNTWAIT) &&
2554 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2555 err = -EHOSTUNREACH;
2556 goto finish;
2557 }
2558 }
2559
2560 put_request_session(req);
2561
2562 mds = __choose_mds(mdsc, req);
2563 if (mds < 0 ||
2564 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2565 dout("do_request no mds or not active, waiting for map\n");
2566 list_add(&req->r_wait, &mdsc->waiting_for_map);
2567 return;
2568 }
2569
2570 /* get, open session */
2571 session = __ceph_lookup_mds_session(mdsc, mds);
2572 if (!session) {
2573 session = register_session(mdsc, mds);
2574 if (IS_ERR(session)) {
2575 err = PTR_ERR(session);
2576 goto finish;
2577 }
2578 }
2579 req->r_session = get_session(session);
2580
2581 dout("do_request mds%d session %p state %s\n", mds, session,
2582 ceph_session_state_name(session->s_state));
2583 if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2584 session->s_state != CEPH_MDS_SESSION_HUNG) {
2585 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2586 err = -EACCES;
2587 goto out_session;
2588 }
2589 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2590 session->s_state == CEPH_MDS_SESSION_CLOSING)
2591 __open_session(mdsc, session);
2592 list_add(&req->r_wait, &session->s_waiting);
2593 goto out_session;
2594 }
2595
2596 /* send request */
2597 req->r_resend_mds = -1;   /* forget any previous mds hint */
2598
2599 if (req->r_request_started == 0)
2600 req->r_request_started = jiffies;
2601
2602 err = __prepare_send_request(mdsc, req, mds, false);
2603 if (!err) {
2604 ceph_msg_get(req->r_request);
2605 ceph_con_send(&session->s_con, req->r_request);
2606 }
2607
2608 out_session:
2609 ceph_put_mds_session(session);
2610 finish:
2611 if (err) {
2612 dout("__do_request early error %d\n", err);
2613 req->r_err = err;
2614 complete_request(mdsc, req);
2615 __unregister_request(mdsc, req);
2616 }
2617 return;
2618 }
2619
2620
2621
2622 /* called under mdsc->mutex */
2623 static void __wake_requests(struct ceph_mds_client *mdsc,
2624 struct list_head *head)
2625 {
2626 struct ceph_mds_request *req;
2627 LIST_HEAD(tmp_list);
2628
2629 list_splice_init(head, &tmp_list);
2630
2631 while (!list_empty(&tmp_list)) {
2632 req = list_entry(tmp_list.next,
2633 struct ceph_mds_request, r_wait);
2634 list_del_init(&req->r_wait);
2635 dout(" wake request %p tid %llu\n", req, req->r_tid);
2636 __do_request(mdsc, req);
2637 }
2638 }
2639
2640
2641 /*
2642  * Wake up threads with requests pending for @mds, so that they resubmit.
2643  */
2644 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2645 {
2646 struct ceph_mds_request *req;
2647 struct rb_node *p = rb_first(&mdsc->request_tree);
2648
2649 dout("kick_requests mds%d\n", mds);
2650 while (p) {
2651 req = rb_entry(p, struct ceph_mds_request, r_node);
2652 p = rb_next(p);
2653 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2654 continue;
2655 if (req->r_attempts > 0)
2656 continue;
2657 if (req->r_session &&
2658 req->r_session->s_mds == mds) {
2659 dout(" kicking tid %llu\n", req->r_tid);
2660 list_del_init(&req->r_wait);
2661 __do_request(mdsc, req);
2662 }
2663 }
2664 }
2665
2666 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2667 struct ceph_mds_request *req)
2668 {
2669 int err;
2670
2671 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry_dir */
2672 if (req->r_inode)
2673 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2674 if (req->r_parent) {
2675 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
2676 ihold(req->r_parent);
2677 }
2678 if (req->r_old_dentry_dir)
2679 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2680 CEPH_CAP_PIN);
2681
2682 dout("submit_request on %p for inode %p\n", req, dir);
2683 mutex_lock(&mdsc->mutex);
2684 __register_request(mdsc, req, dir);
2685 __do_request(mdsc, req);
2686 err = req->r_err;
2687 mutex_unlock(&mdsc->mutex);
2688 return err;
2689 }
2690
2691 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
2692 struct ceph_mds_request *req)
2693 {
2694 int err;
2695
2696
2697 dout("do_request waiting\n");
2698 if (!req->r_timeout && req->r_wait_for_completion) {
2699 err = req->r_wait_for_completion(mdsc, req);
2700 } else {
2701 long timeleft = wait_for_completion_killable_timeout(
2702 &req->r_completion,
2703 ceph_timeout_jiffies(req->r_timeout));
2704 if (timeleft > 0)
2705 err = 0;
2706 else if (!timeleft)
2707 err = -EIO;
2708 else
2709 err = timeleft;
2710 }
2711 dout("do_request waited, got %d\n", err);
2712 mutex_lock(&mdsc->mutex);
2713
2714 /* only abort if we didn't race with a real reply */
2715 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2716 err = le32_to_cpu(req->r_reply_info.head->result);
2717 } else if (err < 0) {
2718 dout("aborted request %lld with %d\n", req->r_tid, err);
2719
2720 /*
2721  * ensure we aren't running concurrently with
2722  * ceph_fill_trace or ceph_readdir_prepopulate, which
2723  * rely on locks (dir mutex) held by our caller.
2724  */
2725 mutex_lock(&req->r_fill_mutex);
2726 req->r_err = err;
2727 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2728 mutex_unlock(&req->r_fill_mutex);
2729
2730 if (req->r_parent &&
2731 (req->r_op & CEPH_MDS_OP_WRITE))
2732 ceph_invalidate_dir_request(req);
2733 } else {
2734 err = req->r_err;
2735 }
2736
2737 mutex_unlock(&mdsc->mutex);
2738 return err;
2739 }
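/*
 * The timeout decoding above follows the standard completion API
 * contract: wait_for_completion_killable_timeout() returns the jiffies
 * remaining (> 0) on completion, 0 on timeout, or -ERESTARTSYS when the
 * task takes a fatal signal.  Sketch:
 *
 *	long t = wait_for_completion_killable_timeout(&done, 30 * HZ);
 *	if (t > 0)
 *		;		// completed with t jiffies to spare
 *	else if (t == 0)
 *		err = -EIO;	// timed out
 *	else
 *		err = t;	// fatal signal, t == -ERESTARTSYS
 */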
2740
2741
2742 /*
2743  * Synchronously perform an mds request: session setup, forwarding, retries.
2744  */
2745 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2746 struct inode *dir,
2747 struct ceph_mds_request *req)
2748 {
2749 int err;
2750
2751 dout("do_request on %p\n", req);
2752
2753
2754 err = ceph_mdsc_submit_request(mdsc, dir, req);
2755 if (!err)
2756 err = ceph_mdsc_wait_request(mdsc, req);
2757 dout("do_request %p done, result %d\n", req, err);
2758 return err;
2759 }
2760
2761
2762 /*
2763  * Invalidate dir's completeness and dentry leases on an aborted MDS op.
2764  */
2765 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2766 {
2767 struct inode *dir = req->r_parent;
2768 struct inode *old_dir = req->r_old_dentry_dir;
2769
2770 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
2771
2772 ceph_dir_clear_complete(dir);
2773 if (old_dir)
2774 ceph_dir_clear_complete(old_dir);
2775 if (req->r_dentry)
2776 ceph_invalidate_dentry_lease(req->r_dentry);
2777 if (req->r_old_dentry)
2778 ceph_invalidate_dentry_lease(req->r_old_dentry);
2779 }
2780
2781
2782 /*
2783  * Handle an mds reply.  We take the session mutex and parse and
2784  * process the reply immediately.  This preserves the logical ordering
2785  * of replies, capabilities, etc., sent by the MDS as they are applied
2786  * to our local cache.
2787  */
2788 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2789 {
2790 struct ceph_mds_client *mdsc = session->s_mdsc;
2791 struct ceph_mds_request *req;
2792 struct ceph_mds_reply_head *head = msg->front.iov_base;
2793 struct ceph_mds_reply_info_parsed *rinfo;
2794 struct ceph_snap_realm *realm;
2795 u64 tid;
2796 int err, result;
2797 int mds = session->s_mds;
2798
2799 if (msg->front.iov_len < sizeof(*head)) {
2800 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2801 ceph_msg_dump(msg);
2802 return;
2803 }
2804
2805
2806 tid = le64_to_cpu(msg->hdr.tid);
2807 mutex_lock(&mdsc->mutex);
2808 req = lookup_get_request(mdsc, tid);
2809 if (!req) {
2810 dout("handle_reply on unknown tid %llu\n", tid);
2811 mutex_unlock(&mdsc->mutex);
2812 return;
2813 }
2814 dout("handle_reply %p\n", req);
2815
2816
2817 if (req->r_session != session) {
2818 pr_err("mdsc_handle_reply got %llu on session mds%d not mds%d\n",
2819 tid, session->s_mds,
2820 req->r_session ? req->r_session->s_mds : -1);
2821 mutex_unlock(&mdsc->mutex);
2822 goto out;
2823 }
2824
2825
2826 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
2827 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
2828 pr_warn("got a dup %s reply on %llu from mds%d\n",
2829 head->safe ? "safe" : "unsafe", tid, mds);
2830 mutex_unlock(&mdsc->mutex);
2831 goto out;
2832 }
2833 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
2834 pr_warn("got unsafe after safe on %llu from mds%d\n",
2835 tid, mds);
2836 mutex_unlock(&mdsc->mutex);
2837 goto out;
2838 }
2839
2840 result = le32_to_cpu(head->result);
2841
2842 /*
2843  * Handle an ESTALE:
2844  * if we're not talking to the authority, send to them;
2845  * if the authority has changed while we weren't looking,
2846  * send to the new authority.
2847  * Otherwise we just have to return an ESTALE.
2848  */
2849 if (result == -ESTALE) {
2850 dout("got ESTALE on request %llu\n", req->r_tid);
2851 req->r_resend_mds = -1;
2852 if (req->r_direct_mode != USE_AUTH_MDS) {
2853 dout("not using auth, setting for that now\n");
2854 req->r_direct_mode = USE_AUTH_MDS;
2855 __do_request(mdsc, req);
2856 mutex_unlock(&mdsc->mutex);
2857 goto out;
2858 } else {
2859 int mds = __choose_mds(mdsc, req);
2860 if (mds >= 0 && mds != req->r_session->s_mds) {
2861 dout("but auth changed, so resending\n");
2862 __do_request(mdsc, req);
2863 mutex_unlock(&mdsc->mutex);
2864 goto out;
2865 }
2866 }
2867 dout("have to return ESTALE on request %llu\n", req->r_tid);
2868 }
2869
2870
2871 if (head->safe) {
2872 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
2873 __unregister_request(mdsc, req);
2874
2875 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2876 /*
2877  * We already handled the unsafe response, now do the
2878  * cleanup.  No need to examine the response; the MDS
2879  * doesn't include any result info in the safe
2880  * response.  And even if it did, there is nothing
2881  * useful we could do with a revised return value.
2882  */
2883 dout("got safe reply %llu, mds%d\n", tid, mds);
2884
2885 /* last unsafe request during umount? */
2886 if (mdsc->stopping && !__get_oldest_req(mdsc))
2887 complete_all(&mdsc->safe_umount_waiters);
2888 mutex_unlock(&mdsc->mutex);
2889 goto out;
2890 }
2891 } else {
2892 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
2893 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2894 if (req->r_unsafe_dir) {
2895 struct ceph_inode_info *ci =
2896 ceph_inode(req->r_unsafe_dir);
2897 spin_lock(&ci->i_unsafe_lock);
2898 list_add_tail(&req->r_unsafe_dir_item,
2899 &ci->i_unsafe_dirops);
2900 spin_unlock(&ci->i_unsafe_lock);
2901 }
2902 }
2903
2904 dout("handle_reply tid %lld result %d\n", tid, result);
2905 rinfo = &req->r_reply_info;
2906 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
2907 err = parse_reply_info(msg, rinfo, (u64)-1);
2908 else
2909 err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2910 mutex_unlock(&mdsc->mutex);
2911
2912 mutex_lock(&session->s_mutex);
2913 if (err < 0) {
2914 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2915 ceph_msg_dump(msg);
2916 goto out_err;
2917 }
2918
2919 /* snap trace */
2920 realm = NULL;
2921 if (rinfo->snapblob_len) {
2922 down_write(&mdsc->snap_rwsem);
2923 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2924 rinfo->snapblob + rinfo->snapblob_len,
2925 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2926 &realm);
2927 downgrade_write(&mdsc->snap_rwsem);
2928 } else {
2929 down_read(&mdsc->snap_rwsem);
2930 }
2931
2932 /* insert trace into our cache */
2933 mutex_lock(&req->r_fill_mutex);
2934 current->journal_info = req;
2935 err = ceph_fill_trace(mdsc->fsc->sb, req);
2936 if (err == 0) {
2937 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2938 req->r_op == CEPH_MDS_OP_LSSNAP))
2939 ceph_readdir_prepopulate(req, req->r_session);
2940 }
2941 current->journal_info = NULL;
2942 mutex_unlock(&req->r_fill_mutex);
2943
2944 up_read(&mdsc->snap_rwsem);
2945 if (realm)
2946 ceph_put_snap_realm(mdsc, realm);
2947
2948 if (err == 0) {
2949 if (req->r_target_inode &&
2950 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2951 struct ceph_inode_info *ci =
2952 ceph_inode(req->r_target_inode);
2953 spin_lock(&ci->i_unsafe_lock);
2954 list_add_tail(&req->r_unsafe_target_item,
2955 &ci->i_unsafe_iops);
2956 spin_unlock(&ci->i_unsafe_lock);
2957 }
2958
2959 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2960 }
2961 out_err:
2962 mutex_lock(&mdsc->mutex);
2963 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2964 if (err) {
2965 req->r_err = err;
2966 } else {
2967 req->r_reply = ceph_msg_get(msg);
2968 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
2969 }
2970 } else {
2971 dout("reply arrived after request %lld was aborted\n", tid);
2972 }
2973 mutex_unlock(&mdsc->mutex);
2974
2975 mutex_unlock(&session->s_mutex);
2976
2977 /* kick calling process */
2978 complete_request(mdsc, req);
2979 out:
2980 ceph_mdsc_put_request(req);
2981 return;
2982 }
2983
2984
2985
2986 /*
2987  * handle mds notification that our request has been forwarded.
2988  */
2989 static void handle_forward(struct ceph_mds_client *mdsc,
2990 struct ceph_mds_session *session,
2991 struct ceph_msg *msg)
2992 {
2993 struct ceph_mds_request *req;
2994 u64 tid = le64_to_cpu(msg->hdr.tid);
2995 u32 next_mds;
2996 u32 fwd_seq;
2997 int err = -EINVAL;
2998 void *p = msg->front.iov_base;
2999 void *end = p + msg->front.iov_len;
3000
3001 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3002 next_mds = ceph_decode_32(&p);
3003 fwd_seq = ceph_decode_32(&p);
3004
3005 mutex_lock(&mdsc->mutex);
3006 req = lookup_get_request(mdsc, tid);
3007 if (!req) {
3008 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3009 goto out;
3010 }
3011
3012 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3013 dout("forward tid %llu aborted, unregistering\n", tid);
3014 __unregister_request(mdsc, req);
3015 } else if (fwd_seq <= req->r_num_fwd) {
3016 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3017 tid, next_mds, req->r_num_fwd, fwd_seq);
3018 } else {
3019 /* resend. forward race not possible; mds would drop */
3020 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3021 BUG_ON(req->r_err);
3022 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3023 req->r_attempts = 0;
3024 req->r_num_fwd = fwd_seq;
3025 req->r_resend_mds = next_mds;
3026 put_request_session(req);
3027 __do_request(mdsc, req);
3028 }
3029 ceph_mdsc_put_request(req);
3030 out:
3031 mutex_unlock(&mdsc->mutex);
3032 return;
3033
3034 bad:
3035 pr_err("mdsc_handle_forward decode error err=%d\n", err);
3036 }
3037
3038 static int __decode_session_metadata(void **p, void *end,
3039 bool *blacklisted)
3040 {
3041 /* map<string,string> */
3042 u32 n;
3043 bool err_str;
3044 ceph_decode_32_safe(p, end, n, bad);
3045 while (n-- > 0) {
3046 u32 len;
3047 ceph_decode_32_safe(p, end, len, bad);
3048 ceph_decode_need(p, end, len, bad);
3049 err_str = !strncmp(*p, "error_string", len);
3050 *p += len;
3051 ceph_decode_32_safe(p, end, len, bad);
3052 ceph_decode_need(p, end, len, bad);
3053 if (err_str && strnstr(*p, "blacklisted", len))
3054 *blacklisted = true;
3055 *p += len;
3056 }
3057 return 0;
3058 bad:
3059 return -1;
3060 }
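/*
 * The blob parsed above is a ceph-encoded map<string,string>: a u32
 * entry count, then length-prefixed key and value strings.  An
 * illustrative encoding that would set *blacklisted (the value text is
 * invented; only the "blacklisted" substring matters):
 *
 *	u32 1                          // one entry
 *	u32 12  "error_string"         // key
 *	u32 N   "blacklisted by mon"   // value
 */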
3061
3062 /*
3063  * handle a mds session control message
3064  */
3065 static void handle_session(struct ceph_mds_session *session,
3066 struct ceph_msg *msg)
3067 {
3068 struct ceph_mds_client *mdsc = session->s_mdsc;
3069 int mds = session->s_mds;
3070 int msg_version = le16_to_cpu(msg->hdr.version);
3071 void *p = msg->front.iov_base;
3072 void *end = p + msg->front.iov_len;
3073 struct ceph_mds_session_head *h;
3074 u32 op;
3075 u64 seq, features = 0;
3076 int wake = 0;
3077 bool blacklisted = false;
3078
3079
3080 ceph_decode_need(&p, end, sizeof(*h), bad);
3081 h = p;
3082 p += sizeof(*h);
3083
3084 op = le32_to_cpu(h->op);
3085 seq = le64_to_cpu(h->seq);
3086
3087 if (msg_version >= 3) {
3088 u32 len;
3089 /* version >= 2, metadata */
3090 if (__decode_session_metadata(&p, end, &blacklisted) < 0)
3091 goto bad;
3092 /* version >= 3, feature bits */
3093 ceph_decode_32_safe(&p, end, len, bad);
3094 ceph_decode_64_safe(&p, end, features, bad);
3095 p += len - sizeof(features);
3096 }
3097
3098 mutex_lock(&mdsc->mutex);
3099 if (op == CEPH_SESSION_CLOSE) {
3100 get_session(session);
3101 __unregister_session(mdsc, session);
3102 }
3103
3104 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3105 mutex_unlock(&mdsc->mutex);
3106
3107 mutex_lock(&session->s_mutex);
3108
3109 dout("handle_session mds%d %s %p state %s seq %llu\n",
3110 mds, ceph_session_op_name(op), session,
3111 ceph_session_state_name(session->s_state), seq);
3112
3113 if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3114 session->s_state = CEPH_MDS_SESSION_OPEN;
3115 pr_info("mds%d came back\n", session->s_mds);
3116 }
3117
3118 switch (op) {
3119 case CEPH_SESSION_OPEN:
3120 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3121 pr_info("mds%d reconnect success\n", session->s_mds);
3122 session->s_state = CEPH_MDS_SESSION_OPEN;
3123 session->s_features = features;
3124 renewed_caps(mdsc, session, 0);
3125 wake = 1;
3126 if (mdsc->stopping)
3127 __close_session(mdsc, session);
3128 break;
3129
3130 case CEPH_SESSION_RENEWCAPS:
3131 if (session->s_renew_seq == seq)
3132 renewed_caps(mdsc, session, 1);
3133 break;
3134
3135 case CEPH_SESSION_CLOSE:
3136 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3137 pr_info("mds%d reconnect denied\n", session->s_mds);
3138 cleanup_session_requests(mdsc, session);
3139 remove_session_caps(session);
3140 wake = 2;
3141 wake_up_all(&mdsc->session_close_wq);
3142 break;
3143
3144 case CEPH_SESSION_STALE:
3145 pr_info("mds%d caps went stale, renewing\n",
3146 session->s_mds);
3147 spin_lock(&session->s_gen_ttl_lock);
3148 session->s_cap_gen++;
3149 session->s_cap_ttl = jiffies - 1;
3150 spin_unlock(&session->s_gen_ttl_lock);
3151 send_renew_caps(mdsc, session);
3152 break;
3153
3154 case CEPH_SESSION_RECALL_STATE:
3155 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3156 break;
3157
3158 case CEPH_SESSION_FLUSHMSG:
3159 send_flushmsg_ack(mdsc, session, seq);
3160 break;
3161
3162 case CEPH_SESSION_FORCE_RO:
3163 dout("force_session_readonly %p\n", session);
3164 spin_lock(&session->s_cap_lock);
3165 session->s_readonly = true;
3166 spin_unlock(&session->s_cap_lock);
3167 wake_up_session_caps(session, FORCE_RO);
3168 break;
3169
3170 case CEPH_SESSION_REJECT:
3171 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3172 pr_info("mds%d rejected session\n", session->s_mds);
3173 session->s_state = CEPH_MDS_SESSION_REJECTED;
3174 cleanup_session_requests(mdsc, session);
3175 remove_session_caps(session);
3176 if (blacklisted)
3177 mdsc->fsc->blacklisted = true;
3178 wake = 2;
3179 break;
3180
3181 default:
3182 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3183 WARN_ON(1);
3184 }
3185
3186 mutex_unlock(&session->s_mutex);
3187 if (wake) {
3188 mutex_lock(&mdsc->mutex);
3189 __wake_requests(mdsc, &session->s_waiting);
3190 if (wake == 2)
3191 kick_requests(mdsc, mds);
3192 mutex_unlock(&mdsc->mutex);
3193 }
3194 if (op == CEPH_SESSION_CLOSE)
3195 ceph_put_mds_session(session);
3196 return;
3197
3198 bad:
3199 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3200 (int)msg->front.iov_len);
3201 ceph_msg_dump(msg);
3202 return;
3203 }
3204
3205
3206 /*
3207  * called under session->s_mutex.
3208  */
3209 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3210 struct ceph_mds_session *session)
3211 {
3212 struct ceph_mds_request *req, *nreq;
3213 struct rb_node *p;
3214 int err;
3215
3216 dout("replay_unsafe_requests mds%d\n", session->s_mds);
3217
3218 mutex_lock(&mdsc->mutex);
3219 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
3220 err = __prepare_send_request(mdsc, req, session->s_mds, true);
3221 if (!err) {
3222 ceph_msg_get(req->r_request);
3223 ceph_con_send(&session->s_con, req->r_request);
3224 }
3225 }
3226
3227 /*
3228  * also re-send old requests when MDS enters reconnect stage. So that MDS
3229  * can process completed requests in clientreplay stage.
3230  */
3231 p = rb_first(&mdsc->request_tree);
3232 while (p) {
3233 req = rb_entry(p, struct ceph_mds_request, r_node);
3234 p = rb_next(p);
3235 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3236 continue;
3237 if (req->r_attempts == 0)
3238 continue;
3239 if (req->r_session &&
3240 req->r_session->s_mds == session->s_mds) {
3241 err = __prepare_send_request(mdsc, req,
3242 session->s_mds, true);
3243 if (!err) {
3244 ceph_msg_get(req->r_request);
3245 ceph_con_send(&session->s_con, req->r_request);
3246 }
3247 }
3248 }
3249 mutex_unlock(&mdsc->mutex);
3250 }
3251
3252 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3253 {
3254 struct ceph_msg *reply;
3255 struct ceph_pagelist *_pagelist;
3256 struct page *page;
3257 __le32 *addr;
3258 int err = -ENOMEM;
3259
3260 if (!recon_state->allow_multi)
3261 return -ENOSPC;
3262
3263 /* can't handle message that contains both caps and realm */
3264 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3265
3266 /* pre-allocate new pagelist */
3267 _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3268 if (!_pagelist)
3269 return -ENOMEM;
3270
3271 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3272 if (!reply)
3273 goto fail_msg;
3274
3275 /* placeholder for nr_caps */
3276 err = ceph_pagelist_encode_32(_pagelist, 0);
3277 if (err < 0)
3278 goto fail;
3279
3280 if (recon_state->nr_caps) {
3281 /* currently encoding caps */
3282 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3283 if (err)
3284 goto fail;
3285 } else {
3286 /* placeholder for nr_realms (currently encoding snap realms) */
3287 err = ceph_pagelist_encode_32(_pagelist, 0);
3288 if (err < 0)
3289 goto fail;
3290 }
3291
3292 err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3293 if (err)
3294 goto fail;
3295
3296 page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3297 addr = kmap_atomic(page);
3298 if (recon_state->nr_caps) {
3299 /* number of caps */
3300 *addr = cpu_to_le32(recon_state->nr_caps);
3301 } else {
3302 /* number of realms */
3303 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3304 }
3305 kunmap_atomic(addr);
3306
3307 reply->hdr.version = cpu_to_le16(5);
3308 reply->hdr.compat_version = cpu_to_le16(4);
3309
3310 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3311 ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3312
3313 ceph_con_send(&recon_state->session->s_con, reply);
3314 ceph_pagelist_release(recon_state->pagelist);
3315
3316 recon_state->pagelist = _pagelist;
3317 recon_state->nr_caps = 0;
3318 recon_state->nr_realms = 0;
3319 recon_state->msg_version = 5;
3320 return 0;
3321 fail:
3322 ceph_msg_put(reply);
3323 fail_msg:
3324 ceph_pagelist_release(_pagelist);
3325 return err;
3326 }
3327
3328 /*
3329  * Encode information about a cap for a reconnect with the MDS.
3330  */
3331 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
3332 void *arg)
3333 {
3334 union {
3335 struct ceph_mds_cap_reconnect v2;
3336 struct ceph_mds_cap_reconnect_v1 v1;
3337 } rec;
3338 struct ceph_inode_info *ci = cap->ci;
3339 struct ceph_reconnect_state *recon_state = arg;
3340 struct ceph_pagelist *pagelist = recon_state->pagelist;
3341 int err;
3342 u64 snap_follows;
3343
3344 dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3345 inode, ceph_vinop(inode), cap, cap->cap_id,
3346 ceph_cap_string(cap->issued));
3347
3348 spin_lock(&ci->i_ceph_lock);
3349 cap->seq = 0;
3350 cap->issue_seq = 0;
3351 cap->mseq = 0;
3352 cap->cap_gen = cap->session->s_cap_gen;
3353
3354 if (recon_state->msg_version >= 2) {
3355 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3356 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3357 rec.v2.issued = cpu_to_le32(cap->issued);
3358 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3359 rec.v2.pathbase = 0;
3360 rec.v2.flock_len = (__force __le32)
3361 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3362 } else {
3363 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3364 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3365 rec.v1.issued = cpu_to_le32(cap->issued);
3366 rec.v1.size = cpu_to_le64(inode->i_size);
3367 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3368 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3369 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3370 rec.v1.pathbase = 0;
3371 }
3372
3373 if (list_empty(&ci->i_cap_snaps)) {
3374 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3375 } else {
3376 struct ceph_cap_snap *capsnap =
3377 list_first_entry(&ci->i_cap_snaps,
3378 struct ceph_cap_snap, ci_item);
3379 snap_follows = capsnap->follows;
3380 }
3381 spin_unlock(&ci->i_ceph_lock);
3382
3383 if (recon_state->msg_version >= 2) {
3384 int num_fcntl_locks, num_flock_locks;
3385 struct ceph_filelock *flocks = NULL;
3386 size_t struct_len, total_len = sizeof(u64);
3387 u8 struct_v = 0;
3388
3389 encode_again:
3390 if (rec.v2.flock_len) {
3391 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3392 } else {
3393 num_fcntl_locks = 0;
3394 num_flock_locks = 0;
3395 }
3396 if (num_fcntl_locks + num_flock_locks > 0) {
3397 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3398 sizeof(struct ceph_filelock),
3399 GFP_NOFS);
3400 if (!flocks) {
3401 err = -ENOMEM;
3402 goto out_err;
3403 }
3404 err = ceph_encode_locks_to_buffer(inode, flocks,
3405 num_fcntl_locks,
3406 num_flock_locks);
3407 if (err) {
3408 kfree(flocks);
3409 flocks = NULL;
3410 if (err == -ENOSPC)
3411 goto encode_again;
3412 goto out_err;
3413 }
3414 } else {
3415 kfree(flocks);
3416 flocks = NULL;
3417 }
3418
3419 if (recon_state->msg_version >= 3) {
3420 /* version, compat_version and struct_len */
3421 total_len += 2 * sizeof(u8) + sizeof(u32);
3422 struct_v = 2;
3423 }
3424
3425
3426 /* number of encoded locks is stable, so copy to pagelist */
3427 struct_len = 2 * sizeof(u32) +
3428 (num_fcntl_locks + num_flock_locks) *
3429 sizeof(struct ceph_filelock);
3430 rec.v2.flock_len = cpu_to_le32(struct_len);
3431
3432 struct_len += sizeof(u32) + sizeof(rec.v2);
3433
3434 if (struct_v >= 2)
3435 struct_len += sizeof(u64);
3436
3437 total_len += struct_len;
3438
3439 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3440 err = send_reconnect_partial(recon_state);
3441 if (err)
3442 goto out_freeflocks;
3443 pagelist = recon_state->pagelist;
3444 }
3445
3446 err = ceph_pagelist_reserve(pagelist, total_len);
3447 if (err)
3448 goto out_freeflocks;
3449
3450 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3451 if (recon_state->msg_version >= 3) {
3452 ceph_pagelist_encode_8(pagelist, struct_v);
3453 ceph_pagelist_encode_8(pagelist, 1);
3454 ceph_pagelist_encode_32(pagelist, struct_len);
3455 }
3456 ceph_pagelist_encode_string(pagelist, NULL, 0);
3457 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3458 ceph_locks_to_pagelist(flocks, pagelist,
3459 num_fcntl_locks, num_flock_locks);
3460 if (struct_v >= 2)
3461 ceph_pagelist_encode_64(pagelist, snap_follows);
3462 out_freeflocks:
3463 kfree(flocks);
3464 } else {
3465 u64 pathbase = 0;
3466 int pathlen = 0;
3467 char *path = NULL;
3468 struct dentry *dentry;
3469
3470 dentry = d_find_alias(inode);
3471 if (dentry) {
3472 path = ceph_mdsc_build_path(dentry,
3473 &pathlen, &pathbase, 0);
3474 dput(dentry);
3475 if (IS_ERR(path)) {
3476 err = PTR_ERR(path);
3477 goto out_err;
3478 }
3479 rec.v1.pathbase = cpu_to_le64(pathbase);
3480 }
3481
3482 err = ceph_pagelist_reserve(pagelist,
3483 sizeof(u64) + sizeof(u32) +
3484 pathlen + sizeof(rec.v1));
3485 if (err) {
3486 goto out_freepath;
3487 }
3488
3489 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3490 ceph_pagelist_encode_string(pagelist, path, pathlen);
3491 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3492 out_freepath:
3493 ceph_mdsc_free_path(path, pathlen);
3494 }
3495
3496 out_err:
3497 if (err >= 0)
3498 recon_state->nr_caps++;
3499 return err;
3500 }
3501
3502 static int encode_snap_realms(struct ceph_mds_client *mdsc,
3503 struct ceph_reconnect_state *recon_state)
3504 {
3505 struct rb_node *p;
3506 struct ceph_pagelist *pagelist = recon_state->pagelist;
3507 int err = 0;
3508
3509 if (recon_state->msg_version >= 4) {
3510 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3511 if (err < 0)
3512 goto fail;
3513 }
3514
3515
3516
3517
3518
3519
3520 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3521 struct ceph_snap_realm *realm =
3522 rb_entry(p, struct ceph_snap_realm, node);
3523 struct ceph_mds_snaprealm_reconnect sr_rec;
3524
3525 if (recon_state->msg_version >= 4) {
3526 size_t need = sizeof(u8) * 2 + sizeof(u32) +
3527 sizeof(sr_rec);
3528
3529 if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3530 err = send_reconnect_partial(recon_state);
3531 if (err)
3532 goto fail;
3533 pagelist = recon_state->pagelist;
3534 }
3535
3536 err = ceph_pagelist_reserve(pagelist, need);
3537 if (err)
3538 goto fail;
3539
3540 ceph_pagelist_encode_8(pagelist, 1);
3541 ceph_pagelist_encode_8(pagelist, 1);
3542 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3543 }
3544
3545 dout(" adding snap realm %llx seq %lld parent %llx\n",
3546 realm->ino, realm->seq, realm->parent_ino);
3547 sr_rec.ino = cpu_to_le64(realm->ino);
3548 sr_rec.seq = cpu_to_le64(realm->seq);
3549 sr_rec.parent = cpu_to_le64(realm->parent_ino);
3550
3551 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3552 if (err)
3553 goto fail;
3554
3555 recon_state->nr_realms++;
3556 }
3557 fail:
3558 return err;
3559 }
3560
3561
3562 /*
3563  * If an MDS fails and recovers, clients need to reconnect in order to
3564  * reestablish shared state.  This includes all caps issued through
3565  * this session _and_ the snap_realm hierarchy.  Because it's not
3566  * clear which snap realms the mds cares about, we send everything we
3567  * know about; that ensures we'll then get any new info the
3568  * recovering MDS might have.
3569  *
3570  * This is a relatively heavyweight operation, but it's rare.
3571  *
3572  * called with mdsc->mutex held.
3573  */
3574 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3575 struct ceph_mds_session *session)
3576 {
3577 struct ceph_msg *reply;
3578 int mds = session->s_mds;
3579 int err = -ENOMEM;
3580 struct ceph_reconnect_state recon_state = {
3581 .session = session,
3582 };
3583 LIST_HEAD(dispose);
3584
3585 pr_info("mds%d reconnect start\n", mds);
3586
3587 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3588 if (!recon_state.pagelist)
3589 goto fail_nopagelist;
3590
3591 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3592 if (!reply)
3593 goto fail_nomsg;
3594
3595 mutex_lock(&session->s_mutex);
3596 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3597 session->s_seq = 0;
3598
3599 dout("session %p state %s\n", session,
3600 ceph_session_state_name(session->s_state));
3601
3602 spin_lock(&session->s_gen_ttl_lock);
3603 session->s_cap_gen++;
3604 spin_unlock(&session->s_gen_ttl_lock);
3605
3606 spin_lock(&session->s_cap_lock);
3607 /* don't know if session is readonly */
3608 session->s_readonly = 0;
3609 /*
3610  * notify __ceph_remove_cap() that we are composing cap reconnect.
3611  * If a cap gets released before being added to the cap reconnect,
3612  * __ceph_remove_cap() should skip queuing cap release.
3613  */
3614 session->s_cap_reconnect = 1;
3615 /* drop old cap expires; we're about to reestablish that state */
3616 detach_cap_releases(session, &dispose);
3617 spin_unlock(&session->s_cap_lock);
3618 dispose_cap_releases(mdsc, &dispose);
3619
3620 /* trim unused caps to reduce MDS's cache rejoin time */
3621 if (mdsc->fsc->sb->s_root)
3622 shrink_dcache_parent(mdsc->fsc->sb->s_root);
3623
3624 ceph_con_close(&session->s_con);
3625 ceph_con_open(&session->s_con,
3626 CEPH_ENTITY_TYPE_MDS, mds,
3627 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3628
3629
3630 replay_unsafe_requests(mdsc, session);
3631
3632 ceph_early_kick_flushing_caps(mdsc, session);
3633
3634 down_read(&mdsc->snap_rwsem);
3635
3636 /* placeholder for nr_caps */
3637 err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
3638 if (err)
3639 goto fail;
3640
3641 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
3642 recon_state.msg_version = 3;
3643 recon_state.allow_multi = true;
3644 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
3645 recon_state.msg_version = 3;
3646 } else {
3647 recon_state.msg_version = 2;
3648 }
3649
3650 err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
3651
3652 spin_lock(&session->s_cap_lock);
3653 session->s_cap_reconnect = 0;
3654 spin_unlock(&session->s_cap_lock);
3655
3656 if (err < 0)
3657 goto fail;
3658
3659 /* check if all realms can be encoded into current message */
3660 if (mdsc->num_snap_realms) {
3661 size_t total_len =
3662 recon_state.pagelist->length +
3663 mdsc->num_snap_realms *
3664 sizeof(struct ceph_mds_snaprealm_reconnect);
3665 if (recon_state.msg_version >= 4) {
3666 /* number of realms */
3667 total_len += sizeof(u32);
3668 /* version, compat_version and struct_len */
3669 total_len += mdsc->num_snap_realms *
3670 (2 * sizeof(u8) + sizeof(u32));
3671 }
3672 if (total_len > RECONNECT_MAX_SIZE) {
3673 if (!recon_state.allow_multi) {
3674 err = -ENOSPC;
3675 goto fail;
3676 }
3677 if (recon_state.nr_caps) {
3678 err = send_reconnect_partial(&recon_state);
3679 if (err)
3680 goto fail;
3681 }
3682 recon_state.msg_version = 5;
3683 }
3684 }
3685
3686 err = encode_snap_realms(mdsc, &recon_state);
3687 if (err < 0)
3688 goto fail;
3689
3690 if (recon_state.msg_version >= 5) {
3691 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
3692 if (err < 0)
3693 goto fail;
3694 }
3695
3696 if (recon_state.nr_caps || recon_state.nr_realms) {
3697 struct page *page =
3698 list_first_entry(&recon_state.pagelist->head,
3699 struct page, lru);
3700 __le32 *addr = kmap_atomic(page);
3701 if (recon_state.nr_caps) {
3702 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
3703 *addr = cpu_to_le32(recon_state.nr_caps);
3704 } else if (recon_state.msg_version >= 4) {
3705 *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
3706 }
3707 kunmap_atomic(addr);
3708 }
3709
3710 reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3711 if (recon_state.msg_version >= 4)
3712 reply->hdr.compat_version = cpu_to_le16(4);
3713
3714 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
3715 ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
3716
3717 ceph_con_send(&session->s_con, reply);
3718
3719 mutex_unlock(&session->s_mutex);
3720
3721 mutex_lock(&mdsc->mutex);
3722 __wake_requests(mdsc, &session->s_waiting);
3723 mutex_unlock(&mdsc->mutex);
3724
3725 up_read(&mdsc->snap_rwsem);
3726 ceph_pagelist_release(recon_state.pagelist);
3727 return;
3728
3729 fail:
3730 ceph_msg_put(reply);
3731 up_read(&mdsc->snap_rwsem);
3732 mutex_unlock(&session->s_mutex);
3733 fail_nomsg:
3734 ceph_pagelist_release(recon_state.pagelist);
3735 fail_nopagelist:
3736 pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3737 return;
3738 }
3739
3740
3741 /*
3742  * compare old and new mdsmaps, kicking requests
3743  * and closing out old connections as necessary
3744  *
3745  * called under mdsc->mutex.
3746  */
3747 static void check_new_map(struct ceph_mds_client *mdsc,
3748 struct ceph_mdsmap *newmap,
3749 struct ceph_mdsmap *oldmap)
3750 {
3751 int i;
3752 int oldstate, newstate;
3753 struct ceph_mds_session *s;
3754
3755 dout("check_new_map new %u old %u\n",
3756 newmap->m_epoch, oldmap->m_epoch);
3757
3758 for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
3759 if (!mdsc->sessions[i])
3760 continue;
3761 s = mdsc->sessions[i];
3762 oldstate = ceph_mdsmap_get_state(oldmap, i);
3763 newstate = ceph_mdsmap_get_state(newmap, i);
3764
3765 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3766 i, ceph_mds_state_name(oldstate),
3767 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3768 ceph_mds_state_name(newstate),
3769 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3770 ceph_session_state_name(s->s_state));
3771
3772 if (i >= newmap->m_num_mds) {
3773 /* this mds is gone from the map; close out the session */
3774 get_session(s);
3775 __unregister_session(mdsc, s);
3776 __wake_requests(mdsc, &s->s_waiting);
3777 mutex_unlock(&mdsc->mutex);
3778
3779 mutex_lock(&s->s_mutex);
3780 cleanup_session_requests(mdsc, s);
3781 remove_session_caps(s);
3782 mutex_unlock(&s->s_mutex);
3783
3784 ceph_put_mds_session(s);
3785
3786 mutex_lock(&mdsc->mutex);
3787 kick_requests(mdsc, i);
3788 continue;
3789 }
3790
3791 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
3792 ceph_mdsmap_get_addr(newmap, i),
3793 sizeof(struct ceph_entity_addr))) {
3794 /* the mds's address changed; just close the session */
3795 mutex_unlock(&mdsc->mutex);
3796 mutex_lock(&s->s_mutex);
3797 mutex_lock(&mdsc->mutex);
3798 ceph_con_close(&s->s_con);
3799 mutex_unlock(&s->s_mutex);
3800 s->s_state = CEPH_MDS_SESSION_RESTARTING;
3801 } else if (oldstate == newstate) {
3802 continue;  /* nothing new with this mds */
3803 }
3804
3805 /*
3806  * send reconnect?
3807  */
3808 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3809 newstate >= CEPH_MDS_STATE_RECONNECT) {
3810 mutex_unlock(&mdsc->mutex);
3811 send_mds_reconnect(mdsc, s);
3812 mutex_lock(&mdsc->mutex);
3813 }
3814
3815 /*
3816  * kick requests on any mds that has gone active.
3817  */
3818 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3819 newstate >= CEPH_MDS_STATE_ACTIVE) {
3820 if (oldstate != CEPH_MDS_STATE_CREATING &&
3821 oldstate != CEPH_MDS_STATE_STARTING)
3822 pr_info("mds%d recovery completed\n", s->s_mds);
3823 kick_requests(mdsc, i);
3824 ceph_kick_flushing_caps(mdsc, s);
3825 wake_up_session_caps(s, RECONNECT);
3826 }
3827 }
3828
3829 for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
3830 s = mdsc->sessions[i];
3831 if (!s)
3832 continue;
3833 if (!ceph_mdsmap_is_laggy(newmap, i))
3834 continue;
3835 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3836 s->s_state == CEPH_MDS_SESSION_HUNG ||
3837 s->s_state == CEPH_MDS_SESSION_CLOSING) {
3838 dout(" connecting to export targets of laggy mds%d\n",
3839 i);
3840 __open_export_target_sessions(mdsc, s);
3841 }
3842 }
3843 }
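/*
 * Note the lock-ordering dance used throughout check_new_map(): a
 * session's s_mutex nests outside mdsc->mutex, so it may not be taken
 * while mdsc->mutex is held; the code drops and reacquires instead:
 *
 *	mutex_unlock(&mdsc->mutex);
 *	mutex_lock(&s->s_mutex);
 *	mutex_lock(&mdsc->mutex);	// now holding both, in order
 */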
3844
3845
3846 /*
3847  * leases
3848  */
3849
3850
3851 /*
3852  * caller must hold session s_mutex, dentry->d_lock
3853  */
3854 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3855 {
3856 struct ceph_dentry_info *di = ceph_dentry(dentry);
3857
3858 ceph_put_mds_session(di->lease_session);
3859 di->lease_session = NULL;
3860 }
3861
3862 static void handle_lease(struct ceph_mds_client *mdsc,
3863 struct ceph_mds_session *session,
3864 struct ceph_msg *msg)
3865 {
3866 struct super_block *sb = mdsc->fsc->sb;
3867 struct inode *inode;
3868 struct dentry *parent, *dentry;
3869 struct ceph_dentry_info *di;
3870 int mds = session->s_mds;
3871 struct ceph_mds_lease *h = msg->front.iov_base;
3872 u32 seq;
3873 struct ceph_vino vino;
3874 struct qstr dname;
3875 int release = 0;
3876
3877 dout("handle_lease from mds%d\n", mds);
3878
3879
3880 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3881 goto bad;
3882 vino.ino = le64_to_cpu(h->ino);
3883 vino.snap = CEPH_NOSNAP;
3884 seq = le32_to_cpu(h->seq);
3885 dname.len = get_unaligned_le32(h + 1);
3886 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
3887 goto bad;
3888 dname.name = (void *)(h + 1) + sizeof(u32);
3889
3890
3891 inode = ceph_find_inode(sb, vino);
3892 dout("handle_lease %s, ino %llx %p %.*s\n",
3893 ceph_lease_op_name(h->action), vino.ino, inode,
3894 dname.len, dname.name);
3895
3896 mutex_lock(&session->s_mutex);
3897 session->s_seq++;
3898
3899 if (!inode) {
3900 dout("handle_lease no inode %llx\n", vino.ino);
3901 goto release;
3902 }
3903
3904
3905 parent = d_find_alias(inode);
3906 if (!parent) {
3907 dout("no parent dentry on inode %p\n", inode);
3908 WARN_ON(1);
3909 goto release;
3910 }
3911 dname.hash = full_name_hash(parent, dname.name, dname.len);
3912 dentry = d_lookup(parent, &dname);
3913 dput(parent);
3914 if (!dentry)
3915 goto release;
3916
3917 spin_lock(&dentry->d_lock);
3918 di = ceph_dentry(dentry);
3919 switch (h->action) {
3920 case CEPH_MDS_LEASE_REVOKE:
3921 if (di->lease_session == session) {
3922 if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3923 h->seq = cpu_to_le32(di->lease_seq);
3924 __ceph_mdsc_drop_dentry_lease(dentry);
3925 }
3926 release = 1;
3927 break;
3928
3929 case CEPH_MDS_LEASE_RENEW:
3930 if (di->lease_session == session &&
3931 di->lease_gen == session->s_cap_gen &&
3932 di->lease_renew_from &&
3933 di->lease_renew_after == 0) {
3934 unsigned long duration =
3935 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3936
3937 di->lease_seq = seq;
3938 di->time = di->lease_renew_from + duration;
3939 di->lease_renew_after = di->lease_renew_from +
3940 (duration >> 1);
3941 di->lease_renew_from = 0;
3942 }
3943 break;
3944 }
3945 spin_unlock(&dentry->d_lock);
3946 dput(dentry);
3947
3948 if (!release)
3949 goto out;
3950
3951 release:
3952 /* let's just reuse the same message */
3953 h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3954 ceph_msg_get(msg);
3955 ceph_con_send(&session->s_con, msg);
3956
3957 out:
3958 mutex_unlock(&session->s_mutex);
3959
3960 ceph_async_iput(inode);
3961 return;
3962
3963 bad:
3964 pr_err("corrupt lease message\n");
3965 ceph_msg_dump(msg);
3966 }
3967
3968 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3969 struct dentry *dentry, char action,
3970 u32 seq)
3971 {
3972 struct ceph_msg *msg;
3973 struct ceph_mds_lease *lease;
3974 struct inode *dir;
3975 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
3976
3977 dout("lease_send_msg identry %p %s to mds%d\n",
3978 dentry, ceph_lease_op_name(action), session->s_mds);
3979
3980 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3981 if (!msg)
3982 return;
3983 lease = msg->front.iov_base;
3984 lease->action = action;
3985 lease->seq = cpu_to_le32(seq);
3986
3987 spin_lock(&dentry->d_lock);
3988 dir = d_inode(dentry->d_parent);
3989 lease->ino = cpu_to_le64(ceph_ino(dir));
3990 lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
3991
3992 put_unaligned_le32(dentry->d_name.len, lease + 1);
3993 memcpy((void *)(lease + 1) + 4,
3994 dentry->d_name.name, dentry->d_name.len);
3995 spin_unlock(&dentry->d_lock);
3996
3997 /*
3998  * if this is a preemptive lease RELEASE, no need to flush the
3999  * request stream, since the actual request will soon follow.
4000  */
4001 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4002
4003 ceph_con_send(&session->s_con, msg);
4004 }
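/*
 * Wire layout of the lease message built above; len reserves room for a
 * NAME_MAX name, but only the actual dentry name is copied:
 *
 *	struct ceph_mds_lease   (action, seq, ino, first/last snap, ...)
 *	u32                     name length (stored unaligned)
 *	char[]                  dentry name bytes
 */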
4005
4006
4007
4008 /* lock/unlock each session, to wait for ongoing session activity */
4009 static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
4010 {
4011 int i;
4012
4013 mutex_lock(&mdsc->mutex);
4014 for (i = 0; i < mdsc->max_sessions; i++) {
4015 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4016 if (!s)
4017 continue;
4018 mutex_unlock(&mdsc->mutex);
4019 mutex_lock(&s->s_mutex);
4020 mutex_unlock(&s->s_mutex);
4021 ceph_put_mds_session(s);
4022 mutex_lock(&mdsc->mutex);
4023 }
4024 mutex_unlock(&mdsc->mutex);
4025 }
4026
4027 static void maybe_recover_session(struct ceph_mds_client *mdsc)
4028 {
4029 struct ceph_fs_client *fsc = mdsc->fsc;
4030
4031 if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4032 return;
4033
4034 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4035 return;
4036
4037 if (!READ_ONCE(fsc->blacklisted))
4038 return;
4039
4040 if (fsc->last_auto_reconnect &&
4041 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
4042 return;
4043
4044 pr_info("auto reconnect after blacklisted\n");
4045 fsc->last_auto_reconnect = jiffies;
4046 ceph_force_reconnect(fsc->sb);
4047 }
4048
4049
4050
4051 /* delayed work -- periodically trim expired leases, renew caps with mds */
4052 static void schedule_delayed(struct ceph_mds_client *mdsc)
4053 {
4054 int delay = 5;
4055 unsigned hz = round_jiffies_relative(HZ * delay);
4056 schedule_delayed_work(&mdsc->delayed_work, hz);
4057 }
4058
4059 static void delayed_work(struct work_struct *work)
4060 {
4061 int i;
4062 struct ceph_mds_client *mdsc =
4063 container_of(work, struct ceph_mds_client, delayed_work.work);
4064 int renew_interval;
4065 int renew_caps;
4066
4067 dout("mdsc delayed_work\n");
4068
4069 mutex_lock(&mdsc->mutex);
4070 renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4071 renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4072 mdsc->last_renew_caps);
4073 if (renew_caps)
4074 mdsc->last_renew_caps = jiffies;
4075
4076 for (i = 0; i < mdsc->max_sessions; i++) {
4077 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4078 if (!s)
4079 continue;
4080 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4081 dout("resending session close request for mds%d\n",
4082 s->s_mds);
4083 request_close_session(mdsc, s);
4084 ceph_put_mds_session(s);
4085 continue;
4086 }
4087 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4088 if (s->s_state == CEPH_MDS_SESSION_OPEN) {
4089 s->s_state = CEPH_MDS_SESSION_HUNG;
4090 pr_info("mds%d hung\n", s->s_mds);
4091 }
4092 }
4093 if (s->s_state == CEPH_MDS_SESSION_NEW ||
4094 s->s_state == CEPH_MDS_SESSION_RESTARTING ||
4095 s->s_state == CEPH_MDS_SESSION_REJECTED) {
4096 /* this mds is failed or recovering, just wait a while */
4097 ceph_put_mds_session(s);
4098 continue;
4099 }
4100 mutex_unlock(&mdsc->mutex);
4101
4102 mutex_lock(&s->s_mutex);
4103 if (renew_caps)
4104 send_renew_caps(mdsc, s);
4105 else
4106 ceph_con_keepalive(&s->s_con);
4107 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4108 s->s_state == CEPH_MDS_SESSION_HUNG)
4109 ceph_send_cap_releases(mdsc, s);
4110 mutex_unlock(&s->s_mutex);
4111 ceph_put_mds_session(s);
4112
4113 mutex_lock(&mdsc->mutex);
4114 }
4115 mutex_unlock(&mdsc->mutex);
4116
4117 ceph_check_delayed_caps(mdsc);
4118
4119 ceph_queue_cap_reclaim_work(mdsc);
4120
4121 ceph_trim_snapid_map(mdsc);
4122
4123 maybe_recover_session(mdsc);
4124
4125 schedule_delayed(mdsc);
4126 }
4127
4128 int ceph_mdsc_init(struct ceph_fs_client *fsc)
4129 {
4131 struct ceph_mds_client *mdsc;
4132
4133 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4134 if (!mdsc)
4135 return -ENOMEM;
4136 mdsc->fsc = fsc;
4137 mutex_init(&mdsc->mutex);
4138 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4139 if (!mdsc->mdsmap) {
4140 kfree(mdsc);
4141 return -ENOMEM;
4142 }
4143
4144 fsc->mdsc = mdsc;
4145 init_completion(&mdsc->safe_umount_waiters);
4146 init_waitqueue_head(&mdsc->session_close_wq);
4147 INIT_LIST_HEAD(&mdsc->waiting_for_map);
4148 mdsc->sessions = NULL;
4149 atomic_set(&mdsc->num_sessions, 0);
4150 mdsc->max_sessions = 0;
4151 mdsc->stopping = 0;
4152 atomic64_set(&mdsc->quotarealms_count, 0);
4153 mdsc->quotarealms_inodes = RB_ROOT;
4154 mutex_init(&mdsc->quotarealms_inodes_mutex);
4155 mdsc->last_snap_seq = 0;
4156 init_rwsem(&mdsc->snap_rwsem);
4157 mdsc->snap_realms = RB_ROOT;
4158 INIT_LIST_HEAD(&mdsc->snap_empty);
4159 mdsc->num_snap_realms = 0;
4160 spin_lock_init(&mdsc->snap_empty_lock);
4161 mdsc->last_tid = 0;
4162 mdsc->oldest_tid = 0;
4163 mdsc->request_tree = RB_ROOT;
4164 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4165 mdsc->last_renew_caps = jiffies;
4166 INIT_LIST_HEAD(&mdsc->cap_delay_list);
4167 spin_lock_init(&mdsc->cap_delay_lock);
4168 INIT_LIST_HEAD(&mdsc->snap_flush_list);
4169 spin_lock_init(&mdsc->snap_flush_lock);
4170 mdsc->last_cap_flush_tid = 1;
4171 INIT_LIST_HEAD(&mdsc->cap_flush_list);
4172 INIT_LIST_HEAD(&mdsc->cap_dirty);
4173 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4174 mdsc->num_cap_flushing = 0;
4175 spin_lock_init(&mdsc->cap_dirty_lock);
4176 init_waitqueue_head(&mdsc->cap_flushing_wq);
4177 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4178 atomic_set(&mdsc->cap_reclaim_pending, 0);
4179
4180 spin_lock_init(&mdsc->dentry_list_lock);
4181 INIT_LIST_HEAD(&mdsc->dentry_leases);
4182 INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4183
4184 ceph_caps_init(mdsc);
4185 ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4186
4187 spin_lock_init(&mdsc->snapid_map_lock);
4188 mdsc->snapid_map_tree = RB_ROOT;
4189 INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4190
4191 init_rwsem(&mdsc->pool_perm_rwsem);
4192 mdsc->pool_perm_tree = RB_ROOT;
4193
4194 strscpy(mdsc->nodename, utsname()->nodename,
4195 sizeof(mdsc->nodename));
4196 return 0;
4197 }
4198
4199
4200 /*
4201  * Wait for safe replies on open mds requests; on timeout, drop them all.
4202  */
4203 static void wait_requests(struct ceph_mds_client *mdsc)
4204 {
4205 struct ceph_options *opts = mdsc->fsc->client->options;
4206 struct ceph_mds_request *req;
4207
4208 mutex_lock(&mdsc->mutex);
4209 if (__get_oldest_req(mdsc)) {
4210 mutex_unlock(&mdsc->mutex);
4211
4212 dout("wait_requests waiting for requests\n");
4213 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4214 ceph_timeout_jiffies(opts->mount_timeout));
4215
4216 /* tear down remaining requests */
4217 mutex_lock(&mdsc->mutex);
4218 while ((req = __get_oldest_req(mdsc))) {
4219 dout("wait_requests timed out on tid %llu\n",
4220 req->r_tid);
4221 list_del_init(&req->r_wait);
4222 __unregister_request(mdsc, req);
4223 }
4224 }
4225 mutex_unlock(&mdsc->mutex);
4226 dout("wait_requests done\n");
4227 }
4228
4229
4230 /*
4231  * called before mount is ro, and before dentries are torn down.
4232  */
4233 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4234 {
4235 dout("pre_umount\n");
4236 mdsc->stopping = 1;
4237
4238 lock_unlock_sessions(mdsc);
4239 ceph_flush_dirty_caps(mdsc);
4240 wait_requests(mdsc);
4241
4242 /*
4243  * wait for reply handlers to drop their request refs and
4244  * their inode/dcache refs
4245  */
4246 ceph_msgr_flush();
4247
4248 ceph_cleanup_quotarealms_inodes(mdsc);
4249 }
4250
4251
4252
4253 /* wait for all write mds requests to flush */
4254 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4255 {
4256 struct ceph_mds_request *req = NULL, *nextreq;
4257 struct rb_node *n;
4258
4259 mutex_lock(&mdsc->mutex);
4260 dout("wait_unsafe_requests want %lld\n", want_tid);
4261 restart:
4262 req = __get_oldest_req(mdsc);
4263 while (req && req->r_tid <= want_tid) {
4264 /* find next request, if any */
4265 n = rb_next(&req->r_node);
4266 if (n)
4267 nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4268 else
4269 nextreq = NULL;
4270 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4271 (req->r_op & CEPH_MDS_OP_WRITE)) {
4272 /* write op - wait for the safe reply */
4273 ceph_mdsc_get_request(req);
4274 if (nextreq)
4275 ceph_mdsc_get_request(nextreq);
4276 mutex_unlock(&mdsc->mutex);
4277 dout("wait_unsafe_requests wait on %llu (want %llu)\n",
4278 req->r_tid, want_tid);
4279 wait_for_completion(&req->r_safe_completion);
4280 mutex_lock(&mdsc->mutex);
4281 ceph_mdsc_put_request(req);
4282 if (!nextreq)
4283 break;
4284 if (RB_EMPTY_NODE(&nextreq->r_node)) {
4285 /* next request was removed from tree */
4286 ceph_mdsc_put_request(nextreq);
4287 goto restart;
4288 }
4289 ceph_mdsc_put_request(nextreq);
4290 }
4291 req = nextreq;
4292 }
4293 mutex_unlock(&mdsc->mutex);
4294 dout("wait_unsafe_requests done\n");
4295 }
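/*
 * The walk above takes a reference on both the current and the *next*
 * request before dropping mdsc->mutex to sleep, so the tree may mutate
 * underneath it.  RB_EMPTY_NODE() becomes true once a node has been
 * erased and cleared (RB_CLEAR_NODE()), which is what triggers the
 * "goto restart".  Generic shape of the pattern (hypothetical helpers):
 *
 *	get(cur); if (next) get(next);
 *	unlock(); wait_for_completion(&cur->done); lock();
 *	put(cur);
 *	if (RB_EMPTY_NODE(&next->node)) {
 *		put(next);
 *		goto restart;	// next was unlinked while we slept
 *	}
 *	put(next);
 */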
4296
4297 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4298 {
4299 u64 want_tid, want_flush;
4300
4301 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4302 return;
4303
4304 dout("sync\n");
4305 mutex_lock(&mdsc->mutex);
4306 want_tid = mdsc->last_tid;
4307 mutex_unlock(&mdsc->mutex);
4308
4309 ceph_flush_dirty_caps(mdsc);
4310 spin_lock(&mdsc->cap_dirty_lock);
4311 want_flush = mdsc->last_cap_flush_tid;
4312 if (!list_empty(&mdsc->cap_flush_list)) {
4313 struct ceph_cap_flush *cf =
4314 list_last_entry(&mdsc->cap_flush_list,
4315 struct ceph_cap_flush, g_list);
4316 cf->wake = true;
4317 }
4318 spin_unlock(&mdsc->cap_dirty_lock);
4319
4320 dout("sync want tid %lld flush_seq %lld\n",
4321 want_tid, want_flush);
4322
4323 wait_unsafe_requests(mdsc, want_tid);
4324 wait_caps_flush(mdsc, want_flush);
4325 }
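/*
 * ceph_mdsc_sync() snapshots two watermarks -- the highest request tid
 * and the highest cap flush tid -- and waits on both; setting ->wake
 * on the last queued cap flush ensures the flusher wakes this waiter
 * once that tid completes.
 */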
4326
4327 /*
4328  * true if all sessions are closed, or we force unmount
4329  */
4330 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4331 {
4332 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4333 return true;
4334 return atomic_read(&mdsc->num_sessions) <= skipped;
4335 }
4336
4337 /*
4338  * called after sb is ro.
4339  */
4340 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4341 {
4342 struct ceph_options *opts = mdsc->fsc->client->options;
4343 struct ceph_mds_session *session;
4344 int i;
4345 int skipped = 0;
4346
4347 dout("close_sessions\n");
4348
4349 /* close sessions */
4350 mutex_lock(&mdsc->mutex);
4351 for (i = 0; i < mdsc->max_sessions; i++) {
4352 session = __ceph_lookup_mds_session(mdsc, i);
4353 if (!session)
4354 continue;
4355 mutex_unlock(&mdsc->mutex);
4356 mutex_lock(&session->s_mutex);
4357 if (__close_session(mdsc, session) <= 0)
4358 skipped++;
4359 mutex_unlock(&session->s_mutex);
4360 ceph_put_mds_session(session);
4361 mutex_lock(&mdsc->mutex);
4362 }
4363 mutex_unlock(&mdsc->mutex);
4364
4365 dout("waiting for sessions to close\n");
4366 wait_event_timeout(mdsc->session_close_wq,
4367 done_closing_sessions(mdsc, skipped),
4368 ceph_timeout_jiffies(opts->mount_timeout));
4369
4370 /* tear down remaining sessions */
4371 mutex_lock(&mdsc->mutex);
4372 for (i = 0; i < mdsc->max_sessions; i++) {
4373 if (mdsc->sessions[i]) {
4374 session = get_session(mdsc->sessions[i]);
4375 __unregister_session(mdsc, session);
4376 mutex_unlock(&mdsc->mutex);
4377 mutex_lock(&session->s_mutex);
4378 remove_session_caps(session);
4379 mutex_unlock(&session->s_mutex);
4380 ceph_put_mds_session(session);
4381 mutex_lock(&mdsc->mutex);
4382 }
4383 }
4384 WARN_ON(!list_empty(&mdsc->cap_delay_list));
4385 mutex_unlock(&mdsc->mutex);
4386
4387 ceph_cleanup_snapid_map(mdsc);
4388 ceph_cleanup_empty_realms(mdsc);
4389
4390 cancel_work_sync(&mdsc->cap_reclaim_work);
4391 cancel_delayed_work_sync(&mdsc->delayed_work);
4392
4393 dout("stopped\n");
4394 }
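/*
 * Session shutdown above is two-phase: first a polite CLOSE is sent to
 * every session and we wait (bounded by mount_timeout) for the MDSes
 * to acknowledge; any session still registered afterwards is torn down
 * forcibly with remove_session_caps().
 */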
4395
4396 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4397 {
4398 struct ceph_mds_session *session;
4399 int mds;
4400
4401 dout("force umount\n");
4402
4403 mutex_lock(&mdsc->mutex);
4404 for (mds = 0; mds < mdsc->max_sessions; mds++) {
4405 session = __ceph_lookup_mds_session(mdsc, mds);
4406 if (!session)
4407 continue;
4408
4409 if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4410 __unregister_session(mdsc, session);
4411 __wake_requests(mdsc, &session->s_waiting);
4412 mutex_unlock(&mdsc->mutex);
4413
4414 mutex_lock(&session->s_mutex);
4415 __close_session(mdsc, session);
4416 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4417 cleanup_session_requests(mdsc, session);
4418 remove_session_caps(session);
4419 }
4420 mutex_unlock(&session->s_mutex);
4421 ceph_put_mds_session(session);
4422
4423 mutex_lock(&mdsc->mutex);
4424 kick_requests(mdsc, mds);
4425 }
4426 __wake_requests(mdsc, &mdsc->waiting_for_map);
4427 mutex_unlock(&mdsc->mutex);
4428 }
4429
4430 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4431 {
4432 dout("stop\n");
4433 cancel_delayed_work_sync(&mdsc->delayed_work);
4434 if (mdsc->mdsmap)
4435 ceph_mdsmap_destroy(mdsc->mdsmap);
4436 kfree(mdsc->sessions);
4437 ceph_caps_finalize(mdsc);
4438 ceph_pool_perm_destroy(mdsc);
4439 }
4440
4441 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4442 {
4443 struct ceph_mds_client *mdsc = fsc->mdsc;
4444 dout("mdsc_destroy %p\n", mdsc);
4445
4446 if (!mdsc)
4447 return;
4448
4449 /* flush out any connection work with references to us */
4450 ceph_msgr_flush();
4451
4452 ceph_mdsc_stop(mdsc);
4453
4454 fsc->mdsc = NULL;
4455 kfree(mdsc);
4456 dout("mdsc_destroy %p done\n", mdsc);
4457 }
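/*
 * ceph_msgr_flush() runs before ceph_mdsc_stop() so that any messenger
 * work still holding a reference to the mdsc completes first.  The
 * final dout() only prints the (now stale) pointer value; it never
 * dereferences the freed mdsc.
 */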
4458
4459 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4460 {
4461 struct ceph_fs_client *fsc = mdsc->fsc;
4462 const char *mds_namespace = fsc->mount_options->mds_namespace;
4463 void *p = msg->front.iov_base;
4464 void *end = p + msg->front.iov_len;
4465 u32 epoch;
4466 u32 map_len;
4467 u32 num_fs;
4468 u32 mount_fscid = (u32)-1;
4469 u8 struct_v, struct_cv;
4470 int err = -EINVAL;
4471
4472 ceph_decode_need(&p, end, sizeof(u32), bad);
4473 epoch = ceph_decode_32(&p);
4474
4475 dout("handle_fsmap epoch %u\n", epoch);
4476
4477 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4478 struct_v = ceph_decode_8(&p);
4479 struct_cv = ceph_decode_8(&p);
4480 map_len = ceph_decode_32(&p);
4481
4482 ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4483 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
4484
4485 num_fs = ceph_decode_32(&p);
4486 while (num_fs-- > 0) {
4487 void *info_p, *info_end;
4488 u32 info_len;
4489 u8 info_v, info_cv;
4490 u32 fscid, namelen;
4491
4492 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4493 info_v = ceph_decode_8(&p);
4494 info_cv = ceph_decode_8(&p);
4495 info_len = ceph_decode_32(&p);
4496 ceph_decode_need(&p, end, info_len, bad);
4497 info_p = p;
4498 info_end = p + info_len;
4499 p = info_end;
4500
4501 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4502 fscid = ceph_decode_32(&info_p);
4503 namelen = ceph_decode_32(&info_p);
4504 ceph_decode_need(&info_p, info_end, namelen, bad);
4505
4506 if (mds_namespace &&
4507 strlen(mds_namespace) == namelen &&
4508 !strncmp(mds_namespace, (char *)info_p, namelen)) {
4509 mount_fscid = fscid;
4510 break;
4511 }
4512 }
4513
4514 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4515 if (mount_fscid != (u32)-1) {
4516 fsc->client->monc.fs_cluster_id = mount_fscid;
4517 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4518 0, true);
4519 ceph_monc_renew_subs(&fsc->client->monc);
4520 } else {
4521 err = -ENOENT;
4522 goto err_out;
4523 }
4524 return;
4525
4526 bad:
4527 pr_err("error decoding fsmap\n");
4528 err_out:
4529 mutex_lock(&mdsc->mutex);
4530 mdsc->mdsmap_err = err;
4531 __wake_requests(mdsc, &mdsc->waiting_for_map);
4532 mutex_unlock(&mdsc->mutex);
4533 }
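/*
 * The decode above follows the usual ceph wire-format pattern: bounds
 * check with ceph_decode_need() (jumping to the "bad" label on a short
 * buffer), then consume fields with ceph_decode_*().  A minimal sketch
 * of the pattern (hypothetical fields a and b):
 *
 *	ceph_decode_need(&p, end, 2 * sizeof(u32), bad);
 *	a = ceph_decode_32(&p);
 *	b = ceph_decode_32(&p);
 */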
4534
4535 /*
4536  * handle mds map update.
4537  */
4538 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4539 {
4540 u32 epoch;
4541 u32 maplen;
4542 void *p = msg->front.iov_base;
4543 void *end = p + msg->front.iov_len;
4544 struct ceph_mdsmap *newmap, *oldmap;
4545 struct ceph_fsid fsid;
4546 int err = -EINVAL;
4547
4548 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
4549 ceph_decode_copy(&p, &fsid, sizeof(fsid));
4550 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
4551 return;
4552 epoch = ceph_decode_32(&p);
4553 maplen = ceph_decode_32(&p);
4554 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
4555
4556 /* do we need it? */
4557 mutex_lock(&mdsc->mutex);
4558 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
4559 dout("handle_map epoch %u <= our %u\n",
4560 epoch, mdsc->mdsmap->m_epoch);
4561 mutex_unlock(&mdsc->mutex);
4562 return;
4563 }
4564
4565 newmap = ceph_mdsmap_decode(&p, end);
4566 if (IS_ERR(newmap)) {
4567 err = PTR_ERR(newmap);
4568 goto bad_unlock;
4569 }
4570
4571 /* swap out old (or insert new) map */
4572 if (mdsc->mdsmap) {
4573 oldmap = mdsc->mdsmap;
4574 mdsc->mdsmap = newmap;
4575 check_new_map(mdsc, newmap, oldmap);
4576 ceph_mdsmap_destroy(oldmap);
4577 } else {
4578 mdsc->mdsmap = newmap;
4579 }
4580 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
4581 MAX_LFS_FILESIZE);
4582
4583 __wake_requests(mdsc, &mdsc->waiting_for_map);
4584 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
4585 mdsc->mdsmap->m_epoch);
4586
4587 mutex_unlock(&mdsc->mutex);
4588 schedule_delayed(mdsc);
4589 return;
4590
4591 bad_unlock:
4592 mutex_unlock(&mdsc->mutex);
4593 bad:
4594 pr_err("error decoding mdsmap %d\n", err);
4595 return;
4596 }
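/*
 * Map epochs are monotonic: an incoming mdsmap with epoch <= the one
 * we already hold is ignored.  The old/new map swap happens under
 * mdsc->mutex, and check_new_map() reconciles session state against
 * the new map before the old one is destroyed.
 */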
4597
4598 static struct ceph_connection *con_get(struct ceph_connection *con)
4599 {
4600 struct ceph_mds_session *s = con->private;
4601
4602 if (get_session(s)) {
4603 dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
4604 return con;
4605 }
4606 dout("mdsc con_get %p FAIL\n", s);
4607 return NULL;
4608 }
4609
4610 static void con_put(struct ceph_connection *con)
4611 {
4612 struct ceph_mds_session *s = con->private;
4613
4614 dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
4615 ceph_put_mds_session(s);
4616 }
4617
4618 /*
4619  * if the client is unresponsive for long enough, the mds will kill
4620  * the session entirely.
4621  */
4622 static void peer_reset(struct ceph_connection *con)
4623 {
4624 struct ceph_mds_session *s = con->private;
4625 struct ceph_mds_client *mdsc = s->s_mdsc;
4626
4627 pr_warn("mds%d closed our session\n", s->s_mds);
4628 send_mds_reconnect(mdsc, s);
4629 }
4630
4631 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
4632 {
4633 struct ceph_mds_session *s = con->private;
4634 struct ceph_mds_client *mdsc = s->s_mdsc;
4635 int type = le16_to_cpu(msg->hdr.type);
4636
4637 mutex_lock(&mdsc->mutex);
4638 if (__verify_registered_session(mdsc, s) < 0) {
4639 mutex_unlock(&mdsc->mutex);
4640 goto out;
4641 }
4642 mutex_unlock(&mdsc->mutex);
4643
4644 switch (type) {
4645 case CEPH_MSG_MDS_MAP:
4646 ceph_mdsc_handle_mdsmap(mdsc, msg);
4647 break;
4648 case CEPH_MSG_FS_MAP_USER:
4649 ceph_mdsc_handle_fsmap(mdsc, msg);
4650 break;
4651 case CEPH_MSG_CLIENT_SESSION:
4652 handle_session(s, msg);
4653 break;
4654 case CEPH_MSG_CLIENT_REPLY:
4655 handle_reply(s, msg);
4656 break;
4657 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
4658 handle_forward(mdsc, s, msg);
4659 break;
4660 case CEPH_MSG_CLIENT_CAPS:
4661 ceph_handle_caps(s, msg);
4662 break;
4663 case CEPH_MSG_CLIENT_SNAP:
4664 ceph_handle_snap(mdsc, s, msg);
4665 break;
4666 case CEPH_MSG_CLIENT_LEASE:
4667 handle_lease(mdsc, s, msg);
4668 break;
4669 case CEPH_MSG_CLIENT_QUOTA:
4670 ceph_handle_quota(mdsc, s, msg);
4671 break;
4672
4673 default:
4674 pr_err("received unknown message type %d %s\n", type,
4675 ceph_msg_type_name(type));
4676 }
4677 out:
4678 ceph_msg_put(msg);
4679 }
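/*
 * dispatch() always consumes the message reference (ceph_msg_put() on
 * every path, including the unknown-type default case) and verifies
 * under mdsc->mutex that the session is still registered before
 * handling anything.
 */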
4680
4681 /*
4682  * authentication
4683  */
4684
4685 /*
4686  * Note: returned pointer is the address of a structure that's
4687  * managed separately.  Caller must *not* attempt to free it.
4688  */
4689 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
4690 int *proto, int force_new)
4691 {
4692 struct ceph_mds_session *s = con->private;
4693 struct ceph_mds_client *mdsc = s->s_mdsc;
4694 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4695 struct ceph_auth_handshake *auth = &s->s_auth;
4696
4697 if (force_new && auth->authorizer) {
4698 ceph_auth_destroy_authorizer(auth->authorizer);
4699 auth->authorizer = NULL;
4700 }
4701 if (!auth->authorizer) {
4702 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4703 auth);
4704 if (ret)
4705 return ERR_PTR(ret);
4706 } else {
4707 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4708 auth);
4709 if (ret)
4710 return ERR_PTR(ret);
4711 }
4712 *proto = ac->protocol;
4713
4714 return auth;
4715 }
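/*
 * When the messenger requests a fresh handshake (force_new), the old
 * authorizer is destroyed and rebuilt; otherwise the existing one is
 * merely updated.  Either way, the returned ceph_auth_handshake is the
 * one embedded in the session (s_auth), not a new allocation.
 */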
4716
4717 static int add_authorizer_challenge(struct ceph_connection *con,
4718 void *challenge_buf, int challenge_buf_len)
4719 {
4720 struct ceph_mds_session *s = con->private;
4721 struct ceph_mds_client *mdsc = s->s_mdsc;
4722 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4723
4724 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
4725 challenge_buf, challenge_buf_len);
4726 }
4727
4728 static int verify_authorizer_reply(struct ceph_connection *con)
4729 {
4730 struct ceph_mds_session *s = con->private;
4731 struct ceph_mds_client *mdsc = s->s_mdsc;
4732 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4733
4734 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
4735 }
4736
4737 static int invalidate_authorizer(struct ceph_connection *con)
4738 {
4739 struct ceph_mds_session *s = con->private;
4740 struct ceph_mds_client *mdsc = s->s_mdsc;
4741 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4742
4743 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
4744
4745 return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
4746 }
4747
4748 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
4749 struct ceph_msg_header *hdr, int *skip)
4750 {
4751 struct ceph_msg *msg;
4752 int type = (int) le16_to_cpu(hdr->type);
4753 int front_len = (int) le32_to_cpu(hdr->front_len);
4754
4755 if (con->in_msg)
4756 return con->in_msg;
4757
4758 *skip = 0;
4759 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
4760 if (!msg) {
4761 pr_err("unable to allocate msg type %d len %d\n",
4762 type, front_len);
4763 return NULL;
4764 }
4765
4766 return msg;
4767 }
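/*
 * mds_alloc_msg() returns the partially-received con->in_msg when one
 * exists; otherwise it allocates with GFP_NOFS, since allocating with
 * GFP_KERNEL here could recurse into filesystem reclaim while the fs
 * is waiting on this very message.
 */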
4768
4769 static int mds_sign_message(struct ceph_msg *msg)
4770 {
4771 struct ceph_mds_session *s = msg->con->private;
4772 struct ceph_auth_handshake *auth = &s->s_auth;
4773
4774 return ceph_auth_sign_message(auth, msg);
4775 }
4776
4777 static int mds_check_message_signature(struct ceph_msg *msg)
4778 {
4779 struct ceph_mds_session *s = msg->con->private;
4780 struct ceph_auth_handshake *auth = &s->s_auth;
4781
4782 return ceph_auth_check_message_signature(auth, msg);
4783 }
4784
4785 static const struct ceph_connection_operations mds_con_ops = {
4786 .get = con_get,
4787 .put = con_put,
4788 .dispatch = dispatch,
4789 .get_authorizer = get_authorizer,
4790 .add_authorizer_challenge = add_authorizer_challenge,
4791 .verify_authorizer_reply = verify_authorizer_reply,
4792 .invalidate_authorizer = invalidate_authorizer,
4793 .peer_reset = peer_reset,
4794 .alloc_msg = mds_alloc_msg,
4795 .sign_message = mds_sign_message,
4796 .check_message_signature = mds_check_message_signature,
4797 };
4798
4799