1/******************************************************************************
2*******************************************************************************
3**
4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
6**
7**  This copyrighted material is made available to anyone wishing to use,
8**  modify, copy, or redistribute it subject to the terms and conditions
9**  of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is its
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
28 *
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
38 *
39 * lowcomms will choose to use either TCP or SCTP as its transport layer
40 * depending on the configuration variable 'protocol'. This should be set
41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
43 * for the DLM to function.
44 *
45 */
46
47#include <asm/ioctls.h>
48#include <net/sock.h>
49#include <net/tcp.h>
50#include <linux/pagemap.h>
51#include <linux/file.h>
52#include <linux/mutex.h>
53#include <linux/sctp.h>
54#include <linux/slab.h>
55#include <net/sctp/sctp.h>
56#include <net/ipv6.h>
57
58#include "dlm_internal.h"
59#include "lowcomms.h"
60#include "midcomms.h"
61#include "config.h"
62
/* Receive-buffer size wanted for sockets (4 MiB); the use site is not in
   this part of the file — presumably applied via SO_RCVBUF, TODO confirm */
#define NEEDED_RMEM (4*1024*1024)
/* Buckets in connection_hash. Must be a power of two: nodeid_hash()
   reduces a nodeid with "& (CONN_HASH_SIZE-1)". */
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
68
/*
 * Bookkeeping for a power-of-two sized circular buffer. Only offsets are
 * kept here; the storage itself lives elsewhere (con->rx_page).
 *   base - offset of the oldest unconsumed byte
 *   len  - number of unconsumed bytes
 *   mask - buffer size minus one
 */
struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

/* Record that @n more bytes were appended after the current data. */
static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len = cb->len + n;
}

/* Offset just past the last valid byte, i.e. where new data goes. */
static int cbuf_data(struct cbuf *cb)
{
	unsigned int end = cb->base + cb->len;

	return end & cb->mask;
}

/* Reset @cb to empty for a buffer of @size bytes (a power of two). */
static void cbuf_init(struct cbuf *cb, int size)
{
	cb->mask = size - 1;
	cb->len = 0;
	cb->base = 0;
}

/* Discard @n bytes from the front, wrapping base around the buffer. */
static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->base = (cb->base + n) & cb->mask;
	cb->len -= n;
}

/* True when no unconsumed bytes remain. */
static bool cbuf_empty(struct cbuf *cb)
{
	return !cb->len;
}
102
/*
 * Per-node connection state, kept in connection_hash keyed by nodeid.
 * nodeid 0 is the listening/base connection: it is never connected out
 * (see tcp_connect_to_sock) and its connect_action/rx_action are copied
 * into new connections by __nodeid2con().
 */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;	/* held in this file while sock/rx_page/retries change */
	unsigned long flags;	/* CF_* bit numbers below, used with the atomic bitops */
#define CF_READ_PENDING 1	/* rwork has been queued */
#define CF_WRITE_PENDING 2	/* swork has been queued to send */
#define CF_CONNECT_PENDING 3	/* swork has been queued to connect */
#define CF_INIT_PENDING 4	/* SCTP association INIT in progress (sctp_init_assoc) */
#define CF_IS_OTHERCON 5	/* this is a node's secondary, incoming connection */
#define CF_CLOSE 6		/* lowcomms_connect_sock() queues no connect while set */
#define CF_APP_LIMITED 7	/* presumably set when sk_write_pending was bumped by the sender; lowcomms_write_space() decrements it — TODO confirm setter */
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;	/* taken around writequeue access */
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	struct page *rx_page;	/* receive ring storage, allocated on demand by receive_from_sock() */
	struct cbuf cb;		/* offsets of unconsumed data inside rx_page */
	int retries;		/* connect attempts; reset by close_connection() */
#define MAX_CONNECT_RETRIES 3
	int sctp_assoc;		/* SCTP association id, 0 when there is none */
	struct hlist_node list;	/* chain link in connection_hash */
	struct connection *othercon;	/* incoming connection that crossed with ours (tcp_accept_from_sock) */
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	bool try_new_addr;	/* next nodeid_to_addr() call should advance to the node's next address */
};
/* Map a struct sock back to its connection; sk_user_data is set by add_sock(). */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
131
/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;	/* link in con->writequeue */
	struct page *page;	/* page holding the data; released by free_entry() */
	int offset;		/* offset in page of the first unsent byte */
	int len;		/* bytes still to send */
	int end;		/* NOTE(review): not used in the visible code; presumably how far the page has been filled — confirm */
	int users;		/* entry is not freed while non-zero (writequeue_entry_complete) */
	struct connection *con;	/* presumably the connection this entry is queued on */
};
142
/*
 * Configured addresses for one cluster node, linked on dlm_node_addrs and
 * protected by dlm_node_addrs_spin. Entries in addr[] are allocated by
 * dlm_lowcomms_addr().
 */
