#include <linux/ceph/ceph_debug.h>

#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif	/* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/export.h>

#define list_entry_next(pos, member)					\
	list_entry(pos->member.next, typeof(*pos), member)

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system.  The messenger provides ordered and reliable
 * delivery.  We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error).  Acks allow sent messages to be discarded by
 * the sender.
 */

/*
 * We track the state of the socket on a given connection using
 * values defined below.  The transition to a new socket state is
 * handled by a function which verifies we aren't coming from an
 * unexpected state.
 *
 *      --------
 *      | NEW* |  transient initial state
 *      --------
 *          | con_sock_state_init()
 *          v
 *      ----------
 *      | CLOSED |  initialized, but no socket (and no
 *      ----------  TCP connection)
 *       ^      \
 *       |       \ con_sock_state_connecting()
 *       |        ----------------------
 *       |                              \
 *       + con_sock_state_closed()       \
 *       |+---------------------------    \
 *       |  \                          \    \
 *       |   -----------                \    \
 *       |   | CLOSING |  socket event;  \    \
 *       |   -----------  await close     \    \
 *       |        ^                         \   |
 *       |        |                          \  |
 *       |        + con_sock_state_closing()  \ |
 *       |       / \                          | |
 *       |      /   ---------------           | |
 *       |     /                   \          v v
 *       |    /                    --------------
 *       |   / -----------------| CONNECTING |  socket created, TCP
 *       |  | /                 --------------  connect initiated
 *       |  | | con_sock_state_connected()
 *       |  | v
 *      -------------
 *      | CONNECTED |  TCP connection established
 *      -------------
 *
 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
 */

#define CON_SOCK_STATE_NEW		0	/* -> CLOSED */
#define CON_SOCK_STATE_CLOSED		1	/* -> CONNECTING */
#define CON_SOCK_STATE_CONNECTING	2	/* -> CONNECTED or -> CLOSING */
#define CON_SOCK_STATE_CONNECTED	3	/* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING		4	/* -> CLOSED */

/*
 * connection states
 */
#define CON_STATE_CLOSED	1  /* -> PREOPEN */
#define CON_STATE_PREOPEN	2  /* -> CONNECTING, CLOSED */
#define CON_STATE_CONNECTING	3  /* -> NEGOTIATING, CLOSED */
#define CON_STATE_NEGOTIATING	4  /* -> OPEN, CLOSED */
#define CON_STATE_OPEN		5  /* -> STANDBY, CLOSED */
#define CON_STATE_STANDBY	6  /* -> PREOPEN, CLOSED */

/*
 * ceph_connection flag bits
 */
#define CON_FLAG_LOSSYTX	   0  /* we can close channel or drop
				       * messages on errors */
#define CON_FLAG_KEEPALIVE_PENDING 1  /* we need to send a keepalive */
#define CON_FLAG_WRITE_PENDING	   2  /* we have data ready to send */
#define CON_FLAG_SOCK_CLOSED	   3  /* socket state changed to closed */
#define CON_FLAG_BACKOFF	   4  /* need to retry queuing delayed work */

static bool con_flag_valid(unsigned long con_flag)
{
	switch (con_flag) {
	case CON_FLAG_LOSSYTX:
	case CON_FLAG_KEEPALIVE_PENDING:
	case CON_FLAG_WRITE_PENDING:
	case CON_FLAG_SOCK_CLOSED:
	case CON_FLAG_BACKOFF:
		return true;
	default:
		return false;
	}
}

static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	clear_bit(con_flag, &con->flags);
}

static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	set_bit(con_flag, &con->flags);
}

static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_bit(con_flag, &con->flags);
}

static bool con_flag_test_and_clear(struct ceph_connection *con,
					unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_clear_bit(con_flag, &con->flags);
}

static bool con_flag_test_and_set(struct ceph_connection *con,
					unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_set_bit(con_flag, &con->flags);
}
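/*
 * Illustrative sketch, not part of the original file: the accessors
 * above wrap the atomic bitops so that a bogus flag number trips a
 * BUG_ON() instead of silently corrupting con->flags.  A typical
 * caller takes a flag with test-and-clear, so a flag set concurrently
 * is either consumed now or seen on a later pass ("example_" is a
 * hypothetical name).
 */
static bool __maybe_unused example_take_keepalive(struct ceph_connection *con)
{
	return con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING);
}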
/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*ceph_msg_cache;
static struct kmem_cache	*ceph_msg_data_cache;

/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;

#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif

/*
 * When skipping (ignoring) a block of input we read it into a "skip
 * buffer," which is this many bytes in size.
 */
#define SKIP_BUF_SIZE	1024

static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
static void con_fault(struct ceph_connection *con);

/*
 * Nicely render a sockaddr as a string.  An array of formatted
 * strings is used, to approximate reentrancy.
 */
#define ADDR_STR_COUNT_LOG	5	/* log2(# address strings in array) */
#define ADDR_STR_COUNT		(1 << ADDR_STR_COUNT_LOG)
#define ADDR_STR_COUNT_MASK	(ADDR_STR_COUNT - 1)
#define MAX_ADDR_STR_LEN	64	/* 54 is enough */

static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
static atomic_t addr_str_seq = ATOMIC_INIT(0);

static struct page *zero_page;		/* used in certain error cases */

const char *ceph_pr_addr(const struct sockaddr_storage *ss)
{
	int i;
	char *s;
	struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;

	i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
	s = addr_str[i];

	switch (ss->ss_family) {
	case AF_INET:
		snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
			 ntohs(in4->sin_port));
		break;

	case AF_INET6:
		snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr,
			 ntohs(in6->sin6_port));
		break;

	default:
		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
			 ss->ss_family);
	}

	return s;
}
EXPORT_SYMBOL(ceph_pr_addr);
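/*
 * Illustrative sketch, not part of the original file: the returned
 * pointer rotates through the ADDR_STR_COUNT (32) entries above, so
 * it stays valid only until 32 further calls.  Format-and-print in
 * one statement rather than caching the pointer; holding two live
 * strings at once, as below, is fine ("example_" is a hypothetical
 * name).
 */
static void __maybe_unused example_log_peer(struct sockaddr_storage *peer,
					    struct sockaddr_storage *me)
{
	pr_info("peer %s, me %s\n", ceph_pr_addr(peer), ceph_pr_addr(me));
}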
static void encode_my_addr(struct ceph_messenger *msgr)
{
	memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
	ceph_encode_addr(&msgr->my_enc_addr);
}

/*
 * work queue for all reading and writing to/from the socket.
 */
static struct workqueue_struct *ceph_msgr_wq;

static int ceph_msgr_slab_init(void)
{
	BUG_ON(ceph_msg_cache);
	ceph_msg_cache = kmem_cache_create("ceph_msg",
					sizeof (struct ceph_msg),
					__alignof__(struct ceph_msg), 0, NULL);

	if (!ceph_msg_cache)
		return -ENOMEM;

	BUG_ON(ceph_msg_data_cache);
	ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
					sizeof (struct ceph_msg_data),
					__alignof__(struct ceph_msg_data),
					0, NULL);
	if (ceph_msg_data_cache)
		return 0;

	kmem_cache_destroy(ceph_msg_cache);
	ceph_msg_cache = NULL;

	return -ENOMEM;
}

static void ceph_msgr_slab_exit(void)
{
	BUG_ON(!ceph_msg_data_cache);
	kmem_cache_destroy(ceph_msg_data_cache);
	ceph_msg_data_cache = NULL;

	BUG_ON(!ceph_msg_cache);
	kmem_cache_destroy(ceph_msg_cache);
	ceph_msg_cache = NULL;
}

static void _ceph_msgr_exit(void)
{
	if (ceph_msgr_wq) {
		destroy_workqueue(ceph_msgr_wq);
		ceph_msgr_wq = NULL;
	}

	ceph_msgr_slab_exit();

	BUG_ON(zero_page == NULL);
	kunmap(zero_page);
	page_cache_release(zero_page);
	zero_page = NULL;
}

int ceph_msgr_init(void)
{
	BUG_ON(zero_page != NULL);
	zero_page = ZERO_PAGE(0);
	page_cache_get(zero_page);

	if (ceph_msgr_slab_init())
		return -ENOMEM;

	/*
	 * The number of active work items is limited by the number of
	 * connections, so leave @max_active at default.
	 */
	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
	if (ceph_msgr_wq)
		return 0;

	pr_err("msgr_init failed to create workqueue\n");
	_ceph_msgr_exit();

	return -ENOMEM;
}
EXPORT_SYMBOL(ceph_msgr_init);

void ceph_msgr_exit(void)
{
	BUG_ON(ceph_msgr_wq == NULL);

	_ceph_msgr_exit();
}
EXPORT_SYMBOL(ceph_msgr_exit);

void ceph_msgr_flush(void)
{
	flush_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_flush);

/* Connection socket state transition functions */

static void con_sock_state_init(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}

static void con_sock_state_connecting(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTING);
}

static void con_sock_state_connected(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTED);
}

static void con_sock_state_closing(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
			old_state != CON_SOCK_STATE_CONNECTED &&
			old_state != CON_SOCK_STATE_CLOSING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSING);
}

static void con_sock_state_closed(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
		    old_state != CON_SOCK_STATE_CLOSING &&
		    old_state != CON_SOCK_STATE_CONNECTING &&
		    old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}
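/*
 * A typical successful lifecycle walks the diagram near the top of
 * this file in order: NEW -> CLOSED via con_sock_state_init(),
 * CLOSED -> CONNECTING when ceph_tcp_connect() issues the connect,
 * CONNECTING -> CONNECTED from the TCP_ESTABLISHED callback, then
 * CONNECTED -> CLOSING -> CLOSED (or straight to CLOSED) on
 * teardown.  Any other transition trips the WARN_ONs above.
 */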
/*
 * socket callback functions
 */

/* data available on socket, or listen socket received a connect */
static void ceph_sock_data_ready(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;
	if (atomic_read(&con->msgr->stopping)) {
		return;
	}

	if (sk->sk_state != TCP_CLOSE_WAIT) {
		dout("%s on %p state = %lu, queueing work\n", __func__,
		     con, con->state);
		queue_con(con);
	}
}

/* socket has buffer space for writing */
static void ceph_sock_write_space(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	/* only queue to workqueue if there is data we want to write,
	 * and there is sufficient space in the socket buffer to accept
	 * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
	 * doesn't get called again until try_write() fills the socket
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * and net/core/stream.c:sk_stream_write_space().
	 */
	if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
		if (sk_stream_is_writeable(sk)) {
			dout("%s %p queueing write work\n", __func__, con);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			queue_con(con);
		}
	} else {
		dout("%s %p nothing to write\n", __func__, con);
	}
}

/* socket's state has changed */
static void ceph_sock_state_change(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	dout("%s %p state = %lu sk_state = %u\n", __func__,
	     con, con->state, sk->sk_state);

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("%s TCP_CLOSE\n", __func__);
		/* fall through */
	case TCP_CLOSE_WAIT:
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		con_sock_state_closing(con);
		con_flag_set(con, CON_FLAG_SOCK_CLOSED);
		queue_con(con);
		break;
	case TCP_ESTABLISHED:
		dout("%s TCP_ESTABLISHED\n", __func__);
		con_sock_state_connected(con);
		queue_con(con);
		break;
	default:	/* Everything else is uninteresting */
		break;
	}
}

/*
 * set up socket callbacks
 */
static void set_sock_callbacks(struct socket *sock,
			       struct ceph_connection *con)
{
	struct sock *sk = sock->sk;
	sk->sk_user_data = con;
	sk->sk_data_ready = ceph_sock_data_ready;
	sk->sk_write_space = ceph_sock_write_space;
	sk->sk_state_change = ceph_sock_state_change;
}


/*
 * socket helpers
 */

/*
 * initiate connection to a remote socket.
 */
static int ceph_tcp_connect(struct ceph_connection *con)
{
	struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
	struct socket *sock;
	int ret;

	BUG_ON(con->sock);
	ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
			       IPPROTO_TCP, &sock);
	if (ret)
		return ret;
	sock->sk->sk_allocation = GFP_NOFS;

#ifdef CONFIG_LOCKDEP
	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif

	set_sock_callbacks(sock, con);

	dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));

	con_sock_state_connecting(con);
	ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
				 O_NONBLOCK);
	if (ret == -EINPROGRESS) {
		dout("connect %s EINPROGRESS sk_state = %u\n",
		     ceph_pr_addr(&con->peer_addr.in_addr),
		     sock->sk->sk_state);
	} else if (ret < 0) {
		pr_err("connect %s error %d\n",
		       ceph_pr_addr(&con->peer_addr.in_addr), ret);
		sock_release(sock);
		return ret;
	}

	if (con->msgr->tcp_nodelay) {
		int optval = 1;

		ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
					(char *)&optval, sizeof(optval));
		if (ret)
			pr_err("kernel_setsockopt(TCP_NODELAY) failed: %d\n",
			       ret);
	}

	con->sock = sock;
	return 0;
}

static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
{
	struct kvec iov = {buf, len};
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
	if (r == -EAGAIN)
		r = 0;
	return r;
}

static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
			     int page_offset, size_t length)
{
	void *kaddr;
	int ret;

	BUG_ON(page_offset + length > PAGE_SIZE);

	kaddr = kmap(page);
	BUG_ON(!kaddr);
	ret = ceph_tcp_recvmsg(sock, kaddr + page_offset, length);
	kunmap(page);

	return ret;
}

/*
 * write something.  @more is true if caller will be sending more data
 * shortly.
 */
static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
			    size_t kvlen, size_t len, int more)
{
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	if (more)
		msg.msg_flags |= MSG_MORE;
	else
		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */

	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
	if (r == -EAGAIN)
		r = 0;
	return r;
}

static int __ceph_tcp_sendpage(struct socket *sock, struct page *page,
			       int offset, size_t size, bool more)
{
	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
	int ret;

	ret = kernel_sendpage(sock, page, offset, size, flags);
	if (ret == -EAGAIN)
		ret = 0;

	return ret;
}

static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
			     int offset, size_t size, bool more)
{
	int ret;
	struct kvec iov;

	/* sendpage cannot properly handle pages with page_count == 0,
	 * we need to fall back to sendmsg if that's the case */
	if (page_count(page) >= 1)
		return __ceph_tcp_sendpage(sock, page, offset, size, more);

	iov.iov_base = kmap(page) + offset;
	iov.iov_len = size;
	ret = ceph_tcp_sendmsg(sock, &iov, 1, size, more);
	kunmap(page);

	return ret;
}

/*
 * Shutdown/close the socket for the given connection.
 */
static int con_close_socket(struct ceph_connection *con)
{
	int rc = 0;

	dout("con_close_socket on %p sock %p\n", con, con->sock);
	if (con->sock) {
		rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
		sock_release(con->sock);
		con->sock = NULL;
	}

	/*
	 * Forcibly clear the SOCK_CLOSED flag.  It gets set
	 * independent of the connection mutex, and we could have
	 * received a socket close event before we had the chance to
	 * shut the socket down.
	 */
	con_flag_clear(con, CON_FLAG_SOCK_CLOSED);

	con_sock_state_closed(con);
	return rc;
}

/*
 * Reset a connection.  Discard all incoming and outgoing messages
 * and clear *_seq state.
 */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);
	BUG_ON(msg->con == NULL);
	msg->con->ops->put(msg->con);
	msg->con = NULL;

	ceph_msg_put(msg);
}
static void ceph_msg_remove_list(struct list_head *head)
{
	while (!list_empty(head)) {
		struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
							list_head);
		ceph_msg_remove(msg);
	}
}

static void reset_connection(struct ceph_connection *con)
{
	/* discard existing out_queue, out_sent and *_seq state */
	dout("reset_connection %p\n", con);
	ceph_msg_remove_list(&con->out_queue);
	ceph_msg_remove_list(&con->out_sent);

	if (con->in_msg) {
		BUG_ON(con->in_msg->con != con);
		con->in_msg->con = NULL;
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
		con->ops->put(con);
	}

	con->connect_seq = 0;
	con->out_seq = 0;
	if (con->out_msg) {
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	}
	con->in_seq = 0;
	con->in_seq_acked = 0;

	con->out_skip = 0;
}

/*
 * mark a peer down.  drop any open connections.
 */
void ceph_con_close(struct ceph_connection *con)
{
	mutex_lock(&con->mutex);
	dout("con_close %p peer %s\n", con,
	     ceph_pr_addr(&con->peer_addr.in_addr));
	con->state = CON_STATE_CLOSED;

	con_flag_clear(con, CON_FLAG_LOSSYTX);	/* so we retry next connect */
	con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
	con_flag_clear(con, CON_FLAG_WRITE_PENDING);
	con_flag_clear(con, CON_FLAG_BACKOFF);

	reset_connection(con);
	con->peer_global_seq = 0;
	cancel_con(con);
	con_close_socket(con);
	mutex_unlock(&con->mutex);
}
EXPORT_SYMBOL(ceph_con_close);

/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con,
		   __u8 entity_type, __u64 entity_num,
		   struct ceph_entity_addr *addr)
{
	mutex_lock(&con->mutex);
	dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));

	WARN_ON(con->state != CON_STATE_CLOSED);
	con->state = CON_STATE_PREOPEN;

	con->peer_name.type = (__u8) entity_type;
	con->peer_name.num = cpu_to_le64(entity_num);

	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;      /* reset backoff memory */
	mutex_unlock(&con->mutex);
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_open);

/*
 * return true if this connection ever successfully opened
 */
bool ceph_con_opened(struct ceph_connection *con)
{
	return con->connect_seq > 0;
}

/*
 * initialize a new connection.
 */
void ceph_con_init(struct ceph_connection *con, void *private,
	const struct ceph_connection_operations *ops,
	struct ceph_messenger *msgr)
{
	dout("con_init %p\n", con);
	memset(con, 0, sizeof(*con));
	con->private = private;
	con->ops = ops;
	con->msgr = msgr;

	con_sock_state_init(con);

	mutex_init(&con->mutex);
	INIT_LIST_HEAD(&con->out_queue);
	INIT_LIST_HEAD(&con->out_sent);
	INIT_DELAYED_WORK(&con->work, con_work);

	con->state = CON_STATE_CLOSED;
}
EXPORT_SYMBOL(ceph_con_init);
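/*
 * Illustrative sketch, not part of the original file: how a client
 * of the exported calls above drives a connection.  "my_ops",
 * "my_msgr" and "osd_addr" are hypothetical; the real users are the
 * mon, osd and mds clients.
 */
static void __maybe_unused example_connection_lifecycle(
			struct ceph_connection *con, void *priv,
			const struct ceph_connection_operations *my_ops,
			struct ceph_messenger *my_msgr,
			struct ceph_entity_addr *osd_addr)
{
	ceph_con_init(con, priv, my_ops, my_msgr);   /* CON_STATE_CLOSED */
	ceph_con_open(con, CEPH_ENTITY_TYPE_OSD, 0, osd_addr);
						     /* PREOPEN, work queued */
	/* ... queue outgoing messages with ceph_con_send() ... */
	ceph_con_close(con);			     /* back to CON_STATE_CLOSED */
}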
/*
 * We maintain a global counter to order connection attempts.  Get
 * a unique seq greater than @gt.
 */
static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
	u32 ret;

	spin_lock(&msgr->global_seq_lock);
	if (msgr->global_seq < gt)
		msgr->global_seq = gt;
	ret = ++msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);
	return ret;
}

static void con_out_kvec_reset(struct ceph_connection *con)
{
	BUG_ON(con->out_skip);

	con->out_kvec_left = 0;
	con->out_kvec_bytes = 0;
	con->out_kvec_cur = &con->out_kvec[0];
}

static void con_out_kvec_add(struct ceph_connection *con,
				size_t size, void *data)
{
	int index = con->out_kvec_left;

	BUG_ON(con->out_skip);
	BUG_ON(index >= ARRAY_SIZE(con->out_kvec));

	con->out_kvec[index].iov_len = size;
	con->out_kvec[index].iov_base = data;
	con->out_kvec_left++;
	con->out_kvec_bytes += size;
}

/*
 * Chop off a kvec from the end.  Return residual number of bytes for
 * that kvec, i.e. how many bytes would have been written if the kvec
 * hadn't been nuked.
 */
static int con_out_kvec_skip(struct ceph_connection *con)
{
	int off = con->out_kvec_cur - con->out_kvec;
	int skip = 0;

	if (con->out_kvec_bytes > 0) {
		skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
		BUG_ON(con->out_kvec_bytes < skip);
		BUG_ON(!con->out_kvec_left);
		con->out_kvec_bytes -= skip;
		con->out_kvec_left--;
	}

	return skip;
}

#ifdef CONFIG_BLOCK

/*
 * For a bio data item, a piece is whatever remains of the next
 * entry in the current bio iovec, or the first entry in the next
 * bio in the list.
 */
static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct bio *bio;

	BUG_ON(data->type != CEPH_MSG_DATA_BIO);

	bio = data->bio;
	BUG_ON(!bio);

	cursor->resid = min(length, data->bio_length);
	cursor->bio = bio;
	cursor->bvec_iter = bio->bi_iter;
	cursor->last_piece =
		cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);
}

static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
						size_t *page_offset,
						size_t *length)
{
	struct ceph_msg_data *data = cursor->data;
	struct bio *bio;
	struct bio_vec bio_vec;

	BUG_ON(data->type != CEPH_MSG_DATA_BIO);

	bio = cursor->bio;
	BUG_ON(!bio);

	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);

	*page_offset = (size_t) bio_vec.bv_offset;
	BUG_ON(*page_offset >= PAGE_SIZE);
	if (cursor->last_piece) /* pagelist offset is always 0 */
		*length = cursor->resid;
	else
		*length = (size_t) bio_vec.bv_len;
	BUG_ON(*length > cursor->resid);
	BUG_ON(*page_offset + *length > PAGE_SIZE);

	return bio_vec.bv_page;
}

static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	struct bio *bio;
	struct bio_vec bio_vec;

	BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);

	bio = cursor->bio;
	BUG_ON(!bio);

	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);

	/* Advance the cursor offset */

	BUG_ON(cursor->resid < bytes);
	cursor->resid -= bytes;

	bio_advance_iter(bio, &cursor->bvec_iter, bytes);

	if (bytes < bio_vec.bv_len)
		return false;	/* more bytes to process in this segment */

	/* Move on to the next segment, and possibly the next bio */

	if (!cursor->bvec_iter.bi_size) {
		bio = bio->bi_next;
		cursor->bio = bio;
		if (bio)
			cursor->bvec_iter = bio->bi_iter;
		else
			memset(&cursor->bvec_iter, 0,
			       sizeof(cursor->bvec_iter));
	}

	if (!cursor->last_piece) {
		BUG_ON(!cursor->resid);
		BUG_ON(!bio);
		/* A short read is OK, so use <= rather than == */
		if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))
			cursor->last_piece = true;
	}

	return true;
}
#endif /* CONFIG_BLOCK */

/*
 * For a page array, a piece comes from the first page in the array
 * that has not already been fully consumed.
 */
static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	int page_count;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(!data->pages);
	BUG_ON(!data->length);

	cursor->resid = min(length, data->length);
	page_count = calc_pages_for(data->alignment, (u64)data->length);
	cursor->page_offset = data->alignment & ~PAGE_MASK;
	cursor->page_index = 0;
	BUG_ON(page_count > (int)USHRT_MAX);
	cursor->page_count = (unsigned short)page_count;
	BUG_ON(length > SIZE_MAX - cursor->page_offset);
	cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
}

static struct page *
ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
					size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_index >= cursor->page_count);
	BUG_ON(cursor->page_offset >= PAGE_SIZE);

	*page_offset = cursor->page_offset;
	if (cursor->last_piece)
		*length = cursor->resid;
	else
		*length = PAGE_SIZE - *page_offset;

	return data->pages[cursor->page_index];
}

static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
						size_t bytes)
{
	BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);

	/* Advance the cursor page offset */

	cursor->resid -= bytes;
	cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
	if (!bytes || cursor->page_offset)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;   /* no more data */

	/* Move on to the next page; offset is already at 0 */

	BUG_ON(cursor->page_index >= cursor->page_count);
	cursor->page_index++;
	cursor->last_piece = cursor->resid <= PAGE_SIZE;

	return true;
}

/*
 * For a pagelist, a piece is whatever remains to be consumed in the
 * first page in the list, or the front of the next page.
 */
static void
ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;
	struct page *page;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	if (!length)
		return;		/* pagelist can be assigned but empty */

	BUG_ON(list_empty(&pagelist->head));
	page = list_first_entry(&pagelist->head, struct page, lru);

	cursor->resid = min(length, pagelist->length);
	cursor->page = page;
	cursor->offset = 0;
	cursor->last_piece = cursor->resid <= PAGE_SIZE;
}

static struct page *
ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
				size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(!cursor->page);
	BUG_ON(cursor->offset + cursor->resid != pagelist->length);

	/* offset of first page in pagelist is always 0 */
	*page_offset = cursor->offset & ~PAGE_MASK;
	if (cursor->last_piece)
		*length = cursor->resid;
	else
		*length = PAGE_SIZE - *page_offset;

	return cursor->page;
}

static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
						size_t bytes)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(cursor->offset + cursor->resid != pagelist->length);
	BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);

	/* Advance the cursor offset */

	cursor->resid -= bytes;
	cursor->offset += bytes;
	/* offset of first page in pagelist is always 0 */
	if (!bytes || cursor->offset & ~PAGE_MASK)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;	/* no more data */

	/* Move on to the next page */

	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
	cursor->page = list_entry_next(cursor->page, lru);
	cursor->last_piece = cursor->resid <= PAGE_SIZE;

	return true;
}

/*
 * Message data is handled (sent or received) in pieces, where each
 * piece resides on a single page.  The network layer might not
 * consume an entire piece at once.  A data item's cursor keeps
 * track of which piece is next to process and how much remains to
 * be processed in that piece.  It also tracks whether the current
 * piece is the last one in the data item.
 */
static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
	size_t length = cursor->total_resid;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		ceph_msg_data_pagelist_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		ceph_msg_data_pages_cursor_init(cursor, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		ceph_msg_data_bio_cursor_init(cursor, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_NONE:
	default:
		/* BUG(); */
		break;
	}
	cursor->need_crc = true;
}

static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
{
	struct ceph_msg_data_cursor *cursor = &msg->cursor;
	struct ceph_msg_data *data;

	BUG_ON(!length);
	BUG_ON(length > msg->data_length);
	BUG_ON(list_empty(&msg->data));

	cursor->data_head = &msg->data;
	cursor->total_resid = length;
	data = list_first_entry(&msg->data, struct ceph_msg_data, links);
	cursor->data = data;

	__ceph_msg_data_cursor_init(cursor);
}

/*
 * Return the page containing the next piece to process for a given
 * data item, and supply the page offset and length of that piece.
 * Indicate whether this is the last piece in this data item.
 */
static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
					size_t *page_offset, size_t *length,
					bool *last_piece)
{
	struct page *page;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		page = ceph_msg_data_pages_next(cursor, page_offset, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		page = ceph_msg_data_bio_next(cursor, page_offset, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_NONE:
	default:
		page = NULL;
		break;
	}
	BUG_ON(!page);
	BUG_ON(*page_offset + *length > PAGE_SIZE);
	BUG_ON(!*length);
	if (last_piece)
		*last_piece = cursor->last_piece;

	return page;
}

/*
 * Returns true if the result moves the cursor on to the next piece
 * of the data item.
 */
static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
				  size_t bytes)
{
	bool new_piece;

	BUG_ON(bytes > cursor->resid);
	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_PAGES:
		new_piece = ceph_msg_data_pages_advance(cursor, bytes);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		new_piece = ceph_msg_data_bio_advance(cursor, bytes);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_NONE:
	default:
		BUG();
		break;
	}
	cursor->total_resid -= bytes;

	if (!cursor->resid && cursor->total_resid) {
		WARN_ON(!cursor->last_piece);
		BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
		cursor->data = list_entry_next(cursor->data, links);
		__ceph_msg_data_cursor_init(cursor);
		new_piece = true;
	}
	cursor->need_crc = new_piece;

	return new_piece;
}
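/*
 * Illustrative sketch, not part of the original file: a consumer
 * drives a message's data with the next/advance pair above.  The
 * real consumers are write_partial_message_data() and
 * read_partial_msg_data() below; "process_piece" is a hypothetical
 * callback and the message is assumed to carry data.
 */
static void __maybe_unused example_walk_data(struct ceph_msg *msg,
			void (*process_piece)(struct page *page,
					      size_t page_offset, size_t len))
{
	struct ceph_msg_data_cursor *cursor = &msg->cursor;

	ceph_msg_data_cursor_init(msg, msg->data_length);
	while (cursor->resid) {
		size_t page_offset;
		size_t length;
		struct page *page;

		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
		process_piece(page, page_offset, length);
		(void) ceph_msg_data_advance(cursor, length);
	}
}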
static size_t sizeof_footer(struct ceph_connection *con)
{
	return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
	    sizeof(struct ceph_msg_footer) :
	    sizeof(struct ceph_msg_footer_old);
}

static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
	BUG_ON(!msg);
	BUG_ON(!data_len);

	/* Initialize data cursor */

	ceph_msg_data_cursor_init(msg, (size_t)data_len);
}

/*
 * Prepare footer for currently outgoing message, and finish things
 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 */
static void prepare_write_message_footer(struct ceph_connection *con)
{
	struct ceph_msg *m = con->out_msg;
	int v = con->out_kvec_left;

	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;

	dout("prepare_write_message_footer %p\n", con);
	con->out_kvec[v].iov_base = &m->footer;
	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
		if (con->ops->sign_message)
			con->ops->sign_message(con, m);
		else
			m->footer.sig = 0;
		con->out_kvec[v].iov_len = sizeof(m->footer);
		con->out_kvec_bytes += sizeof(m->footer);
	} else {
		m->old_footer.flags = m->footer.flags;
		con->out_kvec[v].iov_len = sizeof(m->old_footer);
		con->out_kvec_bytes += sizeof(m->old_footer);
	}
	con->out_kvec_left++;
	con->out_more = m->more_to_follow;
	con->out_msg_done = true;
}

/*
 * Prepare headers for the next outgoing message.
 */
static void prepare_write_message(struct ceph_connection *con)
{
	struct ceph_msg *m;
	u32 crc;

	con_out_kvec_reset(con);
	con->out_msg_done = false;

	/* Sneak an ack in there first?  If we can get it into the same
	 * TCP packet that's a good thing. */
	if (con->in_seq > con->in_seq_acked) {
		con->in_seq_acked = con->in_seq;
		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
		con_out_kvec_add(con, sizeof (con->out_temp_ack),
			&con->out_temp_ack);
	}

	BUG_ON(list_empty(&con->out_queue));
	m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
	con->out_msg = m;
	BUG_ON(m->con != con);

	/* put message on sent list */
	ceph_msg_get(m);
	list_move_tail(&m->list_head, &con->out_sent);

	/*
	 * only assign outgoing seq # if we haven't sent this message
	 * yet.  if it is requeued, resend with its original seq.
	 */
	if (m->needs_out_seq) {
		m->hdr.seq = cpu_to_le64(++con->out_seq);
		m->needs_out_seq = false;
	}
	WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));

	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
	     m, con->out_seq, le16_to_cpu(m->hdr.type),
	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
	     m->data_length);
	BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);

	/* tag + hdr + front + middle */
	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
	con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);

	if (m->middle)
		con_out_kvec_add(con, m->middle->vec.iov_len,
			m->middle->vec.iov_base);

	/* fill in hdr crc and finalize hdr */
	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
	con->out_msg->hdr.crc = cpu_to_le32(crc);
	memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));

	/* fill in front and middle crc, footer */
	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
	con->out_msg->footer.front_crc = cpu_to_le32(crc);
	if (m->middle) {
		crc = crc32c(0, m->middle->vec.iov_base,
				m->middle->vec.iov_len);
		con->out_msg->footer.middle_crc = cpu_to_le32(crc);
	} else
		con->out_msg->footer.middle_crc = 0;
	dout("%s front_crc %u middle_crc %u\n", __func__,
	     le32_to_cpu(con->out_msg->footer.front_crc),
	     le32_to_cpu(con->out_msg->footer.middle_crc));
	con->out_msg->footer.flags = 0;

	/* is there a data payload? */
	con->out_msg->footer.data_crc = 0;
	if (m->data_length) {
		prepare_message_data(con->out_msg, m->data_length);
		con->out_more = 1;  /* data + footer will follow */
	} else {
		/* no, queue up footer too and be done */
		prepare_write_message_footer(con);
	}

	con_flag_set(con, CON_FLAG_WRITE_PENDING);
}

/*
 * Prepare an ack.
 */
static void prepare_write_ack(struct ceph_connection *con)
{
	dout("prepare_write_ack %p %llu -> %llu\n", con,
	     con->in_seq_acked, con->in_seq);
	con->in_seq_acked = con->in_seq;

	con_out_kvec_reset(con);

	con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);

	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
	con_out_kvec_add(con, sizeof (con->out_temp_ack),
				&con->out_temp_ack);

	con->out_more = 1;  /* more will follow.. eventually.. */
	con_flag_set(con, CON_FLAG_WRITE_PENDING);
}

/*
 * Prepare to share the seq during handshake
 */
static void prepare_write_seq(struct ceph_connection *con)
{
	dout("prepare_write_seq %p %llu -> %llu\n", con,
	     con->in_seq_acked, con->in_seq);
	con->in_seq_acked = con->in_seq;

	con_out_kvec_reset(con);

	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
	con_out_kvec_add(con, sizeof (con->out_temp_ack),
			 &con->out_temp_ack);

	con_flag_set(con, CON_FLAG_WRITE_PENDING);
}

/*
 * Prepare to write keepalive byte.
 */
static void prepare_write_keepalive(struct ceph_connection *con)
{
	dout("prepare_write_keepalive %p\n", con);
	con_out_kvec_reset(con);
	con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
	con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
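/*
 * Summary of what the writers above stage on the wire, one tag byte
 * followed by the tag-specific payload:
 *
 *   TAG_MSG        hdr, front, [middle], [data pieces], footer
 *   TAG_ACK        le64 seq of the last message received in order
 *   TAG_KEEPALIVE  nothing further
 *
 * An ack may also be coalesced in front of an outgoing message so
 * that both ride in the same TCP segment (see
 * prepare_write_message()).
 */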
/*
 * Connection negotiation.
 */

static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con,
						int *auth_proto)
{
	struct ceph_auth_handshake *auth;

	if (!con->ops->get_authorizer) {
		con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
		con->out_connect.authorizer_len = 0;
		return NULL;
	}

	/* Can't hold the mutex while getting authorizer */
	mutex_unlock(&con->mutex);
	auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
	mutex_lock(&con->mutex);

	if (IS_ERR(auth))
		return auth;
	if (con->state != CON_STATE_NEGOTIATING)
		return ERR_PTR(-EAGAIN);

	con->auth_reply_buf = auth->authorizer_reply_buf;
	con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
	return auth;
}

/*
 * We connected to a peer and are saying hello.
 */
static void prepare_write_banner(struct ceph_connection *con)
{
	con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
	con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
					&con->msgr->my_enc_addr);

	con->out_more = 0;
	con_flag_set(con, CON_FLAG_WRITE_PENDING);
}

static int prepare_write_connect(struct ceph_connection *con)
{
	unsigned int global_seq = get_global_seq(con->msgr, 0);
	int proto;
	int auth_proto;
	struct ceph_auth_handshake *auth;

	switch (con->peer_name.type) {
	case CEPH_ENTITY_TYPE_MON:
		proto = CEPH_MONC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_OSD:
		proto = CEPH_OSDC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_MDS:
		proto = CEPH_MDSC_PROTOCOL;
		break;
	default:
		BUG();
	}

	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
	     con->connect_seq, global_seq, proto);

	con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
	con->out_connect.global_seq = cpu_to_le32(global_seq);
	con->out_connect.protocol_version = cpu_to_le32(proto);
	con->out_connect.flags = 0;

	auth_proto = CEPH_AUTH_UNKNOWN;
	auth = get_connect_authorizer(con, &auth_proto);
	if (IS_ERR(auth))
		return PTR_ERR(auth);

	con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
	con->out_connect.authorizer_len = auth ?
		cpu_to_le32(auth->authorizer_buf_len) : 0;

	con_out_kvec_add(con, sizeof (con->out_connect),
					&con->out_connect);
	if (auth && auth->authorizer_buf_len)
		con_out_kvec_add(con, auth->authorizer_buf_len,
					auth->authorizer_buf);

	con->out_more = 0;
	con_flag_set(con, CON_FLAG_WRITE_PENDING);

	return 0;
}
/*
 * write as much of pending kvecs to the socket as we can.
 *  1 -> done
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_kvec(struct ceph_connection *con)
{
	int ret;

	dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
	while (con->out_kvec_bytes > 0) {
		ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
				       con->out_kvec_left, con->out_kvec_bytes,
				       con->out_more);
		if (ret <= 0)
			goto out;
		con->out_kvec_bytes -= ret;
		if (con->out_kvec_bytes == 0)
			break;            /* done */

		/* account for full iov entries consumed */
		while (ret >= con->out_kvec_cur->iov_len) {
			BUG_ON(!con->out_kvec_left);
			ret -= con->out_kvec_cur->iov_len;
			con->out_kvec_cur++;
			con->out_kvec_left--;
		}
		/* and for a partially-consumed entry */
		if (ret) {
			con->out_kvec_cur->iov_len -= ret;
			con->out_kvec_cur->iov_base += ret;
		}
	}
	con->out_kvec_left = 0;
	ret = 1;
out:
	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
	     con->out_kvec_bytes, con->out_kvec_left, ret);
	return ret;  /* done! */
}
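/*
 * Worked example for the accounting above: with out_kvec holding
 * three entries of 10 bytes each and a sendmsg() return of 25, the
 * inner loop retires the first two entries, then the partial case
 * trims the third to iov_len 5 with iov_base advanced by 5, leaving
 * out_kvec_bytes == 5 for the next pass.
 */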
static u32 ceph_crc32c_page(u32 crc, struct page *page,
				unsigned int page_offset,
				unsigned int length)
{
	char *kaddr;

	kaddr = kmap(page);
	BUG_ON(kaddr == NULL);
	crc = crc32c(crc, kaddr + page_offset, length);
	kunmap(page);

	return crc;
}
/*
 * Write as much message data payload as we can.  If we finish, queue
 * up the footer.
 *  1 -> done, footer is now queued in out_kvec[].
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_message_data(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->out_msg;
	struct ceph_msg_data_cursor *cursor = &msg->cursor;
	bool do_datacrc = !con->msgr->nocrc;
	u32 crc;

	dout("%s %p msg %p\n", __func__, con, msg);

	if (list_empty(&msg->data))
		return -EINVAL;

	/*
	 * Iterate through each page that contains data to be
	 * written, and send as much as possible for each.
	 *
	 * If we are calculating the data crc (the default), we will
	 * need to map the page.  If we have no pages, they have
	 * been revoked, so use the zero page.
	 */
	crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
	while (cursor->resid) {
		struct page *page;
		size_t page_offset;
		size_t length;
		bool last_piece;
		int ret;

		page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
					  &last_piece);
		ret = ceph_tcp_sendpage(con->sock, page, page_offset,
					length, last_piece);
		if (ret <= 0) {
			if (do_datacrc)
				msg->footer.data_crc = cpu_to_le32(crc);

			return ret;
		}
		if (do_datacrc && cursor->need_crc)
			crc = ceph_crc32c_page(crc, page, page_offset, length);
		(void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
	}

	dout("%s %p msg %p done\n", __func__, con, msg);

	/* prepare and queue up footer, too */
	if (do_datacrc)
		msg->footer.data_crc = cpu_to_le32(crc);
	else
		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
	con_out_kvec_reset(con);
	prepare_write_message_footer(con);

	return 1;	/* must return > 0 to indicate success */
}

/*
 * write some zeros
 */
static int write_partial_skip(struct ceph_connection *con)
{
	int ret;

	dout("%s %p %d left\n", __func__, con, con->out_skip);
	while (con->out_skip > 0) {
		size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);

		ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true);
		if (ret <= 0)
			goto out;
		con->out_skip -= ret;
	}
	ret = 1;
out:
	return ret;
}

/*
 * Prepare to read connection handshake, or an ack.
 */
static void prepare_read_banner(struct ceph_connection *con)
{
	dout("prepare_read_banner %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_connect(struct ceph_connection *con)
{
	dout("prepare_read_connect %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_ack(struct ceph_connection *con)
{
	dout("prepare_read_ack %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_seq(struct ceph_connection *con)
{
	dout("prepare_read_seq %p\n", con);
	con->in_base_pos = 0;
	con->in_tag = CEPH_MSGR_TAG_SEQ;
}

static void prepare_read_tag(struct ceph_connection *con)
{
	dout("prepare_read_tag %p\n", con);
	con->in_base_pos = 0;
	con->in_tag = CEPH_MSGR_TAG_READY;
}

/*
 * Prepare to read a message.
 */
static int prepare_read_message(struct ceph_connection *con)
{
	dout("prepare_read_message %p\n", con);
	BUG_ON(con->in_msg != NULL);
	con->in_base_pos = 0;
	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
	return 0;
}


static int read_partial(struct ceph_connection *con,
			int end, int size, void *object)
{
	while (con->in_base_pos < end) {
		int left = end - con->in_base_pos;
		int have = size - left;
		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
		if (ret <= 0)
			return ret;
		con->in_base_pos += ret;
	}
	return 1;
}
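/*
 * Worked example for read_partial(): @end is the cumulative offset
 * of the end of the current section within the whole exchange, while
 * @size and @object describe just that section, so "have = size -
 * left" is how much of the current object was received on previous
 * passes.  The banner reader below pulls three consecutive sections
 * this way, bumping @end by each section's size as it goes.
 */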
/*
 * Read all or part of the connect-side handshake on a new connection
 */
static int read_partial_banner(struct ceph_connection *con)
{
	int size;
	int end;
	int ret;

	dout("read_partial_banner %p at %d\n", con, con->in_base_pos);

	/* peer's banner */
	size = strlen(CEPH_BANNER);
	end = size;
	ret = read_partial(con, end, size, con->in_banner);
	if (ret <= 0)
		goto out;

	size = sizeof (con->actual_peer_addr);
	end += size;
	ret = read_partial(con, end, size, &con->actual_peer_addr);
	if (ret <= 0)
		goto out;

	size = sizeof (con->peer_addr_for_me);
	end += size;
	ret = read_partial(con, end, size, &con->peer_addr_for_me);
	if (ret <= 0)
		goto out;

out:
	return ret;
}

static int read_partial_connect(struct ceph_connection *con)
{
	int size;
	int end;
	int ret;

	dout("read_partial_connect %p at %d\n", con, con->in_base_pos);

	size = sizeof (con->in_reply);
	end = size;
	ret = read_partial(con, end, size, &con->in_reply);
	if (ret <= 0)
		goto out;

	size = le32_to_cpu(con->in_reply.authorizer_len);
	end += size;
	ret = read_partial(con, end, size, con->auth_reply_buf);
	if (ret <= 0)
		goto out;

	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
	     con, (int)con->in_reply.tag,
	     le32_to_cpu(con->in_reply.connect_seq),
	     le32_to_cpu(con->in_reply.global_seq));
out:
	return ret;

}

/*
 * Verify the hello banner looks okay.
 */
static int verify_hello(struct ceph_connection *con)
{
	if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
		pr_err("connect to %s got bad banner\n",
		       ceph_pr_addr(&con->peer_addr.in_addr));
		con->error_msg = "protocol error, bad banner";
		return -1;
	}
	return 0;
}

static bool addr_is_blank(struct sockaddr_storage *ss)
{
	switch (ss->ss_family) {
	case AF_INET:
		return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
	case AF_INET6:
		return
		     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
		     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
		     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
		     ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
	}
	return false;
}

static int addr_port(struct sockaddr_storage *ss)
{
	switch (ss->ss_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)ss)->sin_port);
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
	}
	return 0;
}

static void addr_set_port(struct sockaddr_storage *ss, int p)
{
	switch (ss->ss_family) {
	case AF_INET:
		((struct sockaddr_in *)ss)->sin_port = htons(p);
		break;
	case AF_INET6:
		((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
		break;
	}
}

/*
 * Unlike other *_pton function semantics, zero indicates success.
 */
static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
		char delim, const char **ipend)
{
	struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;

	memset(ss, 0, sizeof(*ss));

	if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
		ss->ss_family = AF_INET;
		return 0;
	}

	if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
		ss->ss_family = AF_INET6;
		return 0;
	}

	return -EINVAL;
}

/*
 * Extract hostname string and resolve using kernel DNS facility.
 */
#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
static int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct sockaddr_storage *ss, char delim, const char **ipend)
{
	const char *end, *delim_p;
	char *colon_p, *ip_addr = NULL;
	int ip_len, ret;

	/*
	 * The end of the hostname occurs immediately preceding the delimiter or
	 * the port marker (':') where the delimiter takes precedence.
	 */
	delim_p = memchr(name, delim, namelen);
	colon_p = memchr(name, ':', namelen);

	if (delim_p && colon_p)
		end = delim_p < colon_p ? delim_p : colon_p;
	else if (!delim_p && colon_p)
		end = colon_p;
	else {
		end = delim_p;
		if (!end) /* case: hostname:/ */
			end = name + namelen;
	}

	if (end <= name)
		return -EINVAL;

	/* do dns_resolve upcall */
	ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
	if (ip_len > 0)
		ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
	else
		ret = -ESRCH;

	kfree(ip_addr);

	*ipend = end;

	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
			ret, ret ? "failed" : ceph_pr_addr(ss));
"failed" : ceph_pr_addr(ss)); 1871 1872 return ret; 1873} 1874#else 1875static inline int ceph_dns_resolve_name(const char *name, size_t namelen, 1876 struct sockaddr_storage *ss, char delim, const char **ipend) 1877{ 1878 return -EINVAL; 1879} 1880#endif 1881 1882/* 1883 * Parse a server name (IP or hostname). If a valid IP address is not found 1884 * then try to extract a hostname to resolve using userspace DNS upcall. 1885 */ 1886static int ceph_parse_server_name(const char *name, size_t namelen, 1887 struct sockaddr_storage *ss, char delim, const char **ipend) 1888{ 1889 int ret; 1890 1891 ret = ceph_pton(name, namelen, ss, delim, ipend); 1892 if (ret) 1893 ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend); 1894 1895 return ret; 1896} 1897 1898/* 1899 * Parse an ip[:port] list into an addr array. Use the default 1900 * monitor port if a port isn't specified. 1901 */ 1902int ceph_parse_ips(const char *c, const char *end, 1903 struct ceph_entity_addr *addr, 1904 int max_count, int *count) 1905{ 1906 int i, ret = -EINVAL; 1907 const char *p = c; 1908 1909 dout("parse_ips on '%.*s'\n", (int)(end-c), c); 1910 for (i = 0; i < max_count; i++) { 1911 const char *ipend; 1912 struct sockaddr_storage *ss = &addr[i].in_addr; 1913 int port; 1914 char delim = ','; 1915 1916 if (*p == '[') { 1917 delim = ']'; 1918 p++; 1919 } 1920 1921 ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend); 1922 if (ret) 1923 goto bad; 1924 ret = -EINVAL; 1925 1926 p = ipend; 1927 1928 if (delim == ']') { 1929 if (*p != ']') { 1930 dout("missing matching ']'\n"); 1931 goto bad; 1932 } 1933 p++; 1934 } 1935 1936 /* port? */ 1937 if (p < end && *p == ':') { 1938 port = 0; 1939 p++; 1940 while (p < end && *p >= '0' && *p <= '9') { 1941 port = (port * 10) + (*p - '0'); 1942 p++; 1943 } 1944 if (port == 0) 1945 port = CEPH_MON_PORT; 1946 else if (port > 65535) 1947 goto bad; 1948 } else { 1949 port = CEPH_MON_PORT; 1950 } 1951 1952 addr_set_port(ss, port); 1953 1954 dout("parse_ips got %s\n", ceph_pr_addr(ss)); 1955 1956 if (p == end) 1957 break; 1958 if (*p != ',') 1959 goto bad; 1960 p++; 1961 } 1962 1963 if (p != end) 1964 goto bad; 1965 1966 if (count) 1967 *count = i + 1; 1968 return 0; 1969 1970bad: 1971 pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); 1972 return ret; 1973} 1974EXPORT_SYMBOL(ceph_parse_ips); 1975 1976static int process_banner(struct ceph_connection *con) 1977{ 1978 dout("process_banner on %p\n", con); 1979 1980 if (verify_hello(con) < 0) 1981 return -1; 1982 1983 ceph_decode_addr(&con->actual_peer_addr); 1984 ceph_decode_addr(&con->peer_addr_for_me); 1985 1986 /* 1987 * Make sure the other end is who we wanted. note that the other 1988 * end may not yet know their ip address, so if it's 0.0.0.0, give 1989 * them the benefit of the doubt. 1990 */ 1991 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 1992 sizeof(con->peer_addr)) != 0 && 1993 !(addr_is_blank(&con->actual_peer_addr.in_addr) && 1994 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 1995 pr_warn("wrong peer, want %s/%d, got %s/%d\n", 1996 ceph_pr_addr(&con->peer_addr.in_addr), 1997 (int)le32_to_cpu(con->peer_addr.nonce), 1998 ceph_pr_addr(&con->actual_peer_addr.in_addr), 1999 (int)le32_to_cpu(con->actual_peer_addr.nonce)); 2000 con->error_msg = "wrong peer at address"; 2001 return -1; 2002 } 2003 2004 /* 2005 * did we learn our address? 
static int process_banner(struct ceph_connection *con)
{
	dout("process_banner on %p\n", con);

	if (verify_hello(con) < 0)
		return -1;

	ceph_decode_addr(&con->actual_peer_addr);
	ceph_decode_addr(&con->peer_addr_for_me);

	/*
	 * Make sure the other end is who we wanted.  note that the other
	 * end may not yet know their ip address, so if it's 0.0.0.0, give
	 * them the benefit of the doubt.
	 */
	if (memcmp(&con->peer_addr, &con->actual_peer_addr,
		   sizeof(con->peer_addr)) != 0 &&
	    !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
		pr_warn("wrong peer, want %s/%d, got %s/%d\n",
			ceph_pr_addr(&con->peer_addr.in_addr),
			(int)le32_to_cpu(con->peer_addr.nonce),
			ceph_pr_addr(&con->actual_peer_addr.in_addr),
			(int)le32_to_cpu(con->actual_peer_addr.nonce));
		con->error_msg = "wrong peer at address";
		return -1;
	}

	/*
	 * did we learn our address?
	 */
	if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
		int port = addr_port(&con->msgr->inst.addr.in_addr);

		memcpy(&con->msgr->inst.addr.in_addr,
		       &con->peer_addr_for_me.in_addr,
		       sizeof(con->peer_addr_for_me.in_addr));
		addr_set_port(&con->msgr->inst.addr.in_addr, port);
		encode_my_addr(con->msgr);
		dout("process_banner learned my addr is %s\n",
		     ceph_pr_addr(&con->msgr->inst.addr.in_addr));
	}

	return 0;
}

static int process_connect(struct ceph_connection *con)
{
	u64 sup_feat = con->msgr->supported_features;
	u64 req_feat = con->msgr->required_features;
	u64 server_feat = ceph_sanitize_features(
				le64_to_cpu(con->in_reply.features));
	int ret;

	dout("process_connect on %p tag %d\n", con, (int)con->in_tag);

	switch (con->in_reply.tag) {
	case CEPH_MSGR_TAG_FEATURES:
		pr_err("%s%lld %s feature set mismatch,"
		       " my %llx < server's %llx, missing %llx\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr.in_addr),
		       sup_feat, server_feat, server_feat & ~sup_feat);
		con->error_msg = "missing required protocol features";
		reset_connection(con);
		return -1;

	case CEPH_MSGR_TAG_BADPROTOVER:
		pr_err("%s%lld %s protocol version mismatch,"
		       " my %d != server's %d\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr.in_addr),
		       le32_to_cpu(con->out_connect.protocol_version),
		       le32_to_cpu(con->in_reply.protocol_version));
		con->error_msg = "protocol version mismatch";
		reset_connection(con);
		return -1;

	case CEPH_MSGR_TAG_BADAUTHORIZER:
		con->auth_retry++;
		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
		     con->auth_retry);
		if (con->auth_retry == 2) {
			con->error_msg = "connect authorization failure";
			return -1;
		}
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RESETSESSION:
		/*
		 * If we connected with a large connect_seq but the peer
		 * has no record of a session with us (no connection, or
		 * connect_seq == 0), they will send RESETSESSION to indicate
		 * that they must have reset their session, and may have
		 * dropped messages.
		 */
		dout("process_connect got RESET peer seq %u\n",
		     le32_to_cpu(con->in_reply.connect_seq));
		pr_err("%s%lld %s connection reset\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr.in_addr));
		reset_connection(con);
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);

		/* Tell ceph about it. */
		mutex_unlock(&con->mutex);
		pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
		if (con->ops->peer_reset)
			con->ops->peer_reset(con);
		mutex_lock(&con->mutex);
		if (con->state != CON_STATE_NEGOTIATING)
			return -EAGAIN;
		break;

	case CEPH_MSGR_TAG_RETRY_SESSION:
		/*
		 * If we sent a smaller connect_seq than the peer has, try
		 * again with a larger value.
		 */
		dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
		     le32_to_cpu(con->out_connect.connect_seq),
		     le32_to_cpu(con->in_reply.connect_seq));
		con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RETRY_GLOBAL:
		/*
		 * If we sent a smaller global_seq than the peer has, try
		 * again with a larger value.
		 */
		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
		     con->peer_global_seq,
		     le32_to_cpu(con->in_reply.global_seq));
		get_global_seq(con->msgr,
			       le32_to_cpu(con->in_reply.global_seq));
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_SEQ:
	case CEPH_MSGR_TAG_READY:
		if (req_feat & ~server_feat) {
			pr_err("%s%lld %s protocol feature mismatch,"
			       " my required %llx > server's %llx, need %llx\n",
			       ENTITY_NAME(con->peer_name),
			       ceph_pr_addr(&con->peer_addr.in_addr),
			       req_feat, server_feat, req_feat & ~server_feat);
			con->error_msg = "missing required protocol features";
			reset_connection(con);
			return -1;
		}

		WARN_ON(con->state != CON_STATE_NEGOTIATING);
		con->state = CON_STATE_OPEN;
		con->auth_retry = 0;    /* we authenticated; clear flag */
		con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
		con->connect_seq++;
		con->peer_features = server_feat;
		dout("process_connect got READY gseq %d cseq %d (%d)\n",
		     con->peer_global_seq,
		     le32_to_cpu(con->in_reply.connect_seq),
		     con->connect_seq);
		WARN_ON(con->connect_seq !=
			le32_to_cpu(con->in_reply.connect_seq));

		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
			con_flag_set(con, CON_FLAG_LOSSYTX);

		con->delay = 0;      /* reset backoff memory */

		if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
			prepare_write_seq(con);
			prepare_read_seq(con);
		} else {
			prepare_read_tag(con);
		}
		break;

	case CEPH_MSGR_TAG_WAIT:
		/*
		 * If there is a connection race (we are opening
		 * connections to each other), one of us may just have
		 * to WAIT.  This shouldn't happen if we are the
		 * client.
		 */
		con->error_msg = "protocol error, got WAIT as client";
		return -1;

	default:
		con->error_msg = "protocol error, garbage tag during connect";
		return -1;
	}
	return 0;
}


/*
 * read (part of) an ack
 */
static int read_partial_ack(struct ceph_connection *con)
{
	int size = sizeof (con->in_temp_ack);
	int end = size;

	return read_partial(con, end, size, &con->in_temp_ack);
}

/*
 * We can finally discard anything that's been acked.
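 *
 * (E.g. with seqs 3, 4 and 5 on out_sent and an incoming ack of 4,
 * messages 3 and 4 are dropped below; 5 stays on out_sent in case it
 * has to be resent after a fault.)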
 */
static void process_ack(struct ceph_connection *con)
{
	struct ceph_msg *m;
	u64 ack = le64_to_cpu(con->in_temp_ack);
	u64 seq;

	while (!list_empty(&con->out_sent)) {
		m = list_first_entry(&con->out_sent, struct ceph_msg,
				     list_head);
		seq = le64_to_cpu(m->hdr.seq);
		if (seq > ack)
			break;
		dout("got ack for seq %llu type %d at %p\n", seq,
		     le16_to_cpu(m->hdr.type), m);
		m->ack_stamp = jiffies;
		ceph_msg_remove(m);
	}
	prepare_read_tag(con);
}


static int read_partial_message_section(struct ceph_connection *con,
					struct kvec *section,
					unsigned int sec_len, u32 *crc)
{
	int ret, left;

	BUG_ON(!section);

	while (section->iov_len < sec_len) {
		BUG_ON(section->iov_base == NULL);
		left = sec_len - section->iov_len;
		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
				       section->iov_len, left);
		if (ret <= 0)
			return ret;
		section->iov_len += ret;
	}
	if (section->iov_len == sec_len)
		*crc = crc32c(0, section->iov_base, section->iov_len);

	return 1;
}

static int read_partial_msg_data(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->in_msg;
	struct ceph_msg_data_cursor *cursor = &msg->cursor;
	const bool do_datacrc = !con->msgr->nocrc;
	struct page *page;
	size_t page_offset;
	size_t length;
	u32 crc = 0;
	int ret;

	BUG_ON(!msg);
	if (list_empty(&msg->data))
		return -EIO;

	if (do_datacrc)
		crc = con->in_data_crc;
	while (cursor->resid) {
		page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
					  NULL);
		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
		if (ret <= 0) {
			if (do_datacrc)
				con->in_data_crc = crc;

			return ret;
		}

		if (do_datacrc)
			crc = ceph_crc32c_page(crc, page, page_offset, ret);
		(void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
	}
	if (do_datacrc)
		con->in_data_crc = crc;

	return 1;	/* must return > 0 to indicate success */
}

/*
 * read (part of) a message.
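 *
 * On-the-wire layout, as consumed below: header, then the front,
 * middle and data sections (each checked with crc32c), then the
 * footer.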
 */
static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);

static int read_partial_message(struct ceph_connection *con)
{
	struct ceph_msg *m = con->in_msg;
	int size;
	int end;
	int ret;
	unsigned int front_len, middle_len, data_len;
	bool do_datacrc = !con->msgr->nocrc;
	bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
	u64 seq;
	u32 crc;

	dout("read_partial_message con %p msg %p\n", con, m);

	/* header */
	size = sizeof (con->in_hdr);
	end = size;
	ret = read_partial(con, end, size, &con->in_hdr);
	if (ret <= 0)
		return ret;

	crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
	if (cpu_to_le32(crc) != con->in_hdr.crc) {
		pr_err("read_partial_message bad hdr crc %u != expected %u\n",
		       crc, con->in_hdr.crc);
		return -EBADMSG;
	}

	front_len = le32_to_cpu(con->in_hdr.front_len);
	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
		return -EIO;
	middle_len = le32_to_cpu(con->in_hdr.middle_len);
	if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
		return -EIO;
	data_len = le32_to_cpu(con->in_hdr.data_len);
	if (data_len > CEPH_MSG_MAX_DATA_LEN)
		return -EIO;

	/* verify seq# */
	seq = le64_to_cpu(con->in_hdr.seq);
	if ((s64)seq - (s64)con->in_seq < 1) {
		pr_info("skipping %s%lld %s seq %lld expected %lld\n",
			ENTITY_NAME(con->peer_name),
			ceph_pr_addr(&con->peer_addr.in_addr),
			seq, con->in_seq + 1);
		con->in_base_pos = -front_len - middle_len - data_len -
			sizeof_footer(con);
		con->in_tag = CEPH_MSGR_TAG_READY;
		return 1;
	} else if ((s64)seq - (s64)con->in_seq > 1) {
		pr_err("read_partial_message bad seq %lld expected %lld\n",
		       seq, con->in_seq + 1);
		con->error_msg = "bad message sequence # for incoming message";
		return -EBADE;
	}

	/* allocate message?
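	 * (the connection's alloc_msg op supplies it, and may ask us to
	 * skip this message instead)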
	 */
	if (!con->in_msg) {
		int skip = 0;

		dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
		     front_len, data_len);
		ret = ceph_con_in_msg_alloc(con, &skip);
		if (ret < 0)
			return ret;

		BUG_ON(!con->in_msg ^ skip);
		if (con->in_msg && data_len > con->in_msg->data_length) {
			pr_warn("%s skipping long message (%u > %zd)\n",
				__func__, data_len, con->in_msg->data_length);
			ceph_msg_put(con->in_msg);
			con->in_msg = NULL;
			skip = 1;
		}
		if (skip) {
			/* skip this message */
			dout("alloc_msg said skip message\n");
			con->in_base_pos = -front_len - middle_len - data_len -
				sizeof_footer(con);
			con->in_tag = CEPH_MSGR_TAG_READY;
			con->in_seq++;
			return 1;
		}

		BUG_ON(!con->in_msg);
		BUG_ON(con->in_msg->con != con);
		m = con->in_msg;
		m->front.iov_len = 0;    /* haven't read it yet */
		if (m->middle)
			m->middle->vec.iov_len = 0;

		/* prepare for data payload, if any */

		if (data_len)
			prepare_message_data(con->in_msg, data_len);
	}

	/* front */
	ret = read_partial_message_section(con, &m->front, front_len,
					   &con->in_front_crc);
	if (ret <= 0)
		return ret;

	/* middle */
	if (m->middle) {
		ret = read_partial_message_section(con, &m->middle->vec,
						   middle_len,
						   &con->in_middle_crc);
		if (ret <= 0)
			return ret;
	}

	/* (page) data */
	if (data_len) {
		ret = read_partial_msg_data(con);
		if (ret <= 0)
			return ret;
	}

	/* footer */
	if (need_sign)
		size = sizeof(m->footer);
	else
		size = sizeof(m->old_footer);

	end += size;
	ret = read_partial(con, end, size, &m->footer);
	if (ret <= 0)
		return ret;

	if (!need_sign) {
		m->footer.flags = m->old_footer.flags;
		m->footer.sig = 0;
	}

	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
	     m, front_len, m->footer.front_crc, middle_len,
	     m->footer.middle_crc, data_len, m->footer.data_crc);

	/* crc ok? */
	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
		pr_err("read_partial_message %p front crc %u != exp. %u\n",
		       m, con->in_front_crc, m->footer.front_crc);
		return -EBADMSG;
	}
	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
		pr_err("read_partial_message %p middle crc %u != exp %u\n",
		       m, con->in_middle_crc, m->footer.middle_crc);
		return -EBADMSG;
	}
	if (do_datacrc &&
	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
		pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
		return -EBADMSG;
	}

	if (need_sign && con->ops->check_message_signature &&
	    con->ops->check_message_signature(con, m)) {
		pr_err("read_partial_message %p signature check failed\n", m);
		return -EBADMSG;
	}

	return 1; /* done! */
}

/*
 * Process message.  This happens in the worker thread.  The callback should
 * be careful not to do anything that waits on other incoming messages or it
 * may deadlock.
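 * Note that con->mutex is dropped across the dispatch callback
 * below.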
 */
static void process_message(struct ceph_connection *con)
{
	struct ceph_msg *msg;

	BUG_ON(con->in_msg->con != con);
	con->in_msg->con = NULL;
	msg = con->in_msg;
	con->in_msg = NULL;
	con->ops->put(con);

	/* if first message, set peer_name */
	if (con->peer_name.type == 0)
		con->peer_name = msg->hdr.src;

	con->in_seq++;
	mutex_unlock(&con->mutex);

	dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
	     msg, le64_to_cpu(msg->hdr.seq),
	     ENTITY_NAME(msg->hdr.src),
	     le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.data_len),
	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
	con->ops->dispatch(con, msg);

	mutex_lock(&con->mutex);
}


/*
 * Write something to the socket.  Called in a worker thread when the
 * socket appears to be writeable and we have something ready to send.
 */
static int try_write(struct ceph_connection *con)
{
	int ret = 1;

	dout("try_write start %p state %lu\n", con, con->state);

more:
	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);

	/* open the socket first? */
	if (con->state == CON_STATE_PREOPEN) {
		BUG_ON(con->sock);
		con->state = CON_STATE_CONNECTING;

		con_out_kvec_reset(con);
		prepare_write_banner(con);
		prepare_read_banner(con);

		BUG_ON(con->in_msg);
		con->in_tag = CEPH_MSGR_TAG_READY;
		dout("try_write initiating connect on %p new state %lu\n",
		     con, con->state);
		ret = ceph_tcp_connect(con);
		if (ret < 0) {
			con->error_msg = "connect error";
			goto out;
		}
	}

more_kvec:
	/* kvec data queued? */
	if (con->out_kvec_left) {
		ret = write_partial_kvec(con);
		if (ret <= 0)
			goto out;
	}
	if (con->out_skip) {
		ret = write_partial_skip(con);
		if (ret <= 0)
			goto out;
	}

	/* msg pages? */
	if (con->out_msg) {
		if (con->out_msg_done) {
			ceph_msg_put(con->out_msg);
			con->out_msg = NULL;   /* we're done with this one */
			goto do_next;
		}

		ret = write_partial_message_data(con);
		if (ret == 1)
			goto more_kvec;  /* we need to send the footer, too! */
		if (ret == 0)
			goto out;
		if (ret < 0) {
			dout("try_write write_partial_message_data err %d\n",
			     ret);
			goto out;
		}
	}

do_next:
	if (con->state == CON_STATE_OPEN) {
		/* is anything else pending? */
		if (!list_empty(&con->out_queue)) {
			prepare_write_message(con);
			goto more;
		}
		if (con->in_seq > con->in_seq_acked) {
			prepare_write_ack(con);
			goto more;
		}
		if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
			prepare_write_keepalive(con);
			goto more;
		}
	}

	/* Nothing to do! */
	con_flag_clear(con, CON_FLAG_WRITE_PENDING);
	dout("try_write nothing else to write.\n");
	ret = 0;
out:
	dout("try_write done on %p ret %d\n", con, ret);
	return ret;
}



/*
 * Read what we can from the socket.
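 * Returns a negative value on error, and zero or a positive value
 * otherwise.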
 */
static int try_read(struct ceph_connection *con)
{
	int ret = -1;

more:
	dout("try_read start on %p state %lu\n", con, con->state);
	if (con->state != CON_STATE_CONNECTING &&
	    con->state != CON_STATE_NEGOTIATING &&
	    con->state != CON_STATE_OPEN)
		return 0;

	BUG_ON(!con->sock);

	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
	     con->in_base_pos);

	if (con->state == CON_STATE_CONNECTING) {
		dout("try_read connecting\n");
		ret = read_partial_banner(con);
		if (ret <= 0)
			goto out;
		ret = process_banner(con);
		if (ret < 0)
			goto out;

		con->state = CON_STATE_NEGOTIATING;

		/*
		 * Received banner is good, exchange connection info.
		 * Do not reset out_kvec, as sending our banner raced
		 * with receiving peer banner after connect completed.
		 */
		ret = prepare_write_connect(con);
		if (ret < 0)
			goto out;
		prepare_read_connect(con);

		/* Send connection info before awaiting response */
		goto out;
	}

	if (con->state == CON_STATE_NEGOTIATING) {
		dout("try_read negotiating\n");
		ret = read_partial_connect(con);
		if (ret <= 0)
			goto out;
		ret = process_connect(con);
		if (ret < 0)
			goto out;
		goto more;
	}

	WARN_ON(con->state != CON_STATE_OPEN);

	if (con->in_base_pos < 0) {
		/*
		 * skipping + discarding content.
		 *
		 * FIXME: there must be a better way to do this!
		 */
		static char buf[SKIP_BUF_SIZE];
		int skip = min((int) sizeof (buf), -con->in_base_pos);

		dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
		ret = ceph_tcp_recvmsg(con->sock, buf, skip);
		if (ret <= 0)
			goto out;
		con->in_base_pos += ret;
		if (con->in_base_pos)
			goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_READY) {
		/*
		 * what's next?
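		 * read the next tag byte off the socket and set up the
		 * matching read path below.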
		 */
		ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
		if (ret <= 0)
			goto out;
		dout("try_read got tag %d\n", (int)con->in_tag);
		switch (con->in_tag) {
		case CEPH_MSGR_TAG_MSG:
			prepare_read_message(con);
			break;
		case CEPH_MSGR_TAG_ACK:
			prepare_read_ack(con);
			break;
		case CEPH_MSGR_TAG_CLOSE:
			con_close_socket(con);
			con->state = CON_STATE_CLOSED;
			goto out;
		default:
			goto bad_tag;
		}
	}
	if (con->in_tag == CEPH_MSGR_TAG_MSG) {
		ret = read_partial_message(con);
		if (ret <= 0) {
			switch (ret) {
			case -EBADMSG:
				con->error_msg = "bad crc";
				/* fall through */
			case -EBADE:
				ret = -EIO;
				break;
			case -EIO:
				con->error_msg = "io error";
				break;
			}
			goto out;
		}
		if (con->in_tag == CEPH_MSGR_TAG_READY)
			goto more;
		process_message(con);
		if (con->state == CON_STATE_OPEN)
			prepare_read_tag(con);
		goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_ACK ||
	    con->in_tag == CEPH_MSGR_TAG_SEQ) {
		/*
		 * the final handshake seq exchange is semantically
		 * equivalent to an ACK
		 */
		ret = read_partial_ack(con);
		if (ret <= 0)
			goto out;
		process_ack(con);
		goto more;
	}

out:
	dout("try_read done on %p ret %d\n", con, ret);
	return ret;

bad_tag:
	pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
	con->error_msg = "protocol error, garbage tag";
	ret = -1;
	goto out;
}


/*
 * Atomically queue work on a connection after the specified delay.
 * Bump @con reference to avoid races with connection teardown.
 * Returns 0 if work was queued, or an error code otherwise.
 */
static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
{
	if (!con->ops->get(con)) {
		dout("%s %p ref count 0\n", __func__, con);
		return -ENOENT;
	}

	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
		dout("%s %p - already queued\n", __func__, con);
		con->ops->put(con);
		return -EBUSY;
	}

	dout("%s %p %lu\n", __func__, con, delay);
	return 0;
}

static void queue_con(struct ceph_connection *con)
{
	(void) queue_con_delay(con, 0);
}

static void cancel_con(struct ceph_connection *con)
{
	if (cancel_delayed_work(&con->work)) {
		dout("%s %p\n", __func__, con);
		con->ops->put(con);
	}
}

static bool con_sock_closed(struct ceph_connection *con)
{
	if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
		return false;

#define CASE(x)								\
	case CON_STATE_ ## x:						\
		con->error_msg = "socket closed (con state " #x ")";	\
		break;

	switch (con->state) {
	CASE(CLOSED);
	CASE(PREOPEN);
	CASE(CONNECTING);
	CASE(NEGOTIATING);
	CASE(OPEN);
	CASE(STANDBY);
	default:
		pr_warn("%s con %p unrecognized state %lu\n",
			__func__, con, con->state);
		con->error_msg = "unrecognized con state";
		BUG();
		break;
	}
#undef CASE

	return true;
}

static bool con_backoff(struct ceph_connection *con)
{
	int ret;

	if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
		return false;

	ret = queue_con_delay(con, round_jiffies_relative(con->delay));
	if (ret) {
		dout("%s: con %p FAILED to back off %lu\n", __func__,
		     con, con->delay);
		BUG_ON(ret == -ENOENT);
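		/* only -EBUSY is possible here: delayed work is already
		 * queued, so re-arm the flag for that run to act on */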
		con_flag_set(con, CON_FLAG_BACKOFF);
	}

	return true;
}

/* Finish fault handling; con->mutex must *not* be held here */

static void con_fault_finish(struct ceph_connection *con)
{
	/*
	 * in case we faulted due to authentication, invalidate our
	 * current tickets so that we can get new ones.
	 */
	if (con->auth_retry && con->ops->invalidate_authorizer) {
		dout("calling invalidate_authorizer()\n");
		con->ops->invalidate_authorizer(con);
	}

	if (con->ops->fault)
		con->ops->fault(con);
}

/*
 * Do some work on a connection.  Drop a connection ref when we're done.
 */
static void con_work(struct work_struct *work)
{
	struct ceph_connection *con = container_of(work, struct ceph_connection,
						   work.work);
	bool fault;

	mutex_lock(&con->mutex);
	while (true) {
		int ret;

		if ((fault = con_sock_closed(con))) {
			dout("%s: con %p SOCK_CLOSED\n", __func__, con);
			break;
		}
		if (con_backoff(con)) {
			dout("%s: con %p BACKOFF\n", __func__, con);
			break;
		}
		if (con->state == CON_STATE_STANDBY) {
			dout("%s: con %p STANDBY\n", __func__, con);
			break;
		}
		if (con->state == CON_STATE_CLOSED) {
			dout("%s: con %p CLOSED\n", __func__, con);
			BUG_ON(con->sock);
			break;
		}
		if (con->state == CON_STATE_PREOPEN) {
			dout("%s: con %p PREOPEN\n", __func__, con);
			BUG_ON(con->sock);
		}

		ret = try_read(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on read";
			fault = true;
			break;
		}

		ret = try_write(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on write";
			fault = true;
		}

		break;	/* If we make it to here, we're done */
	}
	if (fault)
		con_fault(con);
	mutex_unlock(&con->mutex);

	if (fault)
		con_fault_finish(con);

	con->ops->put(con);
}

/*
 * Generic error/fault handler.
 * A retry mechanism is used with exponential backoff.
 */
static void con_fault(struct ceph_connection *con)
{
	dout("fault %p state %lu to peer %s\n",
	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));

	pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
		ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
	con->error_msg = NULL;

	WARN_ON(con->state != CON_STATE_CONNECTING &&
		con->state != CON_STATE_NEGOTIATING &&
		con->state != CON_STATE_OPEN);

	con_close_socket(con);

	if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
		dout("fault on LOSSYTX channel, marking CLOSED\n");
		con->state = CON_STATE_CLOSED;
		return;
	}

	if (con->in_msg) {
		BUG_ON(con->in_msg->con != con);
		con->in_msg->con = NULL;
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
		con->ops->put(con);
	}

	/* Requeue anything that hasn't been acked */
	list_splice_init(&con->out_sent, &con->out_queue);

	/* If there are no messages queued or keepalive pending, place
	 * the connection in a STANDBY state */
	if (list_empty(&con->out_queue) &&
	    !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		con_flag_clear(con, CON_FLAG_WRITE_PENDING);
		con->state = CON_STATE_STANDBY;
	} else {
		/* retry after a delay. */
		con->state = CON_STATE_PREOPEN;
		if (con->delay == 0)
			con->delay = BASE_DELAY_INTERVAL;
		else if (con->delay < MAX_DELAY_INTERVAL)
			con->delay *= 2;
		con_flag_set(con, CON_FLAG_BACKOFF);
		queue_con(con);
	}
}



/*
 * initialize a new messenger instance
 */
void ceph_messenger_init(struct ceph_messenger *msgr,
			 struct ceph_entity_addr *myaddr,
			 u64 supported_features,
			 u64 required_features,
			 bool nocrc,
			 bool tcp_nodelay)
{
	msgr->supported_features = supported_features;
	msgr->required_features = required_features;

	spin_lock_init(&msgr->global_seq_lock);

	if (myaddr)
		msgr->inst.addr = *myaddr;

	/* select a random nonce */
	msgr->inst.addr.type = 0;
	get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
	encode_my_addr(msgr);
	msgr->nocrc = nocrc;
	msgr->tcp_nodelay = tcp_nodelay;

	atomic_set(&msgr->stopping, 0);

	dout("%s %p\n", __func__, msgr);
}
EXPORT_SYMBOL(ceph_messenger_init);

static void clear_standby(struct ceph_connection *con)
{
	/* come back from STANDBY? */
	if (con->state == CON_STATE_STANDBY) {
		dout("clear_standby %p and ++connect_seq\n", con);
		con->state = CON_STATE_PREOPEN;
		con->connect_seq++;
		WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING));
		WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING));
	}
}

/*
 * Queue up an outgoing message on the given connection.
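 *
 * A minimal usage sketch (hypothetical caller; the message type and
 * front size are made up for illustration):
 *
 *	struct ceph_msg *msg;
 *
 *	msg = ceph_msg_new(CEPH_MSG_PING, 0, GFP_NOFS, false);
 *	if (msg)
 *		ceph_con_send(con, msg);
 *
 * The messenger takes over the msg reference: it is dropped once the
 * message has been sent, or immediately if the connection is closed.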
 */
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{
	/* set src+dst */
	msg->hdr.src = con->msgr->inst.name;
	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
	msg->needs_out_seq = true;

	mutex_lock(&con->mutex);

	if (con->state == CON_STATE_CLOSED) {
		dout("con_send %p closed, dropping %p\n", con, msg);
		ceph_msg_put(msg);
		mutex_unlock(&con->mutex);
		return;
	}

	BUG_ON(msg->con != NULL);
	msg->con = con->ops->get(con);
	BUG_ON(msg->con == NULL);

	BUG_ON(!list_empty(&msg->list_head));
	list_add_tail(&msg->list_head, &con->out_queue);
	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len));

	clear_standby(con);
	mutex_unlock(&con->mutex);

	/* if there wasn't anything waiting to send before, queue
	 * new work */
	if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);

/*
 * Revoke a message that was previously queued for send
 */
void ceph_msg_revoke(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con)
		return;		/* Message not in our possession */

	mutex_lock(&con->mutex);
	if (!list_empty(&msg->list_head)) {
		dout("%s %p msg %p - was on queue\n", __func__, con, msg);
		list_del_init(&msg->list_head);
		BUG_ON(msg->con == NULL);
		msg->con->ops->put(msg->con);
		msg->con = NULL;
		msg->hdr.seq = 0;

		ceph_msg_put(msg);
	}
	if (con->out_msg == msg) {
		BUG_ON(con->out_skip);
		/* footer */
		if (con->out_msg_done) {
			con->out_skip += con_out_kvec_skip(con);
		} else {
			BUG_ON(!msg->data_length);
			if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
				con->out_skip += sizeof(msg->footer);
			else
				con->out_skip += sizeof(msg->old_footer);
		}
		/* data, middle, front */
		if (msg->data_length)
			con->out_skip += msg->cursor.total_resid;
		if (msg->middle)
			con->out_skip += con_out_kvec_skip(con);
		con->out_skip += con_out_kvec_skip(con);

		dout("%s %p msg %p - was sending, will write %d skip %d\n",
		     __func__, con, msg, con->out_kvec_bytes, con->out_skip);
		msg->hdr.seq = 0;
		con->out_msg = NULL;
		ceph_msg_put(msg);
	}

	mutex_unlock(&con->mutex);
}

/*
 * Revoke a message that we may be reading data into
 */
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{
	struct ceph_connection *con;

	BUG_ON(msg == NULL);
	if (!msg->con) {
		dout("%s msg %p null con\n", __func__, msg);

		return;		/* Message not in our possession */
	}

	con = msg->con;
	mutex_lock(&con->mutex);
	if (con->in_msg == msg) {
		unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
		unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
		unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);

		/* skip rest of message */
		dout("%s %p msg %p revoked\n", __func__, con, msg);
		con->in_base_pos = con->in_base_pos -
				sizeof(struct ceph_msg_header) -
				front_len -
				middle_len -
				data_len -
				sizeof(struct ceph_msg_footer);
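		/* the negative in_base_pos makes try_read() discard
		 * the rest of this message off the socket */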
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
		con->in_tag = CEPH_MSGR_TAG_READY;
		con->in_seq++;
	} else {
		dout("%s %p in_msg %p msg %p no-op\n",
		     __func__, con, con->in_msg, msg);
	}
	mutex_unlock(&con->mutex);
}

/*
 * Queue a keepalive byte to ensure the TCP connection is alive.
 */
void ceph_con_keepalive(struct ceph_connection *con)
{
	dout("con_keepalive %p\n", con);
	mutex_lock(&con->mutex);
	clear_standby(con);
	mutex_unlock(&con->mutex);
	if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 &&
	    con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);

static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
{
	struct ceph_msg_data *data;

	if (WARN_ON(!ceph_msg_data_type_valid(type)))
		return NULL;

	data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
	if (!data)
		return NULL;	/* don't touch data->links on failure */

	data->type = type;
	INIT_LIST_HEAD(&data->links);

	return data;
}

static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
	if (!data)
		return;

	WARN_ON(!list_empty(&data->links));
	if (data->type == CEPH_MSG_DATA_PAGELIST)
		ceph_pagelist_release(data->pagelist);
	kmem_cache_free(ceph_msg_data_cache, data);
}

void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
		size_t length, size_t alignment)
{
	struct ceph_msg_data *data;

	BUG_ON(!pages);
	BUG_ON(!length);

	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
	BUG_ON(!data);
	data->pages = pages;
	data->length = length;
	data->alignment = alignment & ~PAGE_MASK;

	list_add_tail(&data->links, &msg->data);
	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pages);

void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
				struct ceph_pagelist *pagelist)
{
	struct ceph_msg_data *data;

	BUG_ON(!pagelist);
	BUG_ON(!pagelist->length);

	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
	BUG_ON(!data);
	data->pagelist = pagelist;

	list_add_tail(&data->links, &msg->data);
	msg->data_length += pagelist->length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pagelist);

#ifdef CONFIG_BLOCK
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
		size_t length)
{
	struct ceph_msg_data *data;

	BUG_ON(!bio);

	data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
	BUG_ON(!data);
	data->bio = bio;
	data->bio_length = length;

	list_add_tail(&data->links, &msg->data);
	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif	/* CONFIG_BLOCK */

/*
 * Construct a new message with the given type and front size.
 * The new message has a ref count of 1.
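 *
 * (For example, ceph_msg_new(CEPH_MSG_PING, 0, GFP_NOFS, true) fails
 * quietly with NULL if allocation fails, while can_fail = false
 * complains and WARNs before returning NULL.)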
 */
struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
			      bool can_fail)
{
	struct ceph_msg *m;

	m = kmem_cache_zalloc(ceph_msg_cache, flags);
	if (m == NULL)
		goto out;

	m->hdr.type = cpu_to_le16(type);
	m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
	m->hdr.front_len = cpu_to_le32(front_len);

	INIT_LIST_HEAD(&m->list_head);
	kref_init(&m->kref);
	INIT_LIST_HEAD(&m->data);

	/* front */
	if (front_len) {
		m->front.iov_base = ceph_kvmalloc(front_len, flags);
		if (m->front.iov_base == NULL) {
			dout("ceph_msg_new can't allocate %d bytes\n",
			     front_len);
			goto out2;
		}
	} else {
		m->front.iov_base = NULL;
	}
	m->front_alloc_len = m->front.iov_len = front_len;

	dout("ceph_msg_new %p front %d\n", m, front_len);
	return m;

out2:
	ceph_msg_put(m);
out:
	if (!can_fail) {
		pr_err("msg_new can't create type %d front %d\n", type,
		       front_len);
		WARN_ON(1);
	} else {
		dout("msg_new can't create type %d front %d\n", type,
		     front_len);
	}
	return NULL;
}
EXPORT_SYMBOL(ceph_msg_new);

/*
 * Allocate "middle" portion of a message, if it is needed and wasn't
 * allocated by alloc_msg.  This allows us to read a small fixed-size
 * per-type header in the front and then gracefully fail (i.e.,
 * propagate the error to the caller based on info in the front) when
 * the middle is too large.
 */
static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{
	int type = le16_to_cpu(msg->hdr.type);
	int middle_len = le32_to_cpu(msg->hdr.middle_len);

	dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
	     ceph_msg_type_name(type), middle_len);
	BUG_ON(!middle_len);
	BUG_ON(msg->middle);

	msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
	if (!msg->middle)
		return -ENOMEM;
	return 0;
}

/*
 * Allocate a message for receiving an incoming message on a
 * connection, and save the result in con->in_msg.  Uses the
 * connection's private alloc_msg op if available.
 *
 * Returns 0 on success, or a negative error code.
 *
 * On success, if we set *skip = 1:
 *  - the next message should be skipped and ignored.
 *  - con->in_msg == NULL
 * or if we set *skip = 0:
 *  - con->in_msg is non-null.
 * On error (ENOMEM, EAGAIN, ...),
 *  - con->in_msg == NULL
 */
static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
{
	struct ceph_msg_header *hdr = &con->in_hdr;
	int middle_len = le32_to_cpu(hdr->middle_len);
	struct ceph_msg *msg;
	int ret = 0;

	BUG_ON(con->in_msg != NULL);
	BUG_ON(!con->ops->alloc_msg);

	mutex_unlock(&con->mutex);
	msg = con->ops->alloc_msg(con, hdr, skip);
	mutex_lock(&con->mutex);
	if (con->state != CON_STATE_OPEN) {
		if (msg)
			ceph_msg_put(msg);
		return -EAGAIN;
	}
	if (msg) {
		BUG_ON(*skip);
		con->in_msg = msg;
		con->in_msg->con = con->ops->get(con);
		BUG_ON(con->in_msg->con == NULL);
	} else {
		/*
		 * Null message pointer means either we should skip
		 * this message or we couldn't allocate memory.  The
		 * former is not an error.
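		 * We tell the two cases apart by *skip, which the
		 * alloc_msg op set for us.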
		 */
		if (*skip)
			return 0;

		con->error_msg = "error allocating memory for incoming message";
		return -ENOMEM;
	}
	memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));

	if (middle_len && !con->in_msg->middle) {
		ret = ceph_alloc_middle(con, con->in_msg);
		if (ret < 0) {
			ceph_msg_put(con->in_msg);
			con->in_msg = NULL;
		}
	}

	return ret;
}


/*
 * Free a generically kmalloc'd message.
 */
static void ceph_msg_free(struct ceph_msg *m)
{
	dout("%s %p\n", __func__, m);
	kvfree(m->front.iov_base);
	kmem_cache_free(ceph_msg_cache, m);
}

static void ceph_msg_release(struct kref *kref)
{
	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
	LIST_HEAD(data);
	struct list_head *links;
	struct list_head *next;

	dout("%s %p\n", __func__, m);
	WARN_ON(!list_empty(&m->list_head));

	/* drop middle, data, if any */
	if (m->middle) {
		ceph_buffer_put(m->middle);
		m->middle = NULL;
	}

	list_splice_init(&m->data, &data);
	list_for_each_safe(links, next, &data) {
		struct ceph_msg_data *data;

		data = list_entry(links, struct ceph_msg_data, links);
		list_del_init(links);
		ceph_msg_data_destroy(data);
	}
	m->data_length = 0;

	if (m->pool)
		ceph_msgpool_put(m->pool, m);
	else
		ceph_msg_free(m);
}

struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
	dout("%s %p (was %d)\n", __func__, msg,
	     atomic_read(&msg->kref.refcount));
	kref_get(&msg->kref);
	return msg;
}
EXPORT_SYMBOL(ceph_msg_get);

void ceph_msg_put(struct ceph_msg *msg)
{
	dout("%s %p (was %d)\n", __func__, msg,
	     atomic_read(&msg->kref.refcount));
	kref_put(&msg->kref, ceph_msg_release);
}
EXPORT_SYMBOL(ceph_msg_put);

void ceph_msg_dump(struct ceph_msg *msg)
{
	pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
		 msg->front_alloc_len, msg->data_length);
	print_hex_dump(KERN_DEBUG, "header: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->hdr, sizeof(msg->hdr), true);
	print_hex_dump(KERN_DEBUG, " front: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       msg->front.iov_base, msg->front.iov_len, true);
	if (msg->middle)
		print_hex_dump(KERN_DEBUG, "middle: ",
			       DUMP_PREFIX_OFFSET, 16, 1,
			       msg->middle->vec.iov_base,
			       msg->middle->vec.iov_len, true);
	print_hex_dump(KERN_DEBUG, "footer: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->footer, sizeof(msg->footer), true);
}
EXPORT_SYMBOL(ceph_msg_dump);