Lines Matching refs:con

119 static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)  in con_flag_clear()  argument
123 clear_bit(con_flag, &con->flags); in con_flag_clear()
126 static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) in con_flag_set() argument
130 set_bit(con_flag, &con->flags); in con_flag_set()
133 static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) in con_flag_test() argument
137 return test_bit(con_flag, &con->flags); in con_flag_test()
140 static bool con_flag_test_and_clear(struct ceph_connection *con, in con_flag_test_and_clear() argument
145 return test_and_clear_bit(con_flag, &con->flags); in con_flag_test_and_clear()
148 static bool con_flag_test_and_set(struct ceph_connection *con, in con_flag_test_and_set() argument
153 return test_and_set_bit(con_flag, &con->flags); in con_flag_test_and_set()
176 static void queue_con(struct ceph_connection *con);
177 static void cancel_con(struct ceph_connection *con);
179 static void con_fault(struct ceph_connection *con);
326 static void con_sock_state_init(struct ceph_connection *con) in con_sock_state_init() argument
330 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_init()
333 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_init()
337 static void con_sock_state_connecting(struct ceph_connection *con) in con_sock_state_connecting() argument
341 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); in con_sock_state_connecting()
344 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connecting()
348 static void con_sock_state_connected(struct ceph_connection *con) in con_sock_state_connected() argument
352 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); in con_sock_state_connected()
355 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connected()
359 static void con_sock_state_closing(struct ceph_connection *con) in con_sock_state_closing() argument
363 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); in con_sock_state_closing()
368 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closing()
372 static void con_sock_state_closed(struct ceph_connection *con) in con_sock_state_closed() argument
376 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_closed()
382 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closed()
393 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_data_ready() local
394 if (atomic_read(&con->msgr->stopping)) { in ceph_sock_data_ready()
400 con, con->state); in ceph_sock_data_ready()
401 queue_con(con); in ceph_sock_data_ready()
408 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_write_space() local
417 if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { in ceph_sock_write_space()
419 dout("%s %p queueing write work\n", __func__, con); in ceph_sock_write_space()
421 queue_con(con); in ceph_sock_write_space()
424 dout("%s %p nothing to write\n", __func__, con); in ceph_sock_write_space()
431 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_state_change() local
434 con, con->state, sk->sk_state); in ceph_sock_state_change()
441 con_sock_state_closing(con); in ceph_sock_state_change()
442 con_flag_set(con, CON_FLAG_SOCK_CLOSED); in ceph_sock_state_change()
443 queue_con(con); in ceph_sock_state_change()
447 con_sock_state_connected(con); in ceph_sock_state_change()
448 queue_con(con); in ceph_sock_state_change()
459 struct ceph_connection *con) in set_sock_callbacks() argument
462 sk->sk_user_data = con; in set_sock_callbacks()
476 static int ceph_tcp_connect(struct ceph_connection *con) in ceph_tcp_connect() argument
478 struct sockaddr_storage *paddr = &con->peer_addr.in_addr; in ceph_tcp_connect()
482 BUG_ON(con->sock); in ceph_tcp_connect()
483 ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, in ceph_tcp_connect()
493 set_sock_callbacks(sock, con); in ceph_tcp_connect()
495 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); in ceph_tcp_connect()
497 con_sock_state_connecting(con); in ceph_tcp_connect()
502 ceph_pr_addr(&con->peer_addr.in_addr), in ceph_tcp_connect()
506 ceph_pr_addr(&con->peer_addr.in_addr), ret); in ceph_tcp_connect()
511 if (con->msgr->tcp_nodelay) { in ceph_tcp_connect()
521 con->sock = sock; in ceph_tcp_connect()
609 static int con_close_socket(struct ceph_connection *con) in con_close_socket() argument
613 dout("con_close_socket on %p sock %p\n", con, con->sock); in con_close_socket()
614 if (con->sock) { in con_close_socket()
615 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); in con_close_socket()
616 sock_release(con->sock); in con_close_socket()
617 con->sock = NULL; in con_close_socket()
626 con_flag_clear(con, CON_FLAG_SOCK_CLOSED); in con_close_socket()
628 con_sock_state_closed(con); in con_close_socket()
639 BUG_ON(msg->con == NULL); in ceph_msg_remove()
640 msg->con->ops->put(msg->con); in ceph_msg_remove()
641 msg->con = NULL; in ceph_msg_remove()
654 static void reset_connection(struct ceph_connection *con) in reset_connection() argument
658 dout("reset_connection %p\n", con); in reset_connection()
659 ceph_msg_remove_list(&con->out_queue); in reset_connection()
660 ceph_msg_remove_list(&con->out_sent); in reset_connection()
662 if (con->in_msg) { in reset_connection()
663 BUG_ON(con->in_msg->con != con); in reset_connection()
664 con->in_msg->con = NULL; in reset_connection()
665 ceph_msg_put(con->in_msg); in reset_connection()
666 con->in_msg = NULL; in reset_connection()
667 con->ops->put(con); in reset_connection()
670 con->connect_seq = 0; in reset_connection()
671 con->out_seq = 0; in reset_connection()
672 if (con->out_msg) { in reset_connection()
673 ceph_msg_put(con->out_msg); in reset_connection()
674 con->out_msg = NULL; in reset_connection()
676 con->in_seq = 0; in reset_connection()
677 con->in_seq_acked = 0; in reset_connection()
679 con->out_skip = 0; in reset_connection()
685 void ceph_con_close(struct ceph_connection *con) in ceph_con_close() argument
687 mutex_lock(&con->mutex); in ceph_con_close()
688 dout("con_close %p peer %s\n", con, in ceph_con_close()
689 ceph_pr_addr(&con->peer_addr.in_addr)); in ceph_con_close()
690 con->state = CON_STATE_CLOSED; in ceph_con_close()
692 con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ in ceph_con_close()
693 con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); in ceph_con_close()
694 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in ceph_con_close()
695 con_flag_clear(con, CON_FLAG_BACKOFF); in ceph_con_close()
697 reset_connection(con); in ceph_con_close()
698 con->peer_global_seq = 0; in ceph_con_close()
699 cancel_con(con); in ceph_con_close()
700 con_close_socket(con); in ceph_con_close()
701 mutex_unlock(&con->mutex); in ceph_con_close()
708 void ceph_con_open(struct ceph_connection *con, in ceph_con_open() argument
712 mutex_lock(&con->mutex); in ceph_con_open()
713 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); in ceph_con_open()
715 WARN_ON(con->state != CON_STATE_CLOSED); in ceph_con_open()
716 con->state = CON_STATE_PREOPEN; in ceph_con_open()
718 con->peer_name.type = (__u8) entity_type; in ceph_con_open()
719 con->peer_name.num = cpu_to_le64(entity_num); in ceph_con_open()
721 memcpy(&con->peer_addr, addr, sizeof(*addr)); in ceph_con_open()
722 con->delay = 0; /* reset backoff memory */ in ceph_con_open()
723 mutex_unlock(&con->mutex); in ceph_con_open()
724 queue_con(con); in ceph_con_open()
731 bool ceph_con_opened(struct ceph_connection *con) in ceph_con_opened() argument
733 return con->connect_seq > 0; in ceph_con_opened()
739 void ceph_con_init(struct ceph_connection *con, void *private, in ceph_con_init() argument
743 dout("con_init %p\n", con); in ceph_con_init()
744 memset(con, 0, sizeof(*con)); in ceph_con_init()
745 con->private = private; in ceph_con_init()
746 con->ops = ops; in ceph_con_init()
747 con->msgr = msgr; in ceph_con_init()
749 con_sock_state_init(con); in ceph_con_init()
751 mutex_init(&con->mutex); in ceph_con_init()
752 INIT_LIST_HEAD(&con->out_queue); in ceph_con_init()
753 INIT_LIST_HEAD(&con->out_sent); in ceph_con_init()
754 INIT_DELAYED_WORK(&con->work, con_work); in ceph_con_init()
756 con->state = CON_STATE_CLOSED; in ceph_con_init()
777 static void con_out_kvec_reset(struct ceph_connection *con) in con_out_kvec_reset() argument
779 BUG_ON(con->out_skip); in con_out_kvec_reset()
781 con->out_kvec_left = 0; in con_out_kvec_reset()
782 con->out_kvec_bytes = 0; in con_out_kvec_reset()
783 con->out_kvec_cur = &con->out_kvec[0]; in con_out_kvec_reset()
786 static void con_out_kvec_add(struct ceph_connection *con, in con_out_kvec_add() argument
789 int index = con->out_kvec_left; in con_out_kvec_add()
791 BUG_ON(con->out_skip); in con_out_kvec_add()
792 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); in con_out_kvec_add()
794 con->out_kvec[index].iov_len = size; in con_out_kvec_add()
795 con->out_kvec[index].iov_base = data; in con_out_kvec_add()
796 con->out_kvec_left++; in con_out_kvec_add()
797 con->out_kvec_bytes += size; in con_out_kvec_add()
805 static int con_out_kvec_skip(struct ceph_connection *con) in con_out_kvec_skip() argument
807 int off = con->out_kvec_cur - con->out_kvec; in con_out_kvec_skip()
810 if (con->out_kvec_bytes > 0) { in con_out_kvec_skip()
811 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; in con_out_kvec_skip()
812 BUG_ON(con->out_kvec_bytes < skip); in con_out_kvec_skip()
813 BUG_ON(!con->out_kvec_left); in con_out_kvec_skip()
814 con->out_kvec_bytes -= skip; in con_out_kvec_skip()
815 con->out_kvec_left--; in con_out_kvec_skip()
1206 static size_t sizeof_footer(struct ceph_connection *con) in sizeof_footer() argument
1208 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? in sizeof_footer()
1227 static void prepare_write_message_footer(struct ceph_connection *con) in prepare_write_message_footer() argument
1229 struct ceph_msg *m = con->out_msg; in prepare_write_message_footer()
1230 int v = con->out_kvec_left; in prepare_write_message_footer()
1234 dout("prepare_write_message_footer %p\n", con); in prepare_write_message_footer()
1235 con->out_kvec[v].iov_base = &m->footer; in prepare_write_message_footer()
1236 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { in prepare_write_message_footer()
1237 if (con->ops->sign_message) in prepare_write_message_footer()
1238 con->ops->sign_message(con, m); in prepare_write_message_footer()
1241 con->out_kvec[v].iov_len = sizeof(m->footer); in prepare_write_message_footer()
1242 con->out_kvec_bytes += sizeof(m->footer); in prepare_write_message_footer()
1245 con->out_kvec[v].iov_len = sizeof(m->old_footer); in prepare_write_message_footer()
1246 con->out_kvec_bytes += sizeof(m->old_footer); in prepare_write_message_footer()
1248 con->out_kvec_left++; in prepare_write_message_footer()
1249 con->out_more = m->more_to_follow; in prepare_write_message_footer()
1250 con->out_msg_done = true; in prepare_write_message_footer()
1256 static void prepare_write_message(struct ceph_connection *con) in prepare_write_message() argument
1261 con_out_kvec_reset(con); in prepare_write_message()
1262 con->out_msg_done = false; in prepare_write_message()
1266 if (con->in_seq > con->in_seq_acked) { in prepare_write_message()
1267 con->in_seq_acked = con->in_seq; in prepare_write_message()
1268 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_message()
1269 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_message()
1270 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_message()
1271 &con->out_temp_ack); in prepare_write_message()
1274 BUG_ON(list_empty(&con->out_queue)); in prepare_write_message()
1275 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); in prepare_write_message()
1276 con->out_msg = m; in prepare_write_message()
1277 BUG_ON(m->con != con); in prepare_write_message()
1281 list_move_tail(&m->list_head, &con->out_sent); in prepare_write_message()
1288 m->hdr.seq = cpu_to_le64(++con->out_seq); in prepare_write_message()
1294 m, con->out_seq, le16_to_cpu(m->hdr.type), in prepare_write_message()
1300 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); in prepare_write_message()
1301 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); in prepare_write_message()
1302 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); in prepare_write_message()
1305 con_out_kvec_add(con, m->middle->vec.iov_len, in prepare_write_message()
1310 con->out_msg->hdr.crc = cpu_to_le32(crc); in prepare_write_message()
1311 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); in prepare_write_message()
1315 con->out_msg->footer.front_crc = cpu_to_le32(crc); in prepare_write_message()
1319 con->out_msg->footer.middle_crc = cpu_to_le32(crc); in prepare_write_message()
1321 con->out_msg->footer.middle_crc = 0; in prepare_write_message()
1323 le32_to_cpu(con->out_msg->footer.front_crc), in prepare_write_message()
1324 le32_to_cpu(con->out_msg->footer.middle_crc)); in prepare_write_message()
1325 con->out_msg->footer.flags = 0; in prepare_write_message()
1328 con->out_msg->footer.data_crc = 0; in prepare_write_message()
1330 prepare_message_data(con->out_msg, m->data_length); in prepare_write_message()
1331 con->out_more = 1; /* data + footer will follow */ in prepare_write_message()
1334 prepare_write_message_footer(con); in prepare_write_message()
1337 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_message()
1343 static void prepare_write_ack(struct ceph_connection *con) in prepare_write_ack() argument
1345 dout("prepare_write_ack %p %llu -> %llu\n", con, in prepare_write_ack()
1346 con->in_seq_acked, con->in_seq); in prepare_write_ack()
1347 con->in_seq_acked = con->in_seq; in prepare_write_ack()
1349 con_out_kvec_reset(con); in prepare_write_ack()
1351 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_ack()
1353 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_ack()
1354 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_ack()
1355 &con->out_temp_ack); in prepare_write_ack()
1357 con->out_more = 1; /* more will follow.. eventually.. */ in prepare_write_ack()
1358 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_ack()
1364 static void prepare_write_seq(struct ceph_connection *con) in prepare_write_seq() argument
1366 dout("prepare_write_seq %p %llu -> %llu\n", con, in prepare_write_seq()
1367 con->in_seq_acked, con->in_seq); in prepare_write_seq()
1368 con->in_seq_acked = con->in_seq; in prepare_write_seq()
1370 con_out_kvec_reset(con); in prepare_write_seq()
1372 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_seq()
1373 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_seq()
1374 &con->out_temp_ack); in prepare_write_seq()
1376 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_seq()
1382 static void prepare_write_keepalive(struct ceph_connection *con) in prepare_write_keepalive() argument
1384 dout("prepare_write_keepalive %p\n", con); in prepare_write_keepalive()
1385 con_out_kvec_reset(con); in prepare_write_keepalive()
1386 con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); in prepare_write_keepalive()
1387 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_keepalive()
1394 static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con, in get_connect_authorizer() argument
1399 if (!con->ops->get_authorizer) { in get_connect_authorizer()
1400 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; in get_connect_authorizer()
1401 con->out_connect.authorizer_len = 0; in get_connect_authorizer()
1406 mutex_unlock(&con->mutex); in get_connect_authorizer()
1407 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); in get_connect_authorizer()
1408 mutex_lock(&con->mutex); in get_connect_authorizer()
1412 if (con->state != CON_STATE_NEGOTIATING) in get_connect_authorizer()
1415 con->auth_reply_buf = auth->authorizer_reply_buf; in get_connect_authorizer()
1416 con->auth_reply_buf_len = auth->authorizer_reply_buf_len; in get_connect_authorizer()
1423 static void prepare_write_banner(struct ceph_connection *con) in prepare_write_banner() argument
1425 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); in prepare_write_banner()
1426 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), in prepare_write_banner()
1427 &con->msgr->my_enc_addr); in prepare_write_banner()
1429 con->out_more = 0; in prepare_write_banner()
1430 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_banner()
1433 static int prepare_write_connect(struct ceph_connection *con) in prepare_write_connect() argument
1435 unsigned int global_seq = get_global_seq(con->msgr, 0); in prepare_write_connect()
1440 switch (con->peer_name.type) { in prepare_write_connect()
1454 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, in prepare_write_connect()
1455 con->connect_seq, global_seq, proto); in prepare_write_connect()
1457 con->out_connect.features = cpu_to_le64(con->msgr->supported_features); in prepare_write_connect()
1458 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); in prepare_write_connect()
1459 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); in prepare_write_connect()
1460 con->out_connect.global_seq = cpu_to_le32(global_seq); in prepare_write_connect()
1461 con->out_connect.protocol_version = cpu_to_le32(proto); in prepare_write_connect()
1462 con->out_connect.flags = 0; in prepare_write_connect()
1465 auth = get_connect_authorizer(con, &auth_proto); in prepare_write_connect()
1469 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); in prepare_write_connect()
1470 con->out_connect.authorizer_len = auth ? in prepare_write_connect()
1473 con_out_kvec_add(con, sizeof (con->out_connect), in prepare_write_connect()
1474 &con->out_connect); in prepare_write_connect()
1476 con_out_kvec_add(con, auth->authorizer_buf_len, in prepare_write_connect()
1479 con->out_more = 0; in prepare_write_connect()
1480 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_connect()
1491 static int write_partial_kvec(struct ceph_connection *con) in write_partial_kvec() argument
1495 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); in write_partial_kvec()
1496 while (con->out_kvec_bytes > 0) { in write_partial_kvec()
1497 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, in write_partial_kvec()
1498 con->out_kvec_left, con->out_kvec_bytes, in write_partial_kvec()
1499 con->out_more); in write_partial_kvec()
1502 con->out_kvec_bytes -= ret; in write_partial_kvec()
1503 if (con->out_kvec_bytes == 0) in write_partial_kvec()
1507 while (ret >= con->out_kvec_cur->iov_len) { in write_partial_kvec()
1508 BUG_ON(!con->out_kvec_left); in write_partial_kvec()
1509 ret -= con->out_kvec_cur->iov_len; in write_partial_kvec()
1510 con->out_kvec_cur++; in write_partial_kvec()
1511 con->out_kvec_left--; in write_partial_kvec()
1515 con->out_kvec_cur->iov_len -= ret; in write_partial_kvec()
1516 con->out_kvec_cur->iov_base += ret; in write_partial_kvec()
1519 con->out_kvec_left = 0; in write_partial_kvec()
1522 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, in write_partial_kvec()
1523 con->out_kvec_bytes, con->out_kvec_left, ret); in write_partial_kvec()
1547 static int write_partial_message_data(struct ceph_connection *con) in write_partial_message_data() argument
1549 struct ceph_msg *msg = con->out_msg; in write_partial_message_data()
1551 bool do_datacrc = !con->msgr->nocrc; in write_partial_message_data()
1554 dout("%s %p msg %p\n", __func__, con, msg); in write_partial_message_data()
1578 ret = ceph_tcp_sendpage(con->sock, page, page_offset, in write_partial_message_data()
1591 dout("%s %p msg %p done\n", __func__, con, msg); in write_partial_message_data()
1598 con_out_kvec_reset(con); in write_partial_message_data()
1599 prepare_write_message_footer(con); in write_partial_message_data()
1607 static int write_partial_skip(struct ceph_connection *con) in write_partial_skip() argument
1611 dout("%s %p %d left\n", __func__, con, con->out_skip); in write_partial_skip()
1612 while (con->out_skip > 0) { in write_partial_skip()
1613 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); in write_partial_skip()
1615 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true); in write_partial_skip()
1618 con->out_skip -= ret; in write_partial_skip()
1628 static void prepare_read_banner(struct ceph_connection *con) in prepare_read_banner() argument
1630 dout("prepare_read_banner %p\n", con); in prepare_read_banner()
1631 con->in_base_pos = 0; in prepare_read_banner()
1634 static void prepare_read_connect(struct ceph_connection *con) in prepare_read_connect() argument
1636 dout("prepare_read_connect %p\n", con); in prepare_read_connect()
1637 con->in_base_pos = 0; in prepare_read_connect()
1640 static void prepare_read_ack(struct ceph_connection *con) in prepare_read_ack() argument
1642 dout("prepare_read_ack %p\n", con); in prepare_read_ack()
1643 con->in_base_pos = 0; in prepare_read_ack()
1646 static void prepare_read_seq(struct ceph_connection *con) in prepare_read_seq() argument
1648 dout("prepare_read_seq %p\n", con); in prepare_read_seq()
1649 con->in_base_pos = 0; in prepare_read_seq()
1650 con->in_tag = CEPH_MSGR_TAG_SEQ; in prepare_read_seq()
1653 static void prepare_read_tag(struct ceph_connection *con) in prepare_read_tag() argument
1655 dout("prepare_read_tag %p\n", con); in prepare_read_tag()
1656 con->in_base_pos = 0; in prepare_read_tag()
1657 con->in_tag = CEPH_MSGR_TAG_READY; in prepare_read_tag()
1663 static int prepare_read_message(struct ceph_connection *con) in prepare_read_message() argument
1665 dout("prepare_read_message %p\n", con); in prepare_read_message()
1666 BUG_ON(con->in_msg != NULL); in prepare_read_message()
1667 con->in_base_pos = 0; in prepare_read_message()
1668 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; in prepare_read_message()
1673 static int read_partial(struct ceph_connection *con, in read_partial() argument
1676 while (con->in_base_pos < end) { in read_partial()
1677 int left = end - con->in_base_pos; in read_partial()
1679 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); in read_partial()
1682 con->in_base_pos += ret; in read_partial()
1691 static int read_partial_banner(struct ceph_connection *con) in read_partial_banner() argument
1697 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); in read_partial_banner()
1702 ret = read_partial(con, end, size, con->in_banner); in read_partial_banner()
1706 size = sizeof (con->actual_peer_addr); in read_partial_banner()
1708 ret = read_partial(con, end, size, &con->actual_peer_addr); in read_partial_banner()
1712 size = sizeof (con->peer_addr_for_me); in read_partial_banner()
1714 ret = read_partial(con, end, size, &con->peer_addr_for_me); in read_partial_banner()
1722 static int read_partial_connect(struct ceph_connection *con) in read_partial_connect() argument
1728 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); in read_partial_connect()
1730 size = sizeof (con->in_reply); in read_partial_connect()
1732 ret = read_partial(con, end, size, &con->in_reply); in read_partial_connect()
1736 size = le32_to_cpu(con->in_reply.authorizer_len); in read_partial_connect()
1738 ret = read_partial(con, end, size, con->auth_reply_buf); in read_partial_connect()
1743 con, (int)con->in_reply.tag, in read_partial_connect()
1744 le32_to_cpu(con->in_reply.connect_seq), in read_partial_connect()
1745 le32_to_cpu(con->in_reply.global_seq)); in read_partial_connect()
1754 static int verify_hello(struct ceph_connection *con) in verify_hello() argument
1756 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { in verify_hello()
1758 ceph_pr_addr(&con->peer_addr.in_addr)); in verify_hello()
1759 con->error_msg = "protocol error, bad banner"; in verify_hello()
1976 static int process_banner(struct ceph_connection *con) in process_banner() argument
1978 dout("process_banner on %p\n", con); in process_banner()
1980 if (verify_hello(con) < 0) in process_banner()
1983 ceph_decode_addr(&con->actual_peer_addr); in process_banner()
1984 ceph_decode_addr(&con->peer_addr_for_me); in process_banner()
1991 if (memcmp(&con->peer_addr, &con->actual_peer_addr, in process_banner()
1992 sizeof(con->peer_addr)) != 0 && in process_banner()
1993 !(addr_is_blank(&con->actual_peer_addr.in_addr) && in process_banner()
1994 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { in process_banner()
1996 ceph_pr_addr(&con->peer_addr.in_addr), in process_banner()
1997 (int)le32_to_cpu(con->peer_addr.nonce), in process_banner()
1998 ceph_pr_addr(&con->actual_peer_addr.in_addr), in process_banner()
1999 (int)le32_to_cpu(con->actual_peer_addr.nonce)); in process_banner()
2000 con->error_msg = "wrong peer at address"; in process_banner()
2007 if (addr_is_blank(&con->msgr->inst.addr.in_addr)) { in process_banner()
2008 int port = addr_port(&con->msgr->inst.addr.in_addr); in process_banner()
2010 memcpy(&con->msgr->inst.addr.in_addr, in process_banner()
2011 &con->peer_addr_for_me.in_addr, in process_banner()
2012 sizeof(con->peer_addr_for_me.in_addr)); in process_banner()
2013 addr_set_port(&con->msgr->inst.addr.in_addr, port); in process_banner()
2014 encode_my_addr(con->msgr); in process_banner()
2016 ceph_pr_addr(&con->msgr->inst.addr.in_addr)); in process_banner()
2022 static int process_connect(struct ceph_connection *con) in process_connect() argument
2024 u64 sup_feat = con->msgr->supported_features; in process_connect()
2025 u64 req_feat = con->msgr->required_features; in process_connect()
2027 le64_to_cpu(con->in_reply.features)); in process_connect()
2030 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); in process_connect()
2032 switch (con->in_reply.tag) { in process_connect()
2036 ENTITY_NAME(con->peer_name), in process_connect()
2037 ceph_pr_addr(&con->peer_addr.in_addr), in process_connect()
2039 con->error_msg = "missing required protocol features"; in process_connect()
2040 reset_connection(con); in process_connect()
2046 ENTITY_NAME(con->peer_name), in process_connect()
2047 ceph_pr_addr(&con->peer_addr.in_addr), in process_connect()
2048 le32_to_cpu(con->out_connect.protocol_version), in process_connect()
2049 le32_to_cpu(con->in_reply.protocol_version)); in process_connect()
2050 con->error_msg = "protocol version mismatch"; in process_connect()
2051 reset_connection(con); in process_connect()
2055 con->auth_retry++; in process_connect()
2056 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, in process_connect()
2057 con->auth_retry); in process_connect()
2058 if (con->auth_retry == 2) { in process_connect()
2059 con->error_msg = "connect authorization failure"; in process_connect()
2062 con_out_kvec_reset(con); in process_connect()
2063 ret = prepare_write_connect(con); in process_connect()
2066 prepare_read_connect(con); in process_connect()
2078 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2080 ENTITY_NAME(con->peer_name), in process_connect()
2081 ceph_pr_addr(&con->peer_addr.in_addr)); in process_connect()
2082 reset_connection(con); in process_connect()
2083 con_out_kvec_reset(con); in process_connect()
2084 ret = prepare_write_connect(con); in process_connect()
2087 prepare_read_connect(con); in process_connect()
2090 mutex_unlock(&con->mutex); in process_connect()
2091 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); in process_connect()
2092 if (con->ops->peer_reset) in process_connect()
2093 con->ops->peer_reset(con); in process_connect()
2094 mutex_lock(&con->mutex); in process_connect()
2095 if (con->state != CON_STATE_NEGOTIATING) in process_connect()
2105 le32_to_cpu(con->out_connect.connect_seq), in process_connect()
2106 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2107 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); in process_connect()
2108 con_out_kvec_reset(con); in process_connect()
2109 ret = prepare_write_connect(con); in process_connect()
2112 prepare_read_connect(con); in process_connect()
2121 con->peer_global_seq, in process_connect()
2122 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2123 get_global_seq(con->msgr, in process_connect()
2124 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2125 con_out_kvec_reset(con); in process_connect()
2126 ret = prepare_write_connect(con); in process_connect()
2129 prepare_read_connect(con); in process_connect()
2137 ENTITY_NAME(con->peer_name), in process_connect()
2138 ceph_pr_addr(&con->peer_addr.in_addr), in process_connect()
2140 con->error_msg = "missing required protocol features"; in process_connect()
2141 reset_connection(con); in process_connect()
2145 WARN_ON(con->state != CON_STATE_NEGOTIATING); in process_connect()
2146 con->state = CON_STATE_OPEN; in process_connect()
2147 con->auth_retry = 0; /* we authenticated; clear flag */ in process_connect()
2148 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); in process_connect()
2149 con->connect_seq++; in process_connect()
2150 con->peer_features = server_feat; in process_connect()
2152 con->peer_global_seq, in process_connect()
2153 le32_to_cpu(con->in_reply.connect_seq), in process_connect()
2154 con->connect_seq); in process_connect()
2155 WARN_ON(con->connect_seq != in process_connect()
2156 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2158 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) in process_connect()
2159 con_flag_set(con, CON_FLAG_LOSSYTX); in process_connect()
2161 con->delay = 0; /* reset backoff memory */ in process_connect()
2163 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { in process_connect()
2164 prepare_write_seq(con); in process_connect()
2165 prepare_read_seq(con); in process_connect()
2167 prepare_read_tag(con); in process_connect()
2178 con->error_msg = "protocol error, got WAIT as client"; in process_connect()
2182 con->error_msg = "protocol error, garbage tag during connect"; in process_connect()
2192 static int read_partial_ack(struct ceph_connection *con) in read_partial_ack() argument
2194 int size = sizeof (con->in_temp_ack); in read_partial_ack()
2197 return read_partial(con, end, size, &con->in_temp_ack); in read_partial_ack()
2203 static void process_ack(struct ceph_connection *con) in process_ack() argument
2206 u64 ack = le64_to_cpu(con->in_temp_ack); in process_ack()
2209 while (!list_empty(&con->out_sent)) { in process_ack()
2210 m = list_first_entry(&con->out_sent, struct ceph_msg, in process_ack()
2220 prepare_read_tag(con); in process_ack()
2224 static int read_partial_message_section(struct ceph_connection *con, in read_partial_message_section() argument
2235 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + in read_partial_message_section()
2247 static int read_partial_msg_data(struct ceph_connection *con) in read_partial_msg_data() argument
2249 struct ceph_msg *msg = con->in_msg; in read_partial_msg_data()
2251 const bool do_datacrc = !con->msgr->nocrc; in read_partial_msg_data()
2263 crc = con->in_data_crc; in read_partial_msg_data()
2267 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); in read_partial_msg_data()
2270 con->in_data_crc = crc; in read_partial_msg_data()
2280 con->in_data_crc = crc; in read_partial_msg_data()
2288 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2290 static int read_partial_message(struct ceph_connection *con) in read_partial_message() argument
2292 struct ceph_msg *m = con->in_msg; in read_partial_message()
2297 bool do_datacrc = !con->msgr->nocrc; in read_partial_message()
2298 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); in read_partial_message()
2302 dout("read_partial_message con %p msg %p\n", con, m); in read_partial_message()
2305 size = sizeof (con->in_hdr); in read_partial_message()
2307 ret = read_partial(con, end, size, &con->in_hdr); in read_partial_message()
2311 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); in read_partial_message()
2312 if (cpu_to_le32(crc) != con->in_hdr.crc) { in read_partial_message()
2314 crc, con->in_hdr.crc); in read_partial_message()
2318 front_len = le32_to_cpu(con->in_hdr.front_len); in read_partial_message()
2321 middle_len = le32_to_cpu(con->in_hdr.middle_len); in read_partial_message()
2324 data_len = le32_to_cpu(con->in_hdr.data_len); in read_partial_message()
2329 seq = le64_to_cpu(con->in_hdr.seq); in read_partial_message()
2330 if ((s64)seq - (s64)con->in_seq < 1) { in read_partial_message()
2332 ENTITY_NAME(con->peer_name), in read_partial_message()
2333 ceph_pr_addr(&con->peer_addr.in_addr), in read_partial_message()
2334 seq, con->in_seq + 1); in read_partial_message()
2335 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2336 sizeof_footer(con); in read_partial_message()
2337 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2339 } else if ((s64)seq - (s64)con->in_seq > 1) { in read_partial_message()
2341 seq, con->in_seq + 1); in read_partial_message()
2342 con->error_msg = "bad message sequence # for incoming message"; in read_partial_message()
2347 if (!con->in_msg) { in read_partial_message()
2350 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, in read_partial_message()
2352 ret = ceph_con_in_msg_alloc(con, &skip); in read_partial_message()
2356 BUG_ON(!con->in_msg ^ skip); in read_partial_message()
2357 if (con->in_msg && data_len > con->in_msg->data_length) { in read_partial_message()
2359 __func__, data_len, con->in_msg->data_length); in read_partial_message()
2360 ceph_msg_put(con->in_msg); in read_partial_message()
2361 con->in_msg = NULL; in read_partial_message()
2367 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2368 sizeof_footer(con); in read_partial_message()
2369 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2370 con->in_seq++; in read_partial_message()
2374 BUG_ON(!con->in_msg); in read_partial_message()
2375 BUG_ON(con->in_msg->con != con); in read_partial_message()
2376 m = con->in_msg; in read_partial_message()
2384 prepare_message_data(con->in_msg, data_len); in read_partial_message()
2388 ret = read_partial_message_section(con, &m->front, front_len, in read_partial_message()
2389 &con->in_front_crc); in read_partial_message()
2395 ret = read_partial_message_section(con, &m->middle->vec, in read_partial_message()
2397 &con->in_middle_crc); in read_partial_message()
2404 ret = read_partial_msg_data(con); in read_partial_message()
2416 ret = read_partial(con, end, size, &m->footer); in read_partial_message()
2430 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { in read_partial_message()
2432 m, con->in_front_crc, m->footer.front_crc); in read_partial_message()
2435 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { in read_partial_message()
2437 m, con->in_middle_crc, m->footer.middle_crc); in read_partial_message()
2442 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { in read_partial_message()
2444 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); in read_partial_message()
2448 if (need_sign && con->ops->check_message_signature && in read_partial_message()
2449 con->ops->check_message_signature(con, m)) { in read_partial_message()
2462 static void process_message(struct ceph_connection *con) in process_message() argument
2466 BUG_ON(con->in_msg->con != con); in process_message()
2467 con->in_msg->con = NULL; in process_message()
2468 msg = con->in_msg; in process_message()
2469 con->in_msg = NULL; in process_message()
2470 con->ops->put(con); in process_message()
2473 if (con->peer_name.type == 0) in process_message()
2474 con->peer_name = msg->hdr.src; in process_message()
2476 con->in_seq++; in process_message()
2477 mutex_unlock(&con->mutex); in process_message()
2486 con->in_front_crc, con->in_middle_crc, con->in_data_crc); in process_message()
2487 con->ops->dispatch(con, msg); in process_message()
2489 mutex_lock(&con->mutex); in process_message()
2497 static int try_write(struct ceph_connection *con) in try_write() argument
2501 dout("try_write start %p state %lu\n", con, con->state); in try_write()
2504 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); in try_write()
2507 if (con->state == CON_STATE_PREOPEN) { in try_write()
2508 BUG_ON(con->sock); in try_write()
2509 con->state = CON_STATE_CONNECTING; in try_write()
2511 con_out_kvec_reset(con); in try_write()
2512 prepare_write_banner(con); in try_write()
2513 prepare_read_banner(con); in try_write()
2515 BUG_ON(con->in_msg); in try_write()
2516 con->in_tag = CEPH_MSGR_TAG_READY; in try_write()
2518 con, con->state); in try_write()
2519 ret = ceph_tcp_connect(con); in try_write()
2521 con->error_msg = "connect error"; in try_write()
2528 if (con->out_kvec_left) { in try_write()
2529 ret = write_partial_kvec(con); in try_write()
2533 if (con->out_skip) { in try_write()
2534 ret = write_partial_skip(con); in try_write()
2540 if (con->out_msg) { in try_write()
2541 if (con->out_msg_done) { in try_write()
2542 ceph_msg_put(con->out_msg); in try_write()
2543 con->out_msg = NULL; /* we're done with this one */ in try_write()
2547 ret = write_partial_message_data(con); in try_write()
2560 if (con->state == CON_STATE_OPEN) { in try_write()
2562 if (!list_empty(&con->out_queue)) { in try_write()
2563 prepare_write_message(con); in try_write()
2566 if (con->in_seq > con->in_seq_acked) { in try_write()
2567 prepare_write_ack(con); in try_write()
2570 if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { in try_write()
2571 prepare_write_keepalive(con); in try_write()
2577 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in try_write()
2581 dout("try_write done on %p ret %d\n", con, ret); in try_write()
2590 static int try_read(struct ceph_connection *con) in try_read() argument
2595 dout("try_read start on %p state %lu\n", con, con->state); in try_read()
2596 if (con->state != CON_STATE_CONNECTING && in try_read()
2597 con->state != CON_STATE_NEGOTIATING && in try_read()
2598 con->state != CON_STATE_OPEN) in try_read()
2601 BUG_ON(!con->sock); in try_read()
2603 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, in try_read()
2604 con->in_base_pos); in try_read()
2606 if (con->state == CON_STATE_CONNECTING) { in try_read()
2608 ret = read_partial_banner(con); in try_read()
2611 ret = process_banner(con); in try_read()
2615 con->state = CON_STATE_NEGOTIATING; in try_read()
2622 ret = prepare_write_connect(con); in try_read()
2625 prepare_read_connect(con); in try_read()
2631 if (con->state == CON_STATE_NEGOTIATING) { in try_read()
2633 ret = read_partial_connect(con); in try_read()
2636 ret = process_connect(con); in try_read()
2642 WARN_ON(con->state != CON_STATE_OPEN); in try_read()
2644 if (con->in_base_pos < 0) { in try_read()
2651 int skip = min((int) sizeof (buf), -con->in_base_pos); in try_read()
2653 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); in try_read()
2654 ret = ceph_tcp_recvmsg(con->sock, buf, skip); in try_read()
2657 con->in_base_pos += ret; in try_read()
2658 if (con->in_base_pos) in try_read()
2661 if (con->in_tag == CEPH_MSGR_TAG_READY) { in try_read()
2665 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); in try_read()
2668 dout("try_read got tag %d\n", (int)con->in_tag); in try_read()
2669 switch (con->in_tag) { in try_read()
2671 prepare_read_message(con); in try_read()
2674 prepare_read_ack(con); in try_read()
2677 con_close_socket(con); in try_read()
2678 con->state = CON_STATE_CLOSED; in try_read()
2684 if (con->in_tag == CEPH_MSGR_TAG_MSG) { in try_read()
2685 ret = read_partial_message(con); in try_read()
2689 con->error_msg = "bad crc"; in try_read()
2695 con->error_msg = "io error"; in try_read()
2700 if (con->in_tag == CEPH_MSGR_TAG_READY) in try_read()
2702 process_message(con); in try_read()
2703 if (con->state == CON_STATE_OPEN) in try_read()
2704 prepare_read_tag(con); in try_read()
2707 if (con->in_tag == CEPH_MSGR_TAG_ACK || in try_read()
2708 con->in_tag == CEPH_MSGR_TAG_SEQ) { in try_read()
2713 ret = read_partial_ack(con); in try_read()
2716 process_ack(con); in try_read()
2721 dout("try_read done on %p ret %d\n", con, ret); in try_read()
2725 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); in try_read()
2726 con->error_msg = "protocol error, garbage tag"; in try_read()
2737 static int queue_con_delay(struct ceph_connection *con, unsigned long delay) in queue_con_delay() argument
2739 if (!con->ops->get(con)) { in queue_con_delay()
2740 dout("%s %p ref count 0\n", __func__, con); in queue_con_delay()
2744 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { in queue_con_delay()
2745 dout("%s %p - already queued\n", __func__, con); in queue_con_delay()
2746 con->ops->put(con); in queue_con_delay()
2750 dout("%s %p %lu\n", __func__, con, delay); in queue_con_delay()
2754 static void queue_con(struct ceph_connection *con) in queue_con() argument
2756 (void) queue_con_delay(con, 0); in queue_con()
2759 static void cancel_con(struct ceph_connection *con) in cancel_con() argument
2761 if (cancel_delayed_work(&con->work)) { in cancel_con()
2762 dout("%s %p\n", __func__, con); in cancel_con()
2763 con->ops->put(con); in cancel_con()
2767 static bool con_sock_closed(struct ceph_connection *con) in con_sock_closed() argument
2769 if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) in con_sock_closed()
2774 con->error_msg = "socket closed (con state " #x ")"; \ in con_sock_closed()
2777 switch (con->state) { in con_sock_closed()
2786 __func__, con, con->state); in con_sock_closed()
2787 con->error_msg = "unrecognized con state"; in con_sock_closed()
2796 static bool con_backoff(struct ceph_connection *con) in con_backoff() argument
2800 if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) in con_backoff()
2803 ret = queue_con_delay(con, round_jiffies_relative(con->delay)); in con_backoff()
2806 con, con->delay); in con_backoff()
2808 con_flag_set(con, CON_FLAG_BACKOFF); in con_backoff()
2816 static void con_fault_finish(struct ceph_connection *con) in con_fault_finish() argument
2822 if (con->auth_retry && con->ops->invalidate_authorizer) { in con_fault_finish()
2824 con->ops->invalidate_authorizer(con); in con_fault_finish()
2827 if (con->ops->fault) in con_fault_finish()
2828 con->ops->fault(con); in con_fault_finish()
2836 struct ceph_connection *con = container_of(work, struct ceph_connection, in con_work() local
2840 mutex_lock(&con->mutex); in con_work()
2844 if ((fault = con_sock_closed(con))) { in con_work()
2845 dout("%s: con %p SOCK_CLOSED\n", __func__, con); in con_work()
2848 if (con_backoff(con)) { in con_work()
2849 dout("%s: con %p BACKOFF\n", __func__, con); in con_work()
2852 if (con->state == CON_STATE_STANDBY) { in con_work()
2853 dout("%s: con %p STANDBY\n", __func__, con); in con_work()
2856 if (con->state == CON_STATE_CLOSED) { in con_work()
2857 dout("%s: con %p CLOSED\n", __func__, con); in con_work()
2858 BUG_ON(con->sock); in con_work()
2861 if (con->state == CON_STATE_PREOPEN) { in con_work()
2862 dout("%s: con %p PREOPEN\n", __func__, con); in con_work()
2863 BUG_ON(con->sock); in con_work()
2866 ret = try_read(con); in con_work()
2870 if (!con->error_msg) in con_work()
2871 con->error_msg = "socket error on read"; in con_work()
2876 ret = try_write(con); in con_work()
2880 if (!con->error_msg) in con_work()
2881 con->error_msg = "socket error on write"; in con_work()
2888 con_fault(con); in con_work()
2889 mutex_unlock(&con->mutex); in con_work()
2892 con_fault_finish(con); in con_work()
2894 con->ops->put(con); in con_work()
2901 static void con_fault(struct ceph_connection *con) in con_fault() argument
2904 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); in con_fault()
2906 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), in con_fault()
2907 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); in con_fault()
2908 con->error_msg = NULL; in con_fault()
2910 WARN_ON(con->state != CON_STATE_CONNECTING && in con_fault()
2911 con->state != CON_STATE_NEGOTIATING && in con_fault()
2912 con->state != CON_STATE_OPEN); in con_fault()
2914 con_close_socket(con); in con_fault()
2916 if (con_flag_test(con, CON_FLAG_LOSSYTX)) { in con_fault()
2918 con->state = CON_STATE_CLOSED; in con_fault()
2922 if (con->in_msg) { in con_fault()
2923 BUG_ON(con->in_msg->con != con); in con_fault()
2924 con->in_msg->con = NULL; in con_fault()
2925 ceph_msg_put(con->in_msg); in con_fault()
2926 con->in_msg = NULL; in con_fault()
2927 con->ops->put(con); in con_fault()
2931 list_splice_init(&con->out_sent, &con->out_queue); in con_fault()
2935 if (list_empty(&con->out_queue) && in con_fault()
2936 !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { in con_fault()
2937 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); in con_fault()
2938 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in con_fault()
2939 con->state = CON_STATE_STANDBY; in con_fault()
2942 con->state = CON_STATE_PREOPEN; in con_fault()
2943 if (con->delay == 0) in con_fault()
2944 con->delay = BASE_DELAY_INTERVAL; in con_fault()
2945 else if (con->delay < MAX_DELAY_INTERVAL) in con_fault()
2946 con->delay *= 2; in con_fault()
2947 con_flag_set(con, CON_FLAG_BACKOFF); in con_fault()
2948 queue_con(con); in con_fault()
2985 static void clear_standby(struct ceph_connection *con) in clear_standby() argument
2988 if (con->state == CON_STATE_STANDBY) { in clear_standby()
2989 dout("clear_standby %p and ++connect_seq\n", con); in clear_standby()
2990 con->state = CON_STATE_PREOPEN; in clear_standby()
2991 con->connect_seq++; in clear_standby()
2992 WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); in clear_standby()
2993 WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); in clear_standby()
3000 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) in ceph_con_send() argument
3003 msg->hdr.src = con->msgr->inst.name; in ceph_con_send()
3007 mutex_lock(&con->mutex); in ceph_con_send()
3009 if (con->state == CON_STATE_CLOSED) { in ceph_con_send()
3010 dout("con_send %p closed, dropping %p\n", con, msg); in ceph_con_send()
3012 mutex_unlock(&con->mutex); in ceph_con_send()
3016 BUG_ON(msg->con != NULL); in ceph_con_send()
3017 msg->con = con->ops->get(con); in ceph_con_send()
3018 BUG_ON(msg->con == NULL); in ceph_con_send()
3021 list_add_tail(&msg->list_head, &con->out_queue); in ceph_con_send()
3023 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), in ceph_con_send()
3029 clear_standby(con); in ceph_con_send()
3030 mutex_unlock(&con->mutex); in ceph_con_send()
3034 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_send()
3035 queue_con(con); in ceph_con_send()
3044 struct ceph_connection *con = msg->con; in ceph_msg_revoke() local
3046 if (!con) in ceph_msg_revoke()
3049 mutex_lock(&con->mutex); in ceph_msg_revoke()
3051 dout("%s %p msg %p - was on queue\n", __func__, con, msg); in ceph_msg_revoke()
3053 BUG_ON(msg->con == NULL); in ceph_msg_revoke()
3054 msg->con->ops->put(msg->con); in ceph_msg_revoke()
3055 msg->con = NULL; in ceph_msg_revoke()
3060 if (con->out_msg == msg) { in ceph_msg_revoke()
3061 BUG_ON(con->out_skip); in ceph_msg_revoke()
3063 if (con->out_msg_done) { in ceph_msg_revoke()
3064 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3067 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) in ceph_msg_revoke()
3068 con->out_skip += sizeof(msg->footer); in ceph_msg_revoke()
3070 con->out_skip += sizeof(msg->old_footer); in ceph_msg_revoke()
3074 con->out_skip += msg->cursor.total_resid; in ceph_msg_revoke()
3076 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3077 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3080 __func__, con, msg, con->out_kvec_bytes, con->out_skip); in ceph_msg_revoke()
3082 con->out_msg = NULL; in ceph_msg_revoke()
3086 mutex_unlock(&con->mutex); in ceph_msg_revoke()
3094 struct ceph_connection *con; in ceph_msg_revoke_incoming() local
3097 if (!msg->con) { in ceph_msg_revoke_incoming()
3103 con = msg->con; in ceph_msg_revoke_incoming()
3104 mutex_lock(&con->mutex); in ceph_msg_revoke_incoming()
3105 if (con->in_msg == msg) { in ceph_msg_revoke_incoming()
3106 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); in ceph_msg_revoke_incoming()
3107 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); in ceph_msg_revoke_incoming()
3108 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); in ceph_msg_revoke_incoming()
3111 dout("%s %p msg %p revoked\n", __func__, con, msg); in ceph_msg_revoke_incoming()
3112 con->in_base_pos = con->in_base_pos - in ceph_msg_revoke_incoming()
3118 ceph_msg_put(con->in_msg); in ceph_msg_revoke_incoming()
3119 con->in_msg = NULL; in ceph_msg_revoke_incoming()
3120 con->in_tag = CEPH_MSGR_TAG_READY; in ceph_msg_revoke_incoming()
3121 con->in_seq++; in ceph_msg_revoke_incoming()
3124 __func__, con, con->in_msg, msg); in ceph_msg_revoke_incoming()
3126 mutex_unlock(&con->mutex); in ceph_msg_revoke_incoming()
3132 void ceph_con_keepalive(struct ceph_connection *con) in ceph_con_keepalive() argument
3134 dout("con_keepalive %p\n", con); in ceph_con_keepalive()
3135 mutex_lock(&con->mutex); in ceph_con_keepalive()
3136 clear_standby(con); in ceph_con_keepalive()
3137 mutex_unlock(&con->mutex); in ceph_con_keepalive()
3138 if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 && in ceph_con_keepalive()
3139 con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_keepalive()
3140 queue_con(con); in ceph_con_keepalive()
3284 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) in ceph_alloc_middle() argument
3315 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) in ceph_con_in_msg_alloc() argument
3317 struct ceph_msg_header *hdr = &con->in_hdr; in ceph_con_in_msg_alloc()
3322 BUG_ON(con->in_msg != NULL); in ceph_con_in_msg_alloc()
3323 BUG_ON(!con->ops->alloc_msg); in ceph_con_in_msg_alloc()
3325 mutex_unlock(&con->mutex); in ceph_con_in_msg_alloc()
3326 msg = con->ops->alloc_msg(con, hdr, skip); in ceph_con_in_msg_alloc()
3327 mutex_lock(&con->mutex); in ceph_con_in_msg_alloc()
3328 if (con->state != CON_STATE_OPEN) { in ceph_con_in_msg_alloc()
3335 con->in_msg = msg; in ceph_con_in_msg_alloc()
3336 con->in_msg->con = con->ops->get(con); in ceph_con_in_msg_alloc()
3337 BUG_ON(con->in_msg->con == NULL); in ceph_con_in_msg_alloc()
3347 con->error_msg = "error allocating memory for incoming message"; in ceph_con_in_msg_alloc()
3350 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); in ceph_con_in_msg_alloc()
3352 if (middle_len && !con->in_msg->middle) { in ceph_con_in_msg_alloc()
3353 ret = ceph_alloc_middle(con, con->in_msg); in ceph_con_in_msg_alloc()
3355 ceph_msg_put(con->in_msg); in ceph_con_in_msg_alloc()
3356 con->in_msg = NULL; in ceph_con_in_msg_alloc()