struct dlm_node_addr {
	struct list_head list;		/* link in dlm_node_addrs */
	int nodeid;
	int addr_count;			/* valid entries in addr[] */
	int curr_addr_index;		/* address currently in use; advanced round-robin by nodeid_to_addr() */
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
150
/* All known nodes' addresses; list walks here hold dlm_node_addrs_spin */
static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

/* Local addresses; entry [0]'s family selects IPv4/IPv6 for make_sockaddr()
   and for sockets created in this file */
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;	/* nodeid_to_addr() fails while this is 0 */
static int dlm_allow_conn;	/* tcp_accept_from_sock() refuses connections while 0; read under connections_lock */

/* Work queues */
static struct workqueue_struct *recv_workqueue;	/* runs con->rwork */
static struct workqueue_struct *send_workqueue;	/* runs con->swork */

/* Connections bucketed by nodeid_hash(); nodeid2con/assoc2con take connections_lock */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;	/* slab cache for struct connection */

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
168
169
170/* This is deliberately very simple because most clusters have simple
171   sequential nodeids, so we should be able to go straight to a connection
172   struct in the array */
173static inline int nodeid_hash(int nodeid)
174{
175	return nodeid & (CONN_HASH_SIZE-1);
176}
177
178static struct connection *__find_con(int nodeid)
179{
180	int r;
181	struct connection *con;
182
183	r = nodeid_hash(nodeid);
184
185	hlist_for_each_entry(con, &connection_hash[r], list) {
186		if (con->nodeid == nodeid)
187			return con;
188	}
189	return NULL;
190}
191
192/*
193 * If 'allocation' is zero then we don't attempt to create a new
194 * connection structure for this node.
195 */
196static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
197{
198	struct connection *con = NULL;
199	int r;
200
201	con = __find_con(nodeid);
202	if (con || !alloc)
203		return con;
204
205	con = kmem_cache_zalloc(con_cache, alloc);
206	if (!con)
207		return NULL;
208
209	r = nodeid_hash(nodeid);
210	hlist_add_head(&con->list, &connection_hash[r]);
211
212	con->nodeid = nodeid;
213	mutex_init(&con->sock_mutex);
214	INIT_LIST_HEAD(&con->writequeue);
215	spin_lock_init(&con->writequeue_lock);
216	INIT_WORK(&con->swork, process_send_sockets);
217	INIT_WORK(&con->rwork, process_recv_sockets);
218
219	/* Setup action pointers for child sockets */
220	if (con->nodeid) {
221		struct connection *zerocon = __find_con(0);
222
223		con->connect_action = zerocon->connect_action;
224		if (!con->rx_action)
225			con->rx_action = zerocon->rx_action;
226	}
227
228	return con;
229}
230
231/* Loop round all connections */
232static void foreach_conn(void (*conn_func)(struct connection *c))
233{
234	int i;
235	struct hlist_node *n;
236	struct connection *con;
237
238	for (i = 0; i < CONN_HASH_SIZE; i++) {
239		hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
240			conn_func(con);
241	}
242}
243
244static struct connection *nodeid2con(int nodeid, gfp_t allocation)
245{
246	struct connection *con;
247
248	mutex_lock(&connections_lock);
249	con = __nodeid2con(nodeid, allocation);
250	mutex_unlock(&connections_lock);
251
252	return con;
253}
254
255/* This is a bit drastic, but only called when things go wrong */
256static struct connection *assoc2con(int assoc_id)
257{
258	int i;
259	struct connection *con;
260
261	mutex_lock(&connections_lock);
262
263	for (i = 0 ; i < CONN_HASH_SIZE; i++) {
264		hlist_for_each_entry(con, &connection_hash[i], list) {
265			if (con->sctp_assoc == assoc_id) {
266				mutex_unlock(&connections_lock);
267				return con;
268			}
269		}
270	}
271	mutex_unlock(&connections_lock);
272	return NULL;
273}
274
275static struct dlm_node_addr *find_node_addr(int nodeid)
276{
277	struct dlm_node_addr *na;
278
279	list_for_each_entry(na, &dlm_node_addrs, list) {
280		if (na->nodeid == nodeid)
281			return na;
282	}
283	return NULL;
284}
285
/*
 * Compare two socket addresses by IP address and port.
 * Returns 1 when they match and 0 otherwise. Only AF_INET and AF_INET6 are
 * understood; any other family in @x compares unequal. The family of @y is
 * not checked and is assumed to be the same as @x's.
 */
static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	if (x->ss_family == AF_INET) {
		struct sockaddr_in *a = (struct sockaddr_in *)x;
		struct sockaddr_in *b = (struct sockaddr_in *)y;

		return a->sin_addr.s_addr == b->sin_addr.s_addr &&
		       a->sin_port == b->sin_port;
	}

	if (x->ss_family == AF_INET6) {
		struct sockaddr_in6 *a = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *b = (struct sockaddr_in6 *)y;

		return ipv6_addr_equal(&a->sin6_addr, &b->sin6_addr) &&
		       a->sin6_port == b->sin6_port;
	}

