Lines Matching refs:con
120 static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag) in con_flag_clear() argument
124 clear_bit(con_flag, &con->flags); in con_flag_clear()
127 static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) in con_flag_set() argument
131 set_bit(con_flag, &con->flags); in con_flag_set()
134 static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) in con_flag_test() argument
138 return test_bit(con_flag, &con->flags); in con_flag_test()
141 static bool con_flag_test_and_clear(struct ceph_connection *con, in con_flag_test_and_clear() argument
146 return test_and_clear_bit(con_flag, &con->flags); in con_flag_test_and_clear()
149 static bool con_flag_test_and_set(struct ceph_connection *con, in con_flag_test_and_set() argument
154 return test_and_set_bit(con_flag, &con->flags); in con_flag_test_and_set()
178 static void queue_con(struct ceph_connection *con);
179 static void cancel_con(struct ceph_connection *con);
181 static void con_fault(struct ceph_connection *con);
327 static void con_sock_state_init(struct ceph_connection *con) in con_sock_state_init() argument
331 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_init()
334 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_init()
338 static void con_sock_state_connecting(struct ceph_connection *con) in con_sock_state_connecting() argument
342 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); in con_sock_state_connecting()
345 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connecting()
349 static void con_sock_state_connected(struct ceph_connection *con) in con_sock_state_connected() argument
353 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); in con_sock_state_connected()
356 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connected()
360 static void con_sock_state_closing(struct ceph_connection *con) in con_sock_state_closing() argument
364 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); in con_sock_state_closing()
369 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closing()
373 static void con_sock_state_closed(struct ceph_connection *con) in con_sock_state_closed() argument
377 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_closed()
383 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closed()
394 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_data_ready() local
395 if (atomic_read(&con->msgr->stopping)) { in ceph_sock_data_ready()
401 con, con->state); in ceph_sock_data_ready()
402 queue_con(con); in ceph_sock_data_ready()
409 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_write_space() local
418 if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { in ceph_sock_write_space()
420 dout("%s %p queueing write work\n", __func__, con); in ceph_sock_write_space()
422 queue_con(con); in ceph_sock_write_space()
425 dout("%s %p nothing to write\n", __func__, con); in ceph_sock_write_space()
432 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_state_change() local
435 con, con->state, sk->sk_state); in ceph_sock_state_change()
442 con_sock_state_closing(con); in ceph_sock_state_change()
443 con_flag_set(con, CON_FLAG_SOCK_CLOSED); in ceph_sock_state_change()
444 queue_con(con); in ceph_sock_state_change()
448 con_sock_state_connected(con); in ceph_sock_state_change()
449 queue_con(con); in ceph_sock_state_change()
460 struct ceph_connection *con) in set_sock_callbacks() argument
463 sk->sk_user_data = con; in set_sock_callbacks()
477 static int ceph_tcp_connect(struct ceph_connection *con) in ceph_tcp_connect() argument
479 struct sockaddr_storage *paddr = &con->peer_addr.in_addr; in ceph_tcp_connect()
483 BUG_ON(con->sock); in ceph_tcp_connect()
484 ret = sock_create_kern(read_pnet(&con->msgr->net), paddr->ss_family, in ceph_tcp_connect()
494 set_sock_callbacks(sock, con); in ceph_tcp_connect()
496 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); in ceph_tcp_connect()
498 con_sock_state_connecting(con); in ceph_tcp_connect()
503 ceph_pr_addr(&con->peer_addr.in_addr), in ceph_tcp_connect()
507 ceph_pr_addr(&con->peer_addr.in_addr), ret); in ceph_tcp_connect()
512 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) { in ceph_tcp_connect()
522 con->sock = sock; in ceph_tcp_connect()
610 static int con_close_socket(struct ceph_connection *con) in con_close_socket() argument
614 dout("con_close_socket on %p sock %p\n", con, con->sock); in con_close_socket()
615 if (con->sock) { in con_close_socket()
616 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); in con_close_socket()
617 sock_release(con->sock); in con_close_socket()
618 con->sock = NULL; in con_close_socket()
627 con_flag_clear(con, CON_FLAG_SOCK_CLOSED); in con_close_socket()
629 con_sock_state_closed(con); in con_close_socket()
652 static void reset_connection(struct ceph_connection *con) in reset_connection() argument
656 dout("reset_connection %p\n", con); in reset_connection()
657 ceph_msg_remove_list(&con->out_queue); in reset_connection()
658 ceph_msg_remove_list(&con->out_sent); in reset_connection()
660 if (con->in_msg) { in reset_connection()
661 BUG_ON(con->in_msg->con != con); in reset_connection()
662 ceph_msg_put(con->in_msg); in reset_connection()
663 con->in_msg = NULL; in reset_connection()
666 con->connect_seq = 0; in reset_connection()
667 con->out_seq = 0; in reset_connection()
668 if (con->out_msg) { in reset_connection()
669 BUG_ON(con->out_msg->con != con); in reset_connection()
670 ceph_msg_put(con->out_msg); in reset_connection()
671 con->out_msg = NULL; in reset_connection()
673 con->in_seq = 0; in reset_connection()
674 con->in_seq_acked = 0; in reset_connection()
676 con->out_skip = 0; in reset_connection()
682 void ceph_con_close(struct ceph_connection *con) in ceph_con_close() argument
684 mutex_lock(&con->mutex); in ceph_con_close()
685 dout("con_close %p peer %s\n", con, in ceph_con_close()
686 ceph_pr_addr(&con->peer_addr.in_addr)); in ceph_con_close()
687 con->state = CON_STATE_CLOSED; in ceph_con_close()
689 con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ in ceph_con_close()
690 con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); in ceph_con_close()
691 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in ceph_con_close()
692 con_flag_clear(con, CON_FLAG_BACKOFF); in ceph_con_close()
694 reset_connection(con); in ceph_con_close()
695 con->peer_global_seq = 0; in ceph_con_close()
696 cancel_con(con); in ceph_con_close()
697 con_close_socket(con); in ceph_con_close()
698 mutex_unlock(&con->mutex); in ceph_con_close()
705 void ceph_con_open(struct ceph_connection *con, in ceph_con_open() argument
709 mutex_lock(&con->mutex); in ceph_con_open()
710 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); in ceph_con_open()
712 WARN_ON(con->state != CON_STATE_CLOSED); in ceph_con_open()
713 con->state = CON_STATE_PREOPEN; in ceph_con_open()
715 con->peer_name.type = (__u8) entity_type; in ceph_con_open()
716 con->peer_name.num = cpu_to_le64(entity_num); in ceph_con_open()
718 memcpy(&con->peer_addr, addr, sizeof(*addr)); in ceph_con_open()
719 con->delay = 0; /* reset backoff memory */ in ceph_con_open()
720 mutex_unlock(&con->mutex); in ceph_con_open()
721 queue_con(con); in ceph_con_open()
728 bool ceph_con_opened(struct ceph_connection *con) in ceph_con_opened() argument
730 return con->connect_seq > 0; in ceph_con_opened()
736 void ceph_con_init(struct ceph_connection *con, void *private, in ceph_con_init() argument
740 dout("con_init %p\n", con); in ceph_con_init()
741 memset(con, 0, sizeof(*con)); in ceph_con_init()
742 con->private = private; in ceph_con_init()
743 con->ops = ops; in ceph_con_init()
744 con->msgr = msgr; in ceph_con_init()
746 con_sock_state_init(con); in ceph_con_init()
748 mutex_init(&con->mutex); in ceph_con_init()
749 INIT_LIST_HEAD(&con->out_queue); in ceph_con_init()
750 INIT_LIST_HEAD(&con->out_sent); in ceph_con_init()
751 INIT_DELAYED_WORK(&con->work, ceph_con_workfn); in ceph_con_init()
753 con->state = CON_STATE_CLOSED; in ceph_con_init()
774 static void con_out_kvec_reset(struct ceph_connection *con) in con_out_kvec_reset() argument
776 BUG_ON(con->out_skip); in con_out_kvec_reset()
778 con->out_kvec_left = 0; in con_out_kvec_reset()
779 con->out_kvec_bytes = 0; in con_out_kvec_reset()
780 con->out_kvec_cur = &con->out_kvec[0]; in con_out_kvec_reset()
783 static void con_out_kvec_add(struct ceph_connection *con, in con_out_kvec_add() argument
786 int index = con->out_kvec_left; in con_out_kvec_add()
788 BUG_ON(con->out_skip); in con_out_kvec_add()
789 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); in con_out_kvec_add()
791 con->out_kvec[index].iov_len = size; in con_out_kvec_add()
792 con->out_kvec[index].iov_base = data; in con_out_kvec_add()
793 con->out_kvec_left++; in con_out_kvec_add()
794 con->out_kvec_bytes += size; in con_out_kvec_add()
802 static int con_out_kvec_skip(struct ceph_connection *con) in con_out_kvec_skip() argument
804 int off = con->out_kvec_cur - con->out_kvec; in con_out_kvec_skip()
807 if (con->out_kvec_bytes > 0) { in con_out_kvec_skip()
808 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; in con_out_kvec_skip()
809 BUG_ON(con->out_kvec_bytes < skip); in con_out_kvec_skip()
810 BUG_ON(!con->out_kvec_left); in con_out_kvec_skip()
811 con->out_kvec_bytes -= skip; in con_out_kvec_skip()
812 con->out_kvec_left--; in con_out_kvec_skip()
1203 static size_t sizeof_footer(struct ceph_connection *con) in sizeof_footer() argument
1205 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? in sizeof_footer()
1224 static void prepare_write_message_footer(struct ceph_connection *con) in prepare_write_message_footer() argument
1226 struct ceph_msg *m = con->out_msg; in prepare_write_message_footer()
1227 int v = con->out_kvec_left; in prepare_write_message_footer()
1231 dout("prepare_write_message_footer %p\n", con); in prepare_write_message_footer()
1232 con->out_kvec[v].iov_base = &m->footer; in prepare_write_message_footer()
1233 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { in prepare_write_message_footer()
1234 if (con->ops->sign_message) in prepare_write_message_footer()
1235 con->ops->sign_message(m); in prepare_write_message_footer()
1238 con->out_kvec[v].iov_len = sizeof(m->footer); in prepare_write_message_footer()
1239 con->out_kvec_bytes += sizeof(m->footer); in prepare_write_message_footer()
1242 con->out_kvec[v].iov_len = sizeof(m->old_footer); in prepare_write_message_footer()
1243 con->out_kvec_bytes += sizeof(m->old_footer); in prepare_write_message_footer()
1245 con->out_kvec_left++; in prepare_write_message_footer()
1246 con->out_more = m->more_to_follow; in prepare_write_message_footer()
1247 con->out_msg_done = true; in prepare_write_message_footer()
1253 static void prepare_write_message(struct ceph_connection *con) in prepare_write_message() argument
1258 con_out_kvec_reset(con); in prepare_write_message()
1259 con->out_msg_done = false; in prepare_write_message()
1263 if (con->in_seq > con->in_seq_acked) { in prepare_write_message()
1264 con->in_seq_acked = con->in_seq; in prepare_write_message()
1265 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_message()
1266 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_message()
1267 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_message()
1268 &con->out_temp_ack); in prepare_write_message()
1271 BUG_ON(list_empty(&con->out_queue)); in prepare_write_message()
1272 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); in prepare_write_message()
1273 con->out_msg = m; in prepare_write_message()
1274 BUG_ON(m->con != con); in prepare_write_message()
1278 list_move_tail(&m->list_head, &con->out_sent); in prepare_write_message()
1285 m->hdr.seq = cpu_to_le64(++con->out_seq); in prepare_write_message()
1291 m, con->out_seq, le16_to_cpu(m->hdr.type), in prepare_write_message()
1297 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); in prepare_write_message()
1298 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); in prepare_write_message()
1299 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); in prepare_write_message()
1302 con_out_kvec_add(con, m->middle->vec.iov_len, in prepare_write_message()
1307 con->out_msg->hdr.crc = cpu_to_le32(crc); in prepare_write_message()
1308 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); in prepare_write_message()
1312 con->out_msg->footer.front_crc = cpu_to_le32(crc); in prepare_write_message()
1316 con->out_msg->footer.middle_crc = cpu_to_le32(crc); in prepare_write_message()
1318 con->out_msg->footer.middle_crc = 0; in prepare_write_message()
1320 le32_to_cpu(con->out_msg->footer.front_crc), in prepare_write_message()
1321 le32_to_cpu(con->out_msg->footer.middle_crc)); in prepare_write_message()
1322 con->out_msg->footer.flags = 0; in prepare_write_message()
1325 con->out_msg->footer.data_crc = 0; in prepare_write_message()
1327 prepare_message_data(con->out_msg, m->data_length); in prepare_write_message()
1328 con->out_more = 1; /* data + footer will follow */ in prepare_write_message()
1331 prepare_write_message_footer(con); in prepare_write_message()
1334 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_message()
1340 static void prepare_write_ack(struct ceph_connection *con) in prepare_write_ack() argument
1342 dout("prepare_write_ack %p %llu -> %llu\n", con, in prepare_write_ack()
1343 con->in_seq_acked, con->in_seq); in prepare_write_ack()
1344 con->in_seq_acked = con->in_seq; in prepare_write_ack()
1346 con_out_kvec_reset(con); in prepare_write_ack()
1348 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); in prepare_write_ack()
1350 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_ack()
1351 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_ack()
1352 &con->out_temp_ack); in prepare_write_ack()
1354 con->out_more = 1; /* more will follow.. eventually.. */ in prepare_write_ack()
1355 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_ack()
1361 static void prepare_write_seq(struct ceph_connection *con) in prepare_write_seq() argument
1363 dout("prepare_write_seq %p %llu -> %llu\n", con, in prepare_write_seq()
1364 con->in_seq_acked, con->in_seq); in prepare_write_seq()
1365 con->in_seq_acked = con->in_seq; in prepare_write_seq()
1367 con_out_kvec_reset(con); in prepare_write_seq()
1369 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); in prepare_write_seq()
1370 con_out_kvec_add(con, sizeof (con->out_temp_ack), in prepare_write_seq()
1371 &con->out_temp_ack); in prepare_write_seq()
1373 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_seq()
1379 static void prepare_write_keepalive(struct ceph_connection *con) in prepare_write_keepalive() argument
1381 dout("prepare_write_keepalive %p\n", con); in prepare_write_keepalive()
1382 con_out_kvec_reset(con); in prepare_write_keepalive()
1383 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { in prepare_write_keepalive()
1386 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); in prepare_write_keepalive()
1387 ceph_encode_timespec(&con->out_temp_keepalive2, &now); in prepare_write_keepalive()
1388 con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), in prepare_write_keepalive()
1389 &con->out_temp_keepalive2); in prepare_write_keepalive()
1391 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); in prepare_write_keepalive()
1393 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_keepalive()
1400 static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con, in get_connect_authorizer() argument
1405 if (!con->ops->get_authorizer) { in get_connect_authorizer()
1406 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; in get_connect_authorizer()
1407 con->out_connect.authorizer_len = 0; in get_connect_authorizer()
1412 mutex_unlock(&con->mutex); in get_connect_authorizer()
1413 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); in get_connect_authorizer()
1414 mutex_lock(&con->mutex); in get_connect_authorizer()
1418 if (con->state != CON_STATE_NEGOTIATING) in get_connect_authorizer()
1421 con->auth_reply_buf = auth->authorizer_reply_buf; in get_connect_authorizer()
1422 con->auth_reply_buf_len = auth->authorizer_reply_buf_len; in get_connect_authorizer()
1429 static void prepare_write_banner(struct ceph_connection *con) in prepare_write_banner() argument
1431 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); in prepare_write_banner()
1432 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), in prepare_write_banner()
1433 &con->msgr->my_enc_addr); in prepare_write_banner()
1435 con->out_more = 0; in prepare_write_banner()
1436 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_banner()
1439 static int prepare_write_connect(struct ceph_connection *con) in prepare_write_connect() argument
1441 unsigned int global_seq = get_global_seq(con->msgr, 0); in prepare_write_connect()
1446 switch (con->peer_name.type) { in prepare_write_connect()
1460 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, in prepare_write_connect()
1461 con->connect_seq, global_seq, proto); in prepare_write_connect()
1463 con->out_connect.features = in prepare_write_connect()
1464 cpu_to_le64(from_msgr(con->msgr)->supported_features); in prepare_write_connect()
1465 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); in prepare_write_connect()
1466 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); in prepare_write_connect()
1467 con->out_connect.global_seq = cpu_to_le32(global_seq); in prepare_write_connect()
1468 con->out_connect.protocol_version = cpu_to_le32(proto); in prepare_write_connect()
1469 con->out_connect.flags = 0; in prepare_write_connect()
1472 auth = get_connect_authorizer(con, &auth_proto); in prepare_write_connect()
1476 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); in prepare_write_connect()
1477 con->out_connect.authorizer_len = auth ? in prepare_write_connect()
1480 con_out_kvec_add(con, sizeof (con->out_connect), in prepare_write_connect()
1481 &con->out_connect); in prepare_write_connect()
1483 con_out_kvec_add(con, auth->authorizer_buf_len, in prepare_write_connect()
1486 con->out_more = 0; in prepare_write_connect()
1487 con_flag_set(con, CON_FLAG_WRITE_PENDING); in prepare_write_connect()
1498 static int write_partial_kvec(struct ceph_connection *con) in write_partial_kvec() argument
1502 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); in write_partial_kvec()
1503 while (con->out_kvec_bytes > 0) { in write_partial_kvec()
1504 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, in write_partial_kvec()
1505 con->out_kvec_left, con->out_kvec_bytes, in write_partial_kvec()
1506 con->out_more); in write_partial_kvec()
1509 con->out_kvec_bytes -= ret; in write_partial_kvec()
1510 if (con->out_kvec_bytes == 0) in write_partial_kvec()
1514 while (ret >= con->out_kvec_cur->iov_len) { in write_partial_kvec()
1515 BUG_ON(!con->out_kvec_left); in write_partial_kvec()
1516 ret -= con->out_kvec_cur->iov_len; in write_partial_kvec()
1517 con->out_kvec_cur++; in write_partial_kvec()
1518 con->out_kvec_left--; in write_partial_kvec()
1522 con->out_kvec_cur->iov_len -= ret; in write_partial_kvec()
1523 con->out_kvec_cur->iov_base += ret; in write_partial_kvec()
1526 con->out_kvec_left = 0; in write_partial_kvec()
1529 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, in write_partial_kvec()
1530 con->out_kvec_bytes, con->out_kvec_left, ret); in write_partial_kvec()
1554 static int write_partial_message_data(struct ceph_connection *con) in write_partial_message_data() argument
1556 struct ceph_msg *msg = con->out_msg; in write_partial_message_data()
1558 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in write_partial_message_data()
1561 dout("%s %p msg %p\n", __func__, con, msg); in write_partial_message_data()
1585 ret = ceph_tcp_sendpage(con->sock, page, page_offset, in write_partial_message_data()
1598 dout("%s %p msg %p done\n", __func__, con, msg); in write_partial_message_data()
1605 con_out_kvec_reset(con); in write_partial_message_data()
1606 prepare_write_message_footer(con); in write_partial_message_data()
1614 static int write_partial_skip(struct ceph_connection *con) in write_partial_skip() argument
1618 dout("%s %p %d left\n", __func__, con, con->out_skip); in write_partial_skip()
1619 while (con->out_skip > 0) { in write_partial_skip()
1620 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); in write_partial_skip()
1622 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true); in write_partial_skip()
1625 con->out_skip -= ret; in write_partial_skip()
1635 static void prepare_read_banner(struct ceph_connection *con) in prepare_read_banner() argument
1637 dout("prepare_read_banner %p\n", con); in prepare_read_banner()
1638 con->in_base_pos = 0; in prepare_read_banner()
1641 static void prepare_read_connect(struct ceph_connection *con) in prepare_read_connect() argument
1643 dout("prepare_read_connect %p\n", con); in prepare_read_connect()
1644 con->in_base_pos = 0; in prepare_read_connect()
1647 static void prepare_read_ack(struct ceph_connection *con) in prepare_read_ack() argument
1649 dout("prepare_read_ack %p\n", con); in prepare_read_ack()
1650 con->in_base_pos = 0; in prepare_read_ack()
1653 static void prepare_read_seq(struct ceph_connection *con) in prepare_read_seq() argument
1655 dout("prepare_read_seq %p\n", con); in prepare_read_seq()
1656 con->in_base_pos = 0; in prepare_read_seq()
1657 con->in_tag = CEPH_MSGR_TAG_SEQ; in prepare_read_seq()
1660 static void prepare_read_tag(struct ceph_connection *con) in prepare_read_tag() argument
1662 dout("prepare_read_tag %p\n", con); in prepare_read_tag()
1663 con->in_base_pos = 0; in prepare_read_tag()
1664 con->in_tag = CEPH_MSGR_TAG_READY; in prepare_read_tag()
1667 static void prepare_read_keepalive_ack(struct ceph_connection *con) in prepare_read_keepalive_ack() argument
1669 dout("prepare_read_keepalive_ack %p\n", con); in prepare_read_keepalive_ack()
1670 con->in_base_pos = 0; in prepare_read_keepalive_ack()
1676 static int prepare_read_message(struct ceph_connection *con) in prepare_read_message() argument
1678 dout("prepare_read_message %p\n", con); in prepare_read_message()
1679 BUG_ON(con->in_msg != NULL); in prepare_read_message()
1680 con->in_base_pos = 0; in prepare_read_message()
1681 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; in prepare_read_message()
1686 static int read_partial(struct ceph_connection *con, in read_partial() argument
1689 while (con->in_base_pos < end) { in read_partial()
1690 int left = end - con->in_base_pos; in read_partial()
1692 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); in read_partial()
1695 con->in_base_pos += ret; in read_partial()
1704 static int read_partial_banner(struct ceph_connection *con) in read_partial_banner() argument
1710 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); in read_partial_banner()
1715 ret = read_partial(con, end, size, con->in_banner); in read_partial_banner()
1719 size = sizeof (con->actual_peer_addr); in read_partial_banner()
1721 ret = read_partial(con, end, size, &con->actual_peer_addr); in read_partial_banner()
1725 size = sizeof (con->peer_addr_for_me); in read_partial_banner()
1727 ret = read_partial(con, end, size, &con->peer_addr_for_me); in read_partial_banner()
1735 static int read_partial_connect(struct ceph_connection *con) in read_partial_connect() argument
1741 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); in read_partial_connect()
1743 size = sizeof (con->in_reply); in read_partial_connect()
1745 ret = read_partial(con, end, size, &con->in_reply); in read_partial_connect()
1749 size = le32_to_cpu(con->in_reply.authorizer_len); in read_partial_connect()
1751 ret = read_partial(con, end, size, con->auth_reply_buf); in read_partial_connect()
1756 con, (int)con->in_reply.tag, in read_partial_connect()
1757 le32_to_cpu(con->in_reply.connect_seq), in read_partial_connect()
1758 le32_to_cpu(con->in_reply.global_seq)); in read_partial_connect()
1767 static int verify_hello(struct ceph_connection *con) in verify_hello() argument
1769 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { in verify_hello()
1771 ceph_pr_addr(&con->peer_addr.in_addr)); in verify_hello()
1772 con->error_msg = "protocol error, bad banner"; in verify_hello()
1989 static int process_banner(struct ceph_connection *con) in process_banner() argument
1991 dout("process_banner on %p\n", con); in process_banner()
1993 if (verify_hello(con) < 0) in process_banner()
1996 ceph_decode_addr(&con->actual_peer_addr); in process_banner()
1997 ceph_decode_addr(&con->peer_addr_for_me); in process_banner()
2004 if (memcmp(&con->peer_addr, &con->actual_peer_addr, in process_banner()
2005 sizeof(con->peer_addr)) != 0 && in process_banner()
2006 !(addr_is_blank(&con->actual_peer_addr.in_addr) && in process_banner()
2007 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { in process_banner()
2009 ceph_pr_addr(&con->peer_addr.in_addr), in process_banner()
2010 (int)le32_to_cpu(con->peer_addr.nonce), in process_banner()
2011 ceph_pr_addr(&con->actual_peer_addr.in_addr), in process_banner()
2012 (int)le32_to_cpu(con->actual_peer_addr.nonce)); in process_banner()
2013 con->error_msg = "wrong peer at address"; in process_banner()
2020 if (addr_is_blank(&con->msgr->inst.addr.in_addr)) { in process_banner()
2021 int port = addr_port(&con->msgr->inst.addr.in_addr); in process_banner()
2023 memcpy(&con->msgr->inst.addr.in_addr, in process_banner()
2024 &con->peer_addr_for_me.in_addr, in process_banner()
2025 sizeof(con->peer_addr_for_me.in_addr)); in process_banner()
2026 addr_set_port(&con->msgr->inst.addr.in_addr, port); in process_banner()
2027 encode_my_addr(con->msgr); in process_banner()
2029 ceph_pr_addr(&con->msgr->inst.addr.in_addr)); in process_banner()
2035 static int process_connect(struct ceph_connection *con) in process_connect() argument
2037 u64 sup_feat = from_msgr(con->msgr)->supported_features; in process_connect()
2038 u64 req_feat = from_msgr(con->msgr)->required_features; in process_connect()
2040 le64_to_cpu(con->in_reply.features)); in process_connect()
2043 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); in process_connect()
2045 switch (con->in_reply.tag) { in process_connect()
2049 ENTITY_NAME(con->peer_name), in process_connect()
2050 ceph_pr_addr(&con->peer_addr.in_addr), in process_connect()
2052 con->error_msg = "missing required protocol features"; in process_connect()
2053 reset_connection(con); in process_connect()
2059 ENTITY_NAME(con->peer_name), in process_connect()
2060 ceph_pr_addr(&con->peer_addr.in_addr), in process_connect()
2061 le32_to_cpu(con->out_connect.protocol_version), in process_connect()
2062 le32_to_cpu(con->in_reply.protocol_version)); in process_connect()
2063 con->error_msg = "protocol version mismatch"; in process_connect()
2064 reset_connection(con); in process_connect()
2068 con->auth_retry++; in process_connect()
2069 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, in process_connect()
2070 con->auth_retry); in process_connect()
2071 if (con->auth_retry == 2) { in process_connect()
2072 con->error_msg = "connect authorization failure"; in process_connect()
2075 con_out_kvec_reset(con); in process_connect()
2076 ret = prepare_write_connect(con); in process_connect()
2079 prepare_read_connect(con); in process_connect()
2091 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2093 ENTITY_NAME(con->peer_name), in process_connect()
2094 ceph_pr_addr(&con->peer_addr.in_addr)); in process_connect()
2095 reset_connection(con); in process_connect()
2096 con_out_kvec_reset(con); in process_connect()
2097 ret = prepare_write_connect(con); in process_connect()
2100 prepare_read_connect(con); in process_connect()
2103 mutex_unlock(&con->mutex); in process_connect()
2104 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); in process_connect()
2105 if (con->ops->peer_reset) in process_connect()
2106 con->ops->peer_reset(con); in process_connect()
2107 mutex_lock(&con->mutex); in process_connect()
2108 if (con->state != CON_STATE_NEGOTIATING) in process_connect()
2118 le32_to_cpu(con->out_connect.connect_seq), in process_connect()
2119 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2120 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); in process_connect()
2121 con_out_kvec_reset(con); in process_connect()
2122 ret = prepare_write_connect(con); in process_connect()
2125 prepare_read_connect(con); in process_connect()
2134 con->peer_global_seq, in process_connect()
2135 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2136 get_global_seq(con->msgr, in process_connect()
2137 le32_to_cpu(con->in_reply.global_seq)); in process_connect()
2138 con_out_kvec_reset(con); in process_connect()
2139 ret = prepare_write_connect(con); in process_connect()
2142 prepare_read_connect(con); in process_connect()
2150 ENTITY_NAME(con->peer_name), in process_connect()
2151 ceph_pr_addr(&con->peer_addr.in_addr), in process_connect()
2153 con->error_msg = "missing required protocol features"; in process_connect()
2154 reset_connection(con); in process_connect()
2158 WARN_ON(con->state != CON_STATE_NEGOTIATING); in process_connect()
2159 con->state = CON_STATE_OPEN; in process_connect()
2160 con->auth_retry = 0; /* we authenticated; clear flag */ in process_connect()
2161 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); in process_connect()
2162 con->connect_seq++; in process_connect()
2163 con->peer_features = server_feat; in process_connect()
2165 con->peer_global_seq, in process_connect()
2166 le32_to_cpu(con->in_reply.connect_seq), in process_connect()
2167 con->connect_seq); in process_connect()
2168 WARN_ON(con->connect_seq != in process_connect()
2169 le32_to_cpu(con->in_reply.connect_seq)); in process_connect()
2171 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) in process_connect()
2172 con_flag_set(con, CON_FLAG_LOSSYTX); in process_connect()
2174 con->delay = 0; /* reset backoff memory */ in process_connect()
2176 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { in process_connect()
2177 prepare_write_seq(con); in process_connect()
2178 prepare_read_seq(con); in process_connect()
2180 prepare_read_tag(con); in process_connect()
2191 con->error_msg = "protocol error, got WAIT as client"; in process_connect()
2195 con->error_msg = "protocol error, garbage tag during connect"; in process_connect()
2205 static int read_partial_ack(struct ceph_connection *con) in read_partial_ack() argument
2207 int size = sizeof (con->in_temp_ack); in read_partial_ack()
2210 return read_partial(con, end, size, &con->in_temp_ack); in read_partial_ack()
2216 static void process_ack(struct ceph_connection *con) in process_ack() argument
2219 u64 ack = le64_to_cpu(con->in_temp_ack); in process_ack()
2222 while (!list_empty(&con->out_sent)) { in process_ack()
2223 m = list_first_entry(&con->out_sent, struct ceph_msg, in process_ack()
2233 prepare_read_tag(con); in process_ack()
2237 static int read_partial_message_section(struct ceph_connection *con, in read_partial_message_section() argument
2248 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + in read_partial_message_section()
2260 static int read_partial_msg_data(struct ceph_connection *con) in read_partial_msg_data() argument
2262 struct ceph_msg *msg = con->in_msg; in read_partial_msg_data()
2264 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_msg_data()
2276 crc = con->in_data_crc; in read_partial_msg_data()
2279 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); in read_partial_msg_data()
2282 con->in_data_crc = crc; in read_partial_msg_data()
2292 con->in_data_crc = crc; in read_partial_msg_data()
2300 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2302 static int read_partial_message(struct ceph_connection *con) in read_partial_message() argument
2304 struct ceph_msg *m = con->in_msg; in read_partial_message()
2309 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); in read_partial_message()
2310 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); in read_partial_message()
2314 dout("read_partial_message con %p msg %p\n", con, m); in read_partial_message()
2317 size = sizeof (con->in_hdr); in read_partial_message()
2319 ret = read_partial(con, end, size, &con->in_hdr); in read_partial_message()
2323 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); in read_partial_message()
2324 if (cpu_to_le32(crc) != con->in_hdr.crc) { in read_partial_message()
2326 crc, con->in_hdr.crc); in read_partial_message()
2330 front_len = le32_to_cpu(con->in_hdr.front_len); in read_partial_message()
2333 middle_len = le32_to_cpu(con->in_hdr.middle_len); in read_partial_message()
2336 data_len = le32_to_cpu(con->in_hdr.data_len); in read_partial_message()
2341 seq = le64_to_cpu(con->in_hdr.seq); in read_partial_message()
2342 if ((s64)seq - (s64)con->in_seq < 1) { in read_partial_message()
2344 ENTITY_NAME(con->peer_name), in read_partial_message()
2345 ceph_pr_addr(&con->peer_addr.in_addr), in read_partial_message()
2346 seq, con->in_seq + 1); in read_partial_message()
2347 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2348 sizeof_footer(con); in read_partial_message()
2349 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2351 } else if ((s64)seq - (s64)con->in_seq > 1) { in read_partial_message()
2353 seq, con->in_seq + 1); in read_partial_message()
2354 con->error_msg = "bad message sequence # for incoming message"; in read_partial_message()
2359 if (!con->in_msg) { in read_partial_message()
2362 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, in read_partial_message()
2364 ret = ceph_con_in_msg_alloc(con, &skip); in read_partial_message()
2368 BUG_ON(!con->in_msg ^ skip); in read_partial_message()
2372 con->in_base_pos = -front_len - middle_len - data_len - in read_partial_message()
2373 sizeof_footer(con); in read_partial_message()
2374 con->in_tag = CEPH_MSGR_TAG_READY; in read_partial_message()
2375 con->in_seq++; in read_partial_message()
2379 BUG_ON(!con->in_msg); in read_partial_message()
2380 BUG_ON(con->in_msg->con != con); in read_partial_message()
2381 m = con->in_msg; in read_partial_message()
2389 prepare_message_data(con->in_msg, data_len); in read_partial_message()
2393 ret = read_partial_message_section(con, &m->front, front_len, in read_partial_message()
2394 &con->in_front_crc); in read_partial_message()
2400 ret = read_partial_message_section(con, &m->middle->vec, in read_partial_message()
2402 &con->in_middle_crc); in read_partial_message()
2409 ret = read_partial_msg_data(con); in read_partial_message()
2421 ret = read_partial(con, end, size, &m->footer); in read_partial_message()
2435 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { in read_partial_message()
2437 m, con->in_front_crc, m->footer.front_crc); in read_partial_message()
2440 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { in read_partial_message()
2442 m, con->in_middle_crc, m->footer.middle_crc); in read_partial_message()
2447 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { in read_partial_message()
2449 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); in read_partial_message()
2453 if (need_sign && con->ops->check_message_signature && in read_partial_message()
2454 con->ops->check_message_signature(m)) { in read_partial_message()
2467 static void process_message(struct ceph_connection *con) in process_message() argument
2469 struct ceph_msg *msg = con->in_msg; in process_message()
2471 BUG_ON(con->in_msg->con != con); in process_message()
2472 con->in_msg = NULL; in process_message()
2475 if (con->peer_name.type == 0) in process_message()
2476 con->peer_name = msg->hdr.src; in process_message()
2478 con->in_seq++; in process_message()
2479 mutex_unlock(&con->mutex); in process_message()
2488 con->in_front_crc, con->in_middle_crc, con->in_data_crc); in process_message()
2489 con->ops->dispatch(con, msg); in process_message()
2491 mutex_lock(&con->mutex); in process_message()
2494 static int read_keepalive_ack(struct ceph_connection *con) in read_keepalive_ack() argument
2498 int ret = read_partial(con, size, size, &ceph_ts); in read_keepalive_ack()
2501 ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts); in read_keepalive_ack()
2502 prepare_read_tag(con); in read_keepalive_ack()
2510 static int try_write(struct ceph_connection *con) in try_write() argument
2514 dout("try_write start %p state %lu\n", con, con->state); in try_write()
2517 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); in try_write()
2520 if (con->state == CON_STATE_PREOPEN) { in try_write()
2521 BUG_ON(con->sock); in try_write()
2522 con->state = CON_STATE_CONNECTING; in try_write()
2524 con_out_kvec_reset(con); in try_write()
2525 prepare_write_banner(con); in try_write()
2526 prepare_read_banner(con); in try_write()
2528 BUG_ON(con->in_msg); in try_write()
2529 con->in_tag = CEPH_MSGR_TAG_READY; in try_write()
2531 con, con->state); in try_write()
2532 ret = ceph_tcp_connect(con); in try_write()
2534 con->error_msg = "connect error"; in try_write()
2541 if (con->out_kvec_left) { in try_write()
2542 ret = write_partial_kvec(con); in try_write()
2546 if (con->out_skip) { in try_write()
2547 ret = write_partial_skip(con); in try_write()
2553 if (con->out_msg) { in try_write()
2554 if (con->out_msg_done) { in try_write()
2555 ceph_msg_put(con->out_msg); in try_write()
2556 con->out_msg = NULL; /* we're done with this one */ in try_write()
2560 ret = write_partial_message_data(con); in try_write()
2573 if (con->state == CON_STATE_OPEN) { in try_write()
2574 if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { in try_write()
2575 prepare_write_keepalive(con); in try_write()
2579 if (!list_empty(&con->out_queue)) { in try_write()
2580 prepare_write_message(con); in try_write()
2583 if (con->in_seq > con->in_seq_acked) { in try_write()
2584 prepare_write_ack(con); in try_write()
2590 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in try_write()
2594 dout("try_write done on %p ret %d\n", con, ret); in try_write()
2603 static int try_read(struct ceph_connection *con) in try_read() argument
2608 dout("try_read start on %p state %lu\n", con, con->state); in try_read()
2609 if (con->state != CON_STATE_CONNECTING && in try_read()
2610 con->state != CON_STATE_NEGOTIATING && in try_read()
2611 con->state != CON_STATE_OPEN) in try_read()
2614 BUG_ON(!con->sock); in try_read()
2616 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, in try_read()
2617 con->in_base_pos); in try_read()
2619 if (con->state == CON_STATE_CONNECTING) { in try_read()
2621 ret = read_partial_banner(con); in try_read()
2624 ret = process_banner(con); in try_read()
2628 con->state = CON_STATE_NEGOTIATING; in try_read()
2635 ret = prepare_write_connect(con); in try_read()
2638 prepare_read_connect(con); in try_read()
2644 if (con->state == CON_STATE_NEGOTIATING) { in try_read()
2646 ret = read_partial_connect(con); in try_read()
2649 ret = process_connect(con); in try_read()
2655 WARN_ON(con->state != CON_STATE_OPEN); in try_read()
2657 if (con->in_base_pos < 0) { in try_read()
2664 int skip = min((int) sizeof (buf), -con->in_base_pos); in try_read()
2666 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); in try_read()
2667 ret = ceph_tcp_recvmsg(con->sock, buf, skip); in try_read()
2670 con->in_base_pos += ret; in try_read()
2671 if (con->in_base_pos) in try_read()
2674 if (con->in_tag == CEPH_MSGR_TAG_READY) { in try_read()
2678 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); in try_read()
2681 dout("try_read got tag %d\n", (int)con->in_tag); in try_read()
2682 switch (con->in_tag) { in try_read()
2684 prepare_read_message(con); in try_read()
2687 prepare_read_ack(con); in try_read()
2690 prepare_read_keepalive_ack(con); in try_read()
2693 con_close_socket(con); in try_read()
2694 con->state = CON_STATE_CLOSED; in try_read()
2700 if (con->in_tag == CEPH_MSGR_TAG_MSG) { in try_read()
2701 ret = read_partial_message(con); in try_read()
2705 con->error_msg = "bad crc/signature"; in try_read()
2711 con->error_msg = "io error"; in try_read()
2716 if (con->in_tag == CEPH_MSGR_TAG_READY) in try_read()
2718 process_message(con); in try_read()
2719 if (con->state == CON_STATE_OPEN) in try_read()
2720 prepare_read_tag(con); in try_read()
2723 if (con->in_tag == CEPH_MSGR_TAG_ACK || in try_read()
2724 con->in_tag == CEPH_MSGR_TAG_SEQ) { in try_read()
2729 ret = read_partial_ack(con); in try_read()
2732 process_ack(con); in try_read()
2735 if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { in try_read()
2736 ret = read_keepalive_ack(con); in try_read()
2743 dout("try_read done on %p ret %d\n", con, ret); in try_read()
2747 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); in try_read()
2748 con->error_msg = "protocol error, garbage tag"; in try_read()
2759 static int queue_con_delay(struct ceph_connection *con, unsigned long delay) in queue_con_delay() argument
2761 if (!con->ops->get(con)) { in queue_con_delay()
2762 dout("%s %p ref count 0\n", __func__, con); in queue_con_delay()
2766 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { in queue_con_delay()
2767 dout("%s %p - already queued\n", __func__, con); in queue_con_delay()
2768 con->ops->put(con); in queue_con_delay()
2772 dout("%s %p %lu\n", __func__, con, delay); in queue_con_delay()
2776 static void queue_con(struct ceph_connection *con) in queue_con() argument
2778 (void) queue_con_delay(con, 0); in queue_con()
2781 static void cancel_con(struct ceph_connection *con) in cancel_con() argument
2783 if (cancel_delayed_work(&con->work)) { in cancel_con()
2784 dout("%s %p\n", __func__, con); in cancel_con()
2785 con->ops->put(con); in cancel_con()
2789 static bool con_sock_closed(struct ceph_connection *con) in con_sock_closed() argument
2791 if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) in con_sock_closed()
2796 con->error_msg = "socket closed (con state " #x ")"; \ in con_sock_closed()
2799 switch (con->state) { in con_sock_closed()
2808 __func__, con, con->state); in con_sock_closed()
2809 con->error_msg = "unrecognized con state"; in con_sock_closed()
2818 static bool con_backoff(struct ceph_connection *con) in con_backoff() argument
2822 if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) in con_backoff()
2825 ret = queue_con_delay(con, round_jiffies_relative(con->delay)); in con_backoff()
2828 con, con->delay); in con_backoff()
2830 con_flag_set(con, CON_FLAG_BACKOFF); in con_backoff()
2838 static void con_fault_finish(struct ceph_connection *con) in con_fault_finish() argument
2844 if (con->auth_retry && con->ops->invalidate_authorizer) { in con_fault_finish()
2846 con->ops->invalidate_authorizer(con); in con_fault_finish()
2849 if (con->ops->fault) in con_fault_finish()
2850 con->ops->fault(con); in con_fault_finish()
2858 struct ceph_connection *con = container_of(work, struct ceph_connection, in ceph_con_workfn() local
2862 mutex_lock(&con->mutex); in ceph_con_workfn()
2866 if ((fault = con_sock_closed(con))) { in ceph_con_workfn()
2867 dout("%s: con %p SOCK_CLOSED\n", __func__, con); in ceph_con_workfn()
2870 if (con_backoff(con)) { in ceph_con_workfn()
2871 dout("%s: con %p BACKOFF\n", __func__, con); in ceph_con_workfn()
2874 if (con->state == CON_STATE_STANDBY) { in ceph_con_workfn()
2875 dout("%s: con %p STANDBY\n", __func__, con); in ceph_con_workfn()
2878 if (con->state == CON_STATE_CLOSED) { in ceph_con_workfn()
2879 dout("%s: con %p CLOSED\n", __func__, con); in ceph_con_workfn()
2880 BUG_ON(con->sock); in ceph_con_workfn()
2883 if (con->state == CON_STATE_PREOPEN) { in ceph_con_workfn()
2884 dout("%s: con %p PREOPEN\n", __func__, con); in ceph_con_workfn()
2885 BUG_ON(con->sock); in ceph_con_workfn()
2888 ret = try_read(con); in ceph_con_workfn()
2892 if (!con->error_msg) in ceph_con_workfn()
2893 con->error_msg = "socket error on read"; in ceph_con_workfn()
2898 ret = try_write(con); in ceph_con_workfn()
2902 if (!con->error_msg) in ceph_con_workfn()
2903 con->error_msg = "socket error on write"; in ceph_con_workfn()
2910 con_fault(con); in ceph_con_workfn()
2911 mutex_unlock(&con->mutex); in ceph_con_workfn()
2914 con_fault_finish(con); in ceph_con_workfn()
2916 con->ops->put(con); in ceph_con_workfn()
2923 static void con_fault(struct ceph_connection *con) in con_fault() argument
2926 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); in con_fault()
2928 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), in con_fault()
2929 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); in con_fault()
2930 con->error_msg = NULL; in con_fault()
2932 WARN_ON(con->state != CON_STATE_CONNECTING && in con_fault()
2933 con->state != CON_STATE_NEGOTIATING && in con_fault()
2934 con->state != CON_STATE_OPEN); in con_fault()
2936 con_close_socket(con); in con_fault()
2938 if (con_flag_test(con, CON_FLAG_LOSSYTX)) { in con_fault()
2940 con->state = CON_STATE_CLOSED; in con_fault()
2944 if (con->in_msg) { in con_fault()
2945 BUG_ON(con->in_msg->con != con); in con_fault()
2946 ceph_msg_put(con->in_msg); in con_fault()
2947 con->in_msg = NULL; in con_fault()
2951 list_splice_init(&con->out_sent, &con->out_queue); in con_fault()
2955 if (list_empty(&con->out_queue) && in con_fault()
2956 !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { in con_fault()
2957 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); in con_fault()
2958 con_flag_clear(con, CON_FLAG_WRITE_PENDING); in con_fault()
2959 con->state = CON_STATE_STANDBY; in con_fault()
2962 con->state = CON_STATE_PREOPEN; in con_fault()
2963 if (con->delay == 0) in con_fault()
2964 con->delay = BASE_DELAY_INTERVAL; in con_fault()
2965 else if (con->delay < MAX_DELAY_INTERVAL) in con_fault()
2966 con->delay *= 2; in con_fault()
2967 con_flag_set(con, CON_FLAG_BACKOFF); in con_fault()
2968 queue_con(con); in con_fault()
3003 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) in msg_con_set() argument
3005 if (msg->con) in msg_con_set()
3006 msg->con->ops->put(msg->con); in msg_con_set()
3008 msg->con = con ? con->ops->get(con) : NULL; in msg_con_set()
3009 BUG_ON(msg->con != con); in msg_con_set()
3012 static void clear_standby(struct ceph_connection *con) in clear_standby() argument
3015 if (con->state == CON_STATE_STANDBY) { in clear_standby()
3016 dout("clear_standby %p and ++connect_seq\n", con); in clear_standby()
3017 con->state = CON_STATE_PREOPEN; in clear_standby()
3018 con->connect_seq++; in clear_standby()
3019 WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); in clear_standby()
3020 WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); in clear_standby()
3027 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) in ceph_con_send() argument
3030 msg->hdr.src = con->msgr->inst.name; in ceph_con_send()
3034 mutex_lock(&con->mutex); in ceph_con_send()
3036 if (con->state == CON_STATE_CLOSED) { in ceph_con_send()
3037 dout("con_send %p closed, dropping %p\n", con, msg); in ceph_con_send()
3039 mutex_unlock(&con->mutex); in ceph_con_send()
3043 msg_con_set(msg, con); in ceph_con_send()
3046 list_add_tail(&msg->list_head, &con->out_queue); in ceph_con_send()
3048 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), in ceph_con_send()
3054 clear_standby(con); in ceph_con_send()
3055 mutex_unlock(&con->mutex); in ceph_con_send()
3059 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_send()
3060 queue_con(con); in ceph_con_send()
3069 struct ceph_connection *con = msg->con; in ceph_msg_revoke() local
3071 if (!con) { in ceph_msg_revoke()
3076 mutex_lock(&con->mutex); in ceph_msg_revoke()
3078 dout("%s %p msg %p - was on queue\n", __func__, con, msg); in ceph_msg_revoke()
3084 if (con->out_msg == msg) { in ceph_msg_revoke()
3085 BUG_ON(con->out_skip); in ceph_msg_revoke()
3087 if (con->out_msg_done) { in ceph_msg_revoke()
3088 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3091 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) in ceph_msg_revoke()
3092 con->out_skip += sizeof(msg->footer); in ceph_msg_revoke()
3094 con->out_skip += sizeof(msg->old_footer); in ceph_msg_revoke()
3098 con->out_skip += msg->cursor.total_resid; in ceph_msg_revoke()
3100 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3101 con->out_skip += con_out_kvec_skip(con); in ceph_msg_revoke()
3104 __func__, con, msg, con->out_kvec_bytes, con->out_skip); in ceph_msg_revoke()
3106 con->out_msg = NULL; in ceph_msg_revoke()
3110 mutex_unlock(&con->mutex); in ceph_msg_revoke()
3118 struct ceph_connection *con = msg->con; in ceph_msg_revoke_incoming() local
3120 if (!con) { in ceph_msg_revoke_incoming()
3125 mutex_lock(&con->mutex); in ceph_msg_revoke_incoming()
3126 if (con->in_msg == msg) { in ceph_msg_revoke_incoming()
3127 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); in ceph_msg_revoke_incoming()
3128 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); in ceph_msg_revoke_incoming()
3129 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); in ceph_msg_revoke_incoming()
3132 dout("%s %p msg %p revoked\n", __func__, con, msg); in ceph_msg_revoke_incoming()
3133 con->in_base_pos = con->in_base_pos - in ceph_msg_revoke_incoming()
3139 ceph_msg_put(con->in_msg); in ceph_msg_revoke_incoming()
3140 con->in_msg = NULL; in ceph_msg_revoke_incoming()
3141 con->in_tag = CEPH_MSGR_TAG_READY; in ceph_msg_revoke_incoming()
3142 con->in_seq++; in ceph_msg_revoke_incoming()
3145 __func__, con, con->in_msg, msg); in ceph_msg_revoke_incoming()
3147 mutex_unlock(&con->mutex); in ceph_msg_revoke_incoming()
3153 void ceph_con_keepalive(struct ceph_connection *con) in ceph_con_keepalive() argument
3155 dout("con_keepalive %p\n", con); in ceph_con_keepalive()
3156 mutex_lock(&con->mutex); in ceph_con_keepalive()
3157 clear_standby(con); in ceph_con_keepalive()
3158 mutex_unlock(&con->mutex); in ceph_con_keepalive()
3159 if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 && in ceph_con_keepalive()
3160 con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) in ceph_con_keepalive()
3161 queue_con(con); in ceph_con_keepalive()
3165 bool ceph_con_keepalive_expired(struct ceph_connection *con, in ceph_con_keepalive_expired() argument
3169 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { in ceph_con_keepalive_expired()
3173 ts = timespec_add(con->last_keepalive_ack, ts); in ceph_con_keepalive_expired()
3319 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) in ceph_alloc_middle() argument
3350 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) in ceph_con_in_msg_alloc() argument
3352 struct ceph_msg_header *hdr = &con->in_hdr; in ceph_con_in_msg_alloc()
3357 BUG_ON(con->in_msg != NULL); in ceph_con_in_msg_alloc()
3358 BUG_ON(!con->ops->alloc_msg); in ceph_con_in_msg_alloc()
3360 mutex_unlock(&con->mutex); in ceph_con_in_msg_alloc()
3361 msg = con->ops->alloc_msg(con, hdr, skip); in ceph_con_in_msg_alloc()
3362 mutex_lock(&con->mutex); in ceph_con_in_msg_alloc()
3363 if (con->state != CON_STATE_OPEN) { in ceph_con_in_msg_alloc()
3370 msg_con_set(msg, con); in ceph_con_in_msg_alloc()
3371 con->in_msg = msg; in ceph_con_in_msg_alloc()
3381 con->error_msg = "error allocating memory for incoming message"; in ceph_con_in_msg_alloc()
3384 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); in ceph_con_in_msg_alloc()
3386 if (middle_len && !con->in_msg->middle) { in ceph_con_in_msg_alloc()
3387 ret = ceph_alloc_middle(con, con->in_msg); in ceph_con_in_msg_alloc()
3389 ceph_msg_put(con->in_msg); in ceph_con_in_msg_alloc()
3390 con->in_msg = NULL; in ceph_con_in_msg_alloc()