	return 0;
}
312
313static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
314			  struct sockaddr *sa_out, bool try_new_addr)
315{
316	struct sockaddr_storage sas;
317	struct dlm_node_addr *na;
318
319	if (!dlm_local_count)
320		return -1;
321
322	spin_lock(&dlm_node_addrs_spin);
323	na = find_node_addr(nodeid);
324	if (na && na->addr_count) {
325		if (try_new_addr) {
326			na->curr_addr_index++;
327			if (na->curr_addr_index == na->addr_count)
328				na->curr_addr_index = 0;
329		}
330
331		memcpy(&sas, na->addr[na->curr_addr_index ],
332			sizeof(struct sockaddr_storage));
333	}
334	spin_unlock(&dlm_node_addrs_spin);
335
336	if (!na)
337		return -EEXIST;
338
339	if (!na->addr_count)
340		return -ENOENT;
341
342	if (sas_out)
343		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
344
345	if (!sa_out)
346		return 0;
347
348	if (dlm_local_addr[0]->ss_family == AF_INET) {
349		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
350		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
351		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
352	} else {
353		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
354		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
355		ret6->sin6_addr = in6->sin6_addr;
356	}
357
358	return 0;
359}
360
361static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
362{
363	struct dlm_node_addr *na;
364	int rv = -EEXIST;
365	int addr_i;
366
367	spin_lock(&dlm_node_addrs_spin);
368	list_for_each_entry(na, &dlm_node_addrs, list) {
369		if (!na->addr_count)
370			continue;
371
372		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
373			if (addr_compare(na->addr[addr_i], addr)) {
374				*nodeid = na->nodeid;
375				rv = 0;
376				goto unlock;
377			}
378		}
379	}
380unlock:
381	spin_unlock(&dlm_node_addrs_spin);
382	return rv;
383}
384
385int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
386{
387	struct sockaddr_storage *new_addr;
388	struct dlm_node_addr *new_node, *na;
389
390	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
391	if (!new_node)
392		return -ENOMEM;
393
394	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
395	if (!new_addr) {
396		kfree(new_node);
397		return -ENOMEM;
398	}
399
400	memcpy(new_addr, addr, len);
401
402	spin_lock(&dlm_node_addrs_spin);
403	na = find_node_addr(nodeid);
404	if (!na) {
405		new_node->nodeid = nodeid;
406		new_node->addr[0] = new_addr;
407		new_node->addr_count = 1;
408		list_add(&new_node->list, &dlm_node_addrs);
409		spin_unlock(&dlm_node_addrs_spin);
410		return 0;
411	}
412
413	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
414		spin_unlock(&dlm_node_addrs_spin);
415		kfree(new_addr);
416		kfree(new_node);
417		return -ENOSPC;
418	}
419
420	na->addr[na->addr_count++] = new_addr;
421	spin_unlock(&dlm_node_addrs_spin);
422	kfree(new_node);
423	return 0;
424}
425
426/* Data available on socket or listen socket received a connect */
427static void lowcomms_data_ready(struct sock *sk)
428{
429	struct connection *con = sock2con(sk);
430	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
431		queue_work(recv_workqueue, &con->rwork);
432}
433
434static void lowcomms_write_space(struct sock *sk)
435{
436	struct connection *con = sock2con(sk);
437
438	if (!con)
439		return;
440
441	clear_bit(SOCK_NOSPACE, &con->sock->flags);
442
443	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
444		con->sock->sk->sk_write_pending--;
445		clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
446	}
447
448	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
449		queue_work(send_workqueue, &con->swork);
450}
451
452static inline void lowcomms_connect_sock(struct connection *con)
453{
454	if (test_bit(CF_CLOSE, &con->flags))
455		return;
456	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
457		queue_work(send_workqueue, &con->swork);
458}
459
460static void lowcomms_state_change(struct sock *sk)
461{
462	if (sk->sk_state == TCP_ESTABLISHED)
463		lowcomms_write_space(sk);
464}
465
466int dlm_lowcomms_connect_node(int nodeid)
467{
468	struct connection *con;
469
470	/* with sctp there's no connecting without sending */
471	if (dlm_config.ci_protocol != 0)
472		return 0;
473
474	if (nodeid == dlm_our_nodeid())
475		return 0;
476
477	con = nodeid2con(nodeid, GFP_NOFS);
478	if (!con)
479		return -ENOMEM;
480	lowcomms_connect_sock(con);
481	return 0;
482}
483
484/* Make a socket active */
485static void add_sock(struct socket *sock, struct connection *con)
486{
487	con->sock = sock;
488
489	/* Install a data_ready callback */
490	con->sock->sk->sk_data_ready = lowcomms_data_ready;
491	con->sock->sk->sk_write_space = lowcomms_write_space;
492	con->sock->sk->sk_state_change = lowcomms_state_change;
493	con->sock->sk->sk_user_data = con;
494	con->sock->sk->sk_allocation = GFP_NOFS;
495}
496
497/* Add the port number to an IPv6 or 4 sockaddr and return the address
498   length */
499static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
500			  int *addr_len)
501{
502	saddr->ss_family =  dlm_local_addr[0]->ss_family;
503	if (saddr->ss_family == AF_INET) {
504		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
505		in4_addr->sin_port = cpu_to_be16(port);
506		*addr_len = sizeof(struct sockaddr_in);
507		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
508	} else {
509		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
510		in6_addr->sin6_port = cpu_to_be16(port);
511		*addr_len = sizeof(struct sockaddr_in6);
512	}
513	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
514}
515
516/* Close a remote connection and tidy up */
517static void close_connection(struct connection *con, bool and_other)
518{
519	mutex_lock(&con->sock_mutex);
520
521	if (con->sock) {
522		sock_release(con->sock);
523		con->sock = NULL;
524	}
525	if (con->othercon && and_other) {
526		/* Will only re-enter once. */
527		close_connection(con->othercon, false);
528	}
529	if (con->rx_page) {
530		__free_page(con->rx_page);
531		con->rx_page = NULL;
532	}
533
534	con->retries = 0;
535	mutex_unlock(&con->sock_mutex);
536}
537
538/* We only send shutdown messages to nodes that are not part of the cluster */
539static void sctp_send_shutdown(sctp_assoc_t associd)
540{
541	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
542	struct msghdr outmessage;
543	struct cmsghdr *cmsg;
544	struct sctp_sndrcvinfo *sinfo;
545	int ret;
546	struct connection *con;
547
548	con = nodeid2con(0,0);
549	BUG_ON(con == NULL);
550
551	outmessage.msg_name = NULL;
552	outmessage.msg_namelen = 0;
553	outmessage.msg_control = outcmsg;
554	outmessage.msg_controllen = sizeof(outcmsg);
555	outmessage.msg_flags = MSG_EOR;
556
557	cmsg = CMSG_FIRSTHDR(&outmessage);
558	cmsg->cmsg_level = IPPROTO_SCTP;
559	cmsg->cmsg_type = SCTP_SNDRCV;
560	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
561	outmessage.msg_controllen = cmsg->cmsg_len;
562	sinfo = CMSG_DATA(cmsg);
563	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
564
565	sinfo->sinfo_flags |= MSG_EOF;
566	sinfo->sinfo_assoc_id = associd;
567
568	ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
569
570	if (ret != 0)
571		log_print("send EOF to node failed: %d", ret);
572}
573
574static void sctp_init_failed_foreach(struct connection *con)
575{
576
577	/*
578	 * Don't try to recover base con and handle race where the
579	 * other node's assoc init creates a assoc and we get that
580	 * notification, then we get a notification that our attempt
581	 * failed due. This happens when we are still trying the primary
582	 * address, but the other node has already tried secondary addrs
583	 * and found one that worked.
584	 */
585	if (!con->nodeid || con->sctp_assoc)
586		return;
587
588	log_print("Retrying SCTP association init for node %d\n", con->nodeid);
589
590	con->try_new_addr = true;
591	con->sctp_assoc = 0;
592	if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) {
593		if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
594			queue_work(send_workqueue, &con->swork);
595	}
596}
597
598/* INIT failed but we don't know which node...
599   restart INIT on all pending nodes */
600static void sctp_init_failed(void)
601{
602	mutex_lock(&connections_lock);
603
604	foreach_conn(sctp_init_failed_foreach);
605
606	mutex_unlock(&connections_lock);
607}
608
609static void retry_failed_sctp_send(struct connection *recv_con,
610				   struct sctp_send_failed *sn_send_failed,
611				   char *buf)
612{
613	int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
614	struct dlm_mhandle *mh;
615	struct connection *con;
616	char *retry_buf;
617	int nodeid = sn_send_failed->ssf_info.sinfo_ppid;
618
619	log_print("Retry sending %d bytes to node id %d", len, nodeid);
620
621	if (!nodeid) {
622		log_print("Shouldn't resend data via listening connection.");
623		return;
624	}
625
626	con = nodeid2con(nodeid, 0);
627	if (!con) {
628		log_print("Could not look up con for nodeid %d\n",
629			  nodeid);
630		return;
631	}
632
633	mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
634	if (!mh) {
635		log_print("Could not allocate buf for retry.");
636		return;
637	}
638	memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
639	dlm_lowcomms_commit_buffer(mh);
640
641	/*
642	 * If we got a assoc changed event before the send failed event then
643	 * we only need to retry the send.
644	 */
645	if (con->sctp_assoc) {
646		if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
647			queue_work(send_workqueue, &con->swork);
648	} else
649		sctp_init_failed_foreach(con);
650}
651
652/* Something happened to an association */
653static void process_sctp_notification(struct connection *con,
654				      struct msghdr *msg, char *buf)
655{
656	union sctp_notification *sn = (union sctp_notification *)buf;
657	struct linger linger;
658
659	switch (sn->sn_header.sn_type) {
660	case SCTP_SEND_FAILED:
661		retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
662		break;
663	case SCTP_ASSOC_CHANGE:
664		switch (sn->sn_assoc_change.sac_state) {
665		case SCTP_COMM_UP:
666		case SCTP_RESTART:
667		{
668			/* Check that the new node is in the lockspace */
669			struct sctp_prim prim;
670			int nodeid;
671			int prim_len, ret;
672			int addr_len;
673			struct connection *new_con;
674
675			/*
676			 * We get this before any data for an association.
677			 * We verify that the node is in the cluster and
678			 * then peel off a socket for it.
679			 */
680			if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
681				log_print("COMM_UP for invalid assoc ID %d",
682					 (int)sn->sn_assoc_change.sac_assoc_id);
683				sctp_init_failed();
684				return;
685			}
686			memset(&prim, 0, sizeof(struct sctp_prim));
687			prim_len = sizeof(struct sctp_prim);
688			prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
689
690			ret = kernel_getsockopt(con->sock,
691						IPPROTO_SCTP,
692						SCTP_PRIMARY_ADDR,
693						(char*)&prim,
694						&prim_len);
695			if (ret < 0) {
696				log_print("getsockopt/sctp_primary_addr on "
697					  "new assoc %d failed : %d",
698					  (int)sn->sn_assoc_change.sac_assoc_id,
699					  ret);
700
701				/* Retry INIT later */
702				new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
703				if (new_con)
704					clear_bit(CF_CONNECT_PENDING, &con->flags);
705				return;
706			}
707			make_sockaddr(&prim.ssp_addr, 0, &addr_len);
708			if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
709				unsigned char *b=(unsigned char *)&prim.ssp_addr;
710				log_print("reject connect from unknown addr");
711				print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
712						     b, sizeof(struct sockaddr_storage));
713				sctp_send_shutdown(prim.ssp_assoc_id);
714				return;
715			}
716
717			new_con = nodeid2con(nodeid, GFP_NOFS);
718			if (!new_con)
719				return;
720
721			/* Peel off a new sock */
722			lock_sock(con->sock->sk);
723			ret = sctp_do_peeloff(con->sock->sk,
724				sn->sn_assoc_change.sac_assoc_id,
725				&new_con->sock);
726			release_sock(con->sock->sk);
727			if (ret < 0) {
728				log_print("Can't peel off a socket for "
729					  "connection %d to node %d: err=%d",
730					  (int)sn->sn_assoc_change.sac_assoc_id,
731					  nodeid, ret);
732				return;
733			}
734			add_sock(new_con->sock, new_con);
735
736			linger.l_onoff = 1;
737			linger.l_linger = 0;
738			ret = kernel_setsockopt(new_con->sock, SOL_SOCKET, SO_LINGER,
739						(char *)&linger, sizeof(linger));
740			if (ret < 0)
741				log_print("set socket option SO_LINGER failed");
742
743			log_print("connecting to %d sctp association %d",
744				 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
745
746			new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id;
747			new_con->try_new_addr = false;
748			/* Send any pending writes */
749			clear_bit(CF_CONNECT_PENDING, &new_con->flags);
750			clear_bit(CF_INIT_PENDING, &new_con->flags);
751			if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
752				queue_work(send_workqueue, &new_con->swork);
753			}
754			if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
755				queue_work(recv_workqueue, &new_con->rwork);
756		}
757		break;
758
759		case SCTP_COMM_LOST:
760		case SCTP_SHUTDOWN_COMP:
761		{
762			con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
763			if (con) {
764				con->sctp_assoc = 0;
765			}
766		}
767		break;
768
769		case SCTP_CANT_STR_ASSOC:
770		{
771			/* Will retry init when we get the send failed notification */
772			log_print("Can't start SCTP association - retrying");
773		}
774		break;
775
776		default:
777			log_print("unexpected SCTP assoc change id=%d state=%d",
778				  (int)sn->sn_assoc_change.sac_assoc_id,
779				  sn->sn_assoc_change.sac_state);
780		}
781	default:
782		; /* fall through */
783	}
784}
785
/*
 * receive_from_sock - read available data from @con's socket and pass it up
 * @con: connection to read from
 *
 * Runs under con->sock_mutex. Reads into the connection's circular receive
 * page (allocated on demand) with a non-blocking recvmsg. SCTP
 * notifications go to process_sctp_notification(). Ordinary data goes to
 * dlm_process_incoming_buffer(), and the bytes it reports as consumed are
 * dropped from the ring.
 *
 * Returns:
 *   0 after processing a read or a notification;
 *   -EAGAIN if there is no socket, if the read was requeued (page
 *   allocation failed or the ring was filled completely), if recvmsg had
 *   no data, or on EOF (after closing the connection);
 *   any other negative error, after closing the connection.
 */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg = {};
	struct kvec iov[2];
	unsigned len;
	int r;
	int call_again_soon = 0;
	int nvec;
	char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		/* The ring spans exactly one page */
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	/* Only SCTP needs these really */
	memset(&incmsg, 0, sizeof(incmsg));
	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;
	nvec = 1;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	/* Free space wraps past the page end (also true when the ring is empty) */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		nvec = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	/* r keeps the raw byte count for the -EBADMSG diagnostic below */
	r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret <= 0)
		goto out_close;

	/* Process SCTP notifications */
	if (msg.msg_flags & MSG_NOTIFICATION) {
		msg.msg_control = incmsg;
		msg.msg_controllen = sizeof(incmsg);

		/*
		 * NOTE(review): the data was received at cbuf_data() but is
		 * parsed from cb.base; these only coincide when cb.len == 0.
		 * Notification bytes are never cbuf_add()ed, so they do not
		 * accumulate in the ring.
		 */
		process_sctp_notification(con, &msg,
				page_address(con->rx_page) + con->cb.base);
		mutex_unlock(&con->sock_mutex);
		return 0;
	}
	/* Data is never expected on the base (nodeid 0) connection */
	BUG_ON(con->nodeid == 0);

	/* Ring filled completely: more data may be waiting, read again */
	if (ret == len)
		call_again_soon = 1;
	cbuf_add(&con->cb, ret);
	/* Returns the number of bytes consumed, or a negative error */
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		log_print("lowcomms: addr=%p, base=%u, len=%u, "
			  "iov_len=%u, iov_base[0]=%p, read=%d",
			  page_address(con->rx_page), con->cb.base, con->cb.len,
			  len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	/* Nothing left over and no immediate re-read: give the page back */
	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	if (call_again_soon)
		goto out_resched;
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		close_connection(con, false);
		/* Reconnect when there is something to send */
	}
	/* Don't return success if we really got EOF */
	if (ret == 0)
		ret = -EAGAIN;

	return ret;
}
904
905/* Listening socket is busy, accept a connection */
906static int tcp_accept_from_sock(struct connection *con)
907{
908	int result;
909	struct sockaddr_storage peeraddr;
910	struct socket *newsock;
911	int len;
912	int nodeid;
913	struct connection *newcon;
914	struct connection *addcon;
915
916	mutex_lock(&connections_lock);
917	if (!dlm_allow_conn) {
918		mutex_unlock(&connections_lock);
919		return -1;
920	}
921	mutex_unlock(&connections_lock);
922
923	memset(&peeraddr, 0, sizeof(peeraddr));
924	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
925				  IPPROTO_TCP, &newsock);
926	if (result < 0)
927		return -ENOMEM;
928
929	mutex_lock_nested(&con->sock_mutex, 0);
930
931	result = -ENOTCONN;
932	if (con->sock == NULL)
933		goto accept_err;
934
935	newsock->type = con->sock->type;
936	newsock->ops = con->sock->ops;
937
938	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
939	if (result < 0)
940		goto accept_err;
941
942	/* Get the connected socket's peer */
943	memset(&peeraddr, 0, sizeof(peeraddr));
944	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
945				  &len, 2)) {
946		result = -ECONNABORTED;
947		goto accept_err;
948	}
949
950	/* Get the new node's NODEID */
951	make_sockaddr(&peeraddr, 0, &len);
952	if (addr_to_nodeid(&peeraddr, &nodeid)) {
953		unsigned char *b=(unsigned char *)&peeraddr;
954		log_print("connect from non cluster node");
955		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
956				     b, sizeof(struct sockaddr_storage));
957		sock_release(newsock);
958		mutex_unlock(&con->sock_mutex);
959		return -1;
960	}
961
962	log_print("got connection from %d", nodeid);
963
964	/*  Check to see if we already have a connection to this node. This
965	 *  could happen if the two nodes initiate a connection at roughly
966	 *  the same time and the connections cross on the wire.
967	 *  In this case we store the incoming one in "othercon"
968	 */
969	newcon = nodeid2con(nodeid, GFP_NOFS);
970	if (!newcon) {
971		result = -ENOMEM;
972		goto accept_err;
973	}
974	mutex_lock_nested(&newcon->sock_mutex, 1);
975	if (newcon->sock) {
976		struct connection *othercon = newcon->othercon;
977
978		if (!othercon) {
979			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
980			if (!othercon) {
981				log_print("failed to allocate incoming socket");
982				mutex_unlock(&newcon->sock_mutex);
983				result = -ENOMEM;
984				goto accept_err;
985			}
986			othercon->nodeid = nodeid;
987			othercon->rx_action = receive_from_sock;
988			mutex_init(&othercon->sock_mutex);
989			INIT_WORK(&othercon->swork, process_send_sockets);
990			INIT_WORK(&othercon->rwork, process_recv_sockets);
991			set_bit(CF_IS_OTHERCON, &othercon->flags);
992		}
993		if (!othercon->sock) {
994			newcon->othercon = othercon;
995			othercon->sock = newsock;
996			newsock->sk->sk_user_data = othercon;
997			add_sock(newsock, othercon);
998			addcon = othercon;
999		}
1000		else {
1001			printk("Extra connection from node %d attempted\n", nodeid);
1002			result = -EAGAIN;
1003			mutex_unlock(&newcon->sock_mutex);
1004			goto accept_err;
1005		}
1006	}
1007	else {
1008		newsock->sk->sk_user_data = newcon;
1009		newcon->rx_action = receive_from_sock;
1010		add_sock(newsock, newcon);
1011		addcon = newcon;
1012	}
1013
1014	mutex_unlock(&newcon->sock_mutex);
1015
1016	/*
1017	 * Add it to the active queue in case we got data
1018	 * between processing the accept adding the socket
1019	 * to the read_sockets list
1020	 */
1021	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
1022		queue_work(recv_workqueue, &addcon->rwork);
1023	mutex_unlock(&con->sock_mutex);
1024
1025	return 0;
1026
1027accept_err:
1028	mutex_unlock(&con->sock_mutex);
1029	sock_release(newsock);
1030
1031	if (result != -EAGAIN)
1032		log_print("error accepting connection from node: %d", result);
1033	return result;
1034}
1035
1036static void free_entry(struct writequeue_entry *e)
1037{
1038	__free_page(e->page);
1039	kfree(e);
1040}
1041
1042/*
1043 * writequeue_entry_complete - try to delete and free write queue entry
1044 * @e: write queue entry to try to delete
1045 * @completed: bytes completed
1046 *
1047 * writequeue_lock must be held.
1048 */
1049static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1050{
1051	e->offset += completed;
1052	e->len -= completed;
1053
1054	if (e->len == 0 && e->users == 0) {
1055		list_del(&e->list);
1056		free_entry(e);
1057	}
1058}
1059
/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.

   The first writequeue entry of @con is sent through the base (nodeid 0)
   socket to the node's current address. sinfo_ppid is set to
   cpu_to_le32(con->nodeid), so a later SCTP_SEND_FAILED notification can
   be mapped back to the node. CF_INIT_PENDING prevents overlapping
   attempts. If the send fails, it and CF_CONNECT_PENDING are cleared again
   so a later attempt can retry.
 */
static void sctp_init_assoc(struct connection *con)
{
	struct sockaddr_storage rem_addr;
	char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	struct connection *base_con;
	struct writequeue_entry *e;
	int len, offset;
	int ret;
	int addrlen;
	struct kvec iov[1];

	mutex_lock(&con->sock_mutex);
	/* An INIT is already in flight for this node */
	if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
		goto unlock;

	/*
	 * NOTE(review): on this path and on the "writequeue empty" path
	 * below, CF_INIT_PENDING stays set. Only sctp_init_failed_foreach()
	 * or the COMM_UP handler in this file clear it — confirm intended.
	 */
	if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr,
			   con->try_new_addr)) {
		log_print("no address for nodeid %d", con->nodeid);
		goto unlock;
	}
	base_con = nodeid2con(0, 0);
	BUG_ON(base_con == NULL);

	/* nodeid_to_addr() filled only the IP; add family and port */
	make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);

	outmessage.msg_name = &rem_addr;
	outmessage.msg_namelen = addrlen;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	spin_lock(&con->writequeue_lock);

	if (list_empty(&con->writequeue)) {
		spin_unlock(&con->writequeue_lock);
		log_print("writequeue empty for nodeid %d", con->nodeid);
		goto unlock;
	}

	e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
	len = e->len;
	offset = e->offset;

	/* Send the first block off the write queue */
	iov[0].iov_base = page_address(e->page)+offset;
	iov[0].iov_len = len;
	/* NOTE(review): e is used again below after this unlock */
	spin_unlock(&con->writequeue_lock);

	if (rem_addr.ss_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr;
		log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr);
	} else {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr;
		log_print("Trying to connect to %pI6", &sin6->sin6_addr);
	}

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
	/* Tag the message with the nodeid; read back in retry_failed_sctp_send() */
	sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
	outmessage.msg_controllen = cmsg->cmsg_len;
	/* Use msg_name as the destination instead of the association's primary */
	sinfo->sinfo_flags |= SCTP_ADDR_OVER;

	ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
	if (ret < 0) {
		log_print("Send first packet to node %d failed: %d",
			  con->nodeid, ret);

		/* Try again later */
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		clear_bit(CF_INIT_PENDING, &con->flags);
	}
	else {
		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
		spin_unlock(&con->writequeue_lock);
	}

unlock:
	mutex_unlock(&con->sock_mutex);
}
1152
/*
 * tcp_connect_to_sock - connect a new TCP socket to con->nodeid
 * @con: connection for the remote node (nodeid 0 is refused)
 *
 * Does nothing once con->retries exceeds MAX_CONNECT_RETRIES, or if the
 * connection already has a socket. Otherwise it:
 *   - creates a socket and attaches it to @con;
 *   - binds it to our first local address (a bind failure is only logged);
 *   - disables Nagle;
 *   - starts a non-blocking connect, where -EINPROGRESS counts as success.
 *
 * On failure the socket is released. Unless the error is one of
 * -EHOSTUNREACH, -ENETUNREACH, -ENETDOWN, -EINVAL or -EPROTONOSUPPORT, it
 * drops sock_mutex, sleeps one second and calls lowcomms_connect_sock()
 * to request another attempt. This function is installed as
 * con->connect_action, which process_send_sockets() invokes, so the sleep
 * happens in that worker.
 */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	int one = 1;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	/* Give up silently after too many attempts. */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	/*
	 * Attach the socket to @con. add_sock() presumably stores it in
	 * con->sock (the error path below releases con->sock when set).
	 */
	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to our cluster-known address connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	/* Non-blocking connect: completion is reported asynchronously. */
	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	/* Release whichever socket we hold: attached to @con or not yet. */
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}
1246
1247static struct socket *tcp_create_listen_sock(struct connection *con,
1248					     struct sockaddr_storage *saddr)
1249{
1250	struct socket *sock = NULL;
1251	int result = 0;
1252	int one = 1;
1253	int addr_len;
1254
1255	if (dlm_local_addr[0]->ss_family == AF_INET)
1256		addr_len = sizeof(struct sockaddr_in);
1257	else
1258		addr_len = sizeof(struct sockaddr_in6);
1259
1260	/* Create a socket to communicate with */
1261	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1262				  IPPROTO_TCP, &sock);
1263	if (result < 0) {
1264		log_print("Can't create listening comms socket");
1265		goto create_out;
1266	}
1267
1268	/* Turn off Nagle's algorithm */
1269	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1270			  sizeof(one));
1271
1272	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1273				   (char *)&one, sizeof(one));
1274
1275	if (result < 0) {
1276		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1277	}
1278	con->rx_action = tcp_accept_from_sock;
1279	con->connect_action = tcp_connect_to_sock;
1280
1281	/* Bind to our port */
1282	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1283	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1284	if (result < 0) {
1285		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1286		sock_release(sock);
1287		sock = NULL;
1288		con->sock = NULL;
1289		goto create_out;
1290	}
1291	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1292				 (char *)&one, sizeof(one));
1293	if (result < 0) {
1294		log_print("Set keepalive failed: %d", result);
1295	}
1296
1297	result = sock->ops->listen(sock, 5);
1298	if (result < 0) {
1299		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1300		sock_release(sock);
1301		sock = NULL;
1302		goto create_out;
1303	}
1304
1305create_out:
1306	return sock;
1307}
1308
1309/* Get local addresses */
1310static void init_local(void)
1311{
1312	struct sockaddr_storage sas, *addr;
1313	int i;
1314
1315	dlm_local_count = 0;
1316	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1317		if (dlm_our_addr(&sas, i))
1318			break;
1319
1320		addr = kmalloc(sizeof(*addr), GFP_NOFS);
1321		if (!addr)
1322			break;
1323		memcpy(addr, &sas, sizeof(*addr));
1324		dlm_local_addr[dlm_local_count++] = addr;
1325	}
1326}
1327
1328/* Bind to an IP address. SCTP allows multiple address so it can do
1329   multi-homing */
1330static int add_sctp_bind_addr(struct connection *sctp_con,
1331			      struct sockaddr_storage *addr,
1332			      int addr_len, int num)
1333{
1334	int result = 0;
1335
1336	if (num == 1)
1337		result = kernel_bind(sctp_con->sock,
1338				     (struct sockaddr *) addr,
1339				     addr_len);
1340	else
1341		result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1342					   SCTP_SOCKOPT_BINDX_ADD,
1343					   (char *)addr, addr_len);
1344
1345	if (result < 0)
1346		log_print("Can't bind to port %d addr number %d",
1347			  dlm_config.ci_tcp_port, num);
1348
1349	return result;
1350}
1351
/*
 * sctp_listen_for_all - create the SCTP listening socket (nodeid 0)
 *
 * Steps:
 *   - create a SOCK_SEQPACKET SCTP socket;
 *   - force its receive buffer to NEEDED_RMEM;
 *   - subscribe to data-io, association, send-failure, shutdown and
 *     partial-delivery events;
 *   - set SCTP_NODELAY;
 *   - install the socket on the nodeid 0 connection;
 *   - bind every local address (see add_sctp_bind_addr());
 *   - start listening.
 *
 * Returns 0 on success, -ENOMEM if the nodeid 0 connection can't be
 * allocated, or the negative error of the failing step. Failures to enlarge
 * the receive buffer or set NODELAY are only logged.
 */
static int sctp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct sockaddr_storage localaddr;
	struct sctp_event_subscribe subscribe;
	int result = -EINVAL, num = 1, i, addr_len;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int bufsize = NEEDED_RMEM;
	int one = 1;

	if (!con)
		return -ENOMEM;

	log_print("Using SCTP for communications");

	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
				  IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	/* Listen for events */
	memset(&subscribe, 0, sizeof(subscribe));
	subscribe.sctp_data_io_event = 1;
	subscribe.sctp_association_event = 1;
	subscribe.sctp_send_failure_event = 1;
	subscribe.sctp_shutdown_event = 1;
	subscribe.sctp_partial_delivery_event = 1;

	/* Non-fatal: only logged on failure. */
	result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
				 (char *)&bufsize, sizeof(bufsize));
	if (result)
		log_print("Error increasing buffer space on socket %d", result);

	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
				   (char *)&subscribe, sizeof(subscribe));
	if (result < 0) {
		log_print("Failed to set SCTP_EVENTS on socket: result=%d",
			  result);
		goto create_delsock;
	}

	/* Non-fatal: only logged on failure. */
	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
				   sizeof(one));
	if (result < 0)
		log_print("Could not set SCTP NODELAY error %d\n", result);

	/* Init con struct */
	sock->sk->sk_user_data = con;
	con->sock = sock;
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->rx_action = receive_from_sock;
	con->connect_action = sctp_init_assoc;

	/* Bind to all interfaces. */
	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);

		result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
		if (result)
			goto create_delsock;
		++num;
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	/* Undo the socket creation; con->sock may or may not be set yet. */
	sock_release(sock);
	con->sock = NULL;
out:
	return result;
}
1433
1434static int tcp_listen_for_all(void)
1435{
1436	struct socket *sock = NULL;
1437	struct connection *con = nodeid2con(0, GFP_NOFS);
1438	int result = -EINVAL;
1439
1440	if (!con)
1441		return -ENOMEM;
1442
1443	/* We don't support multi-homed hosts */
1444	if (dlm_local_addr[1] != NULL) {
1445		log_print("TCP protocol can't handle multi-homed hosts, "
1446			  "try SCTP");
1447		return -EINVAL;
1448	}
1449
1450	log_print("Using TCP for communications");
1451
1452	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1453	if (sock) {
1454		add_sock(sock, con);
1455		result = 0;
1456	}
1457	else {
1458		result = -EADDRINUSE;
1459	}
1460
1461	return result;
1462}
1463
1464
1465
1466static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1467						     gfp_t allocation)
1468{
1469	struct writequeue_entry *entry;
1470
1471	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1472	if (!entry)
1473		return NULL;
1474
1475	entry->page = alloc_page(allocation);
1476	if (!entry->page) {
1477		kfree(entry);
1478		return NULL;
1479	}
1480
1481	entry->offset = 0;
1482	entry->len = 0;
1483	entry->end = 0;
1484	entry->users = 0;
1485	entry->con = con;
1486
1487	return entry;
1488}
1489
1490void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1491{
1492	struct connection *con;
1493	struct writequeue_entry *e;
1494	int offset = 0;
1495
1496	con = nodeid2con(nodeid, allocation);
1497	if (!con)
1498		return NULL;
1499
1500	spin_lock(&con->writequeue_lock);
1501	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1502	if ((&e->list == &con->writequeue) ||
1503	    (PAGE_CACHE_SIZE - e->end < len)) {
1504		e = NULL;
1505	} else {
1506		offset = e->end;
1507		e->end += len;
1508		e->users++;
1509	}
1510	spin_unlock(&con->writequeue_lock);
1511
1512	if (e) {
1513	got_one:
1514		*ppc = page_address(e->page) + offset;
1515		return e;
1516	}
1517
1518	e = new_writequeue_entry(con, allocation);
1519	if (e) {
1520		spin_lock(&con->writequeue_lock);
1521		offset = e->end;
1522		e->end += len;
1523		e->users++;
1524		list_add_tail(&e->list, &con->writequeue);
1525		spin_unlock(&con->writequeue_lock);
1526		goto got_one;
1527	}
1528	return NULL;
1529}
1530
1531void dlm_lowcomms_commit_buffer(void *mh)
1532{
1533	struct writequeue_entry *e = (struct writequeue_entry *)mh;
1534	struct connection *con = e->con;
1535	int users;
1536
1537	spin_lock(&con->writequeue_lock);
1538	users = --e->users;
1539	if (users)
1540		goto out;
1541	e->len = e->end - e->offset;
1542	spin_unlock(&con->writequeue_lock);
1543
1544	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1545		queue_work(send_workqueue, &con->swork);
1546	}
1547	return;
1548
1549out:
1550	spin_unlock(&con->writequeue_lock);
1551	return;
1552}
1553
/*
 * send_to_sock - transmit the queued write queue entries on con->sock
 * @con: connection to send on
 *
 * Walks the write queue from its head and sends each entry's pending bytes
 * with non-blocking kernel_sendpage(). Sent bytes (possibly partial) are
 * accounted with writequeue_entry_complete(), which frees an entry once it
 * is fully sent and has no writers left.
 *
 * Outcomes:
 *   - sendpage returns -EAGAIN or 0: stop and return. On -EAGAIN with
 *     SOCK_ASYNC_NOSPACE set, also set SOCK_NOSPACE and bump
 *     sk_write_pending once, guarded by CF_APP_LIMITED.
 *   - any other error: close the connection and request a reconnect.
 *   - no socket: request a connect unless CF_INIT_PENDING is set.
 */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		/*
		 * Queue-empty check. NOTE(review): comparing the cast entry
		 * pointer with the list head only works while 'list' is the
		 * first member of struct writequeue_entry.
		 */
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		/* A fully sent, unreferenced entry should already be freed. */
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		/* len == 0: entry still being filled by a writer; skip send. */
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false);
	lowcomms_connect_sock(con);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	/* An SCTP association init (sctp_init_assoc) may be in progress. */
	if (!test_bit(CF_INIT_PENDING, &con->flags))
		lowcomms_connect_sock(con);
}
1624
1625static void clean_one_writequeue(struct connection *con)
1626{
1627	struct writequeue_entry *e, *safe;
1628
1629	spin_lock(&con->writequeue_lock);
1630	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1631		list_del(&e->list);
1632		free_entry(e);
1633	}
1634	spin_unlock(&con->writequeue_lock);
1635}
1636
1637/* Called from recovery when it knows that a node has
1638   left the cluster */
1639int dlm_lowcomms_close(int nodeid)
1640{
1641	struct connection *con;
1642	struct dlm_node_addr *na;
1643
1644	log_print("closing connection to node %d", nodeid);
1645	con = nodeid2con(nodeid, 0);
1646	if (con) {
1647		clear_bit(CF_CONNECT_PENDING, &con->flags);
1648		clear_bit(CF_WRITE_PENDING, &con->flags);
1649		set_bit(CF_CLOSE, &con->flags);
1650		if (cancel_work_sync(&con->swork))
1651			log_print("canceled swork for node %d", nodeid);
1652		if (cancel_work_sync(&con->rwork))
1653			log_print("canceled rwork for node %d", nodeid);
1654		clean_one_writequeue(con);
1655		close_connection(con, true);
1656	}
1657
1658	spin_lock(&dlm_node_addrs_spin);
1659	na = find_node_addr(nodeid);
1660	if (na) {
1661		list_del(&na->list);
1662		while (na->addr_count--)
1663			kfree(na->addr[na->addr_count]);
1664		kfree(na);
1665	}
1666	spin_unlock(&dlm_node_addrs_spin);
1667
1668	return 0;
1669}
1670
1671/* Receive workqueue function */
1672static void process_recv_sockets(struct work_struct *work)
1673{
1674	struct connection *con = container_of(work, struct connection, rwork);
1675	int err;
1676
1677	clear_bit(CF_READ_PENDING, &con->flags);
1678	do {
1679		err = con->rx_action(con);
1680	} while (!err);
1681}
1682
1683/* Send workqueue function */
1684static void process_send_sockets(struct work_struct *work)
1685{
1686	struct connection *con = container_of(work, struct connection, swork);
1687
1688	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
1689		con->connect_action(con);
1690		set_bit(CF_WRITE_PENDING, &con->flags);
1691	}
1692	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1693		send_to_sock(con);
1694}
1695
1696
1697/* Discard all entries on the write queues */
1698static void clean_writequeues(void)
1699{
1700	foreach_conn(clean_one_writequeue);
1701}
1702
1703static void work_stop(void)
1704{
1705	destroy_workqueue(recv_workqueue);
1706	destroy_workqueue(send_workqueue);
1707}
1708
1709static int work_start(void)
1710{
1711	recv_workqueue = alloc_workqueue("dlm_recv",
1712					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1713	if (!recv_workqueue) {
1714		log_print("can't start dlm_recv");
1715		return -ENOMEM;
1716	}
1717
1718	send_workqueue = alloc_workqueue("dlm_send",
1719					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1720	if (!send_workqueue) {
1721		log_print("can't start dlm_send");
1722		destroy_workqueue(recv_workqueue);
1723		return -ENOMEM;
1724	}
1725
1726	return 0;
1727}
1728
1729static void stop_conn(struct connection *con)
1730{
1731	con->flags |= 0x0F;
1732	if (con->sock && con->sock->sk)
1733		con->sock->sk->sk_user_data = NULL;
1734}
1735
1736static void free_conn(struct connection *con)
1737{
1738	close_connection(con, true);
1739	if (con->othercon)
1740		kmem_cache_free(con_cache, con->othercon);
1741	hlist_del(&con->list);
1742	kmem_cache_free(con_cache, con);
1743}
1744
/*
 * dlm_lowcomms_stop - shut down the low-level comms layer
 *
 * The order of the steps matters:
 *   1. Under connections_lock, refuse new connections and quiesce every
 *      connection with stop_conn().
 *   2. Destroy the workqueues outside the lock. NOTE(review): presumably
 *      because queued work may itself take connections_lock; confirm.
 *   3. Under the lock again, discard all write queues and close and free
 *      every connection.
 *   4. Destroy the connection cache.
 */
void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	mutex_lock(&connections_lock);
	dlm_allow_conn = 0;
	foreach_conn(stop_conn);
	mutex_unlock(&connections_lock);

	work_stop();

	mutex_lock(&connections_lock);
	clean_writequeues();

	foreach_conn(free_conn);

	mutex_unlock(&connections_lock);
	kmem_cache_destroy(con_cache);
}
1765
1766int dlm_lowcomms_start(void)
1767{
1768	int error = -EINVAL;
1769	struct connection *con;
1770	int i;
1771
1772	for (i = 0; i < CONN_HASH_SIZE; i++)
1773		INIT_HLIST_HEAD(&connection_hash[i]);
1774
1775	init_local();
1776	if (!dlm_local_count) {
1777		error = -ENOTCONN;
1778		log_print("no local IP address has been set");
1779		goto fail;
1780	}
1781
1782	error = -ENOMEM;
1783	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1784				      __alignof__(struct connection), 0,
1785				      NULL);
1786	if (!con_cache)
1787		goto fail;
1788
1789	error = work_start();
1790	if (error)
1791		goto fail_destroy;
1792
1793	dlm_allow_conn = 1;
1794
1795	/* Start listening */
1796	if (dlm_config.ci_protocol == 0)
1797		error = tcp_listen_for_all();
1798	else
1799		error = sctp_listen_for_all();
1800	if (error)
1801		goto fail_unlisten;
1802
1803	return 0;
1804
1805fail_unlisten:
1806	dlm_allow_conn = 0;
1807	con = nodeid2con(0,0);
1808	if (con) {
1809		close_connection(con, false);
1810		kmem_cache_free(con_cache, con);
1811	}
1812fail_destroy:
1813	kmem_cache_destroy(con_cache);
1814fail:
1815	return error;
1816}
1817
1818void dlm_lowcomms_exit(void)
1819{
1820	struct dlm_node_addr *na, *safe;
1821
1822	spin_lock(&dlm_node_addrs_spin);
1823	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1824		list_del(&na->list);
1825		while (na->addr_count--)
1826			kfree(na->addr[na->addr_count]);
1827		kfree(na);
1828	}
1829	spin_unlock(&dlm_node_addrs_spin);
1830}
1831