/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's responsibility to resolve
 * these nodeids into IP addresses or whatever else it needs for
 * inter-node communication.
 *
 * The comms level consists of two workqueues: a receive side that
 * deals mainly with receiving messages from other nodes and passing
 * them up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and a send
 * side which does all the setting up of connections to remote nodes
 * and the sending of data. Callers are not allowed to send their
 * own data directly because it may cause them to wait in times of
 * high load. Also, this way, the sending side can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It must be configured using a
 * cluster-wide mechanism as it has to be the same on all nodes of the
 * cluster for the DLM to function.
 */
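
/*
 * Illustrative only (not used by this file): the 'protocol' and
 * 'tcp_port' values live in the dlm configfs cluster directory and are
 * normally written cluster-wide by dlm_controld before any lockspace is
 * created.  Assuming the standard configfs mount point, something like:
 *
 *	echo 1 > /sys/kernel/config/dlm/cluster/protocol	# 1 = SCTP
 *	cat /sys/kernel/config/dlm/cluster/protocol		# 0 = TCP
 *
 * See fs/dlm/config.c for the authoritative attribute names.
 */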

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25

struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size-1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len  -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}
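
/*
 * Worked example of the circular-buffer arithmetic above (numbers are
 * illustrative): with a 4096-byte buffer, cbuf_init() sets mask = 0xfff.
 * If base = 4000 and len = 200, cbuf_data() returns (4000 + 200) & 0xfff
 * = 104, i.e. the next free byte has wrapped around to the start of the
 * page.  cbuf_eat(cb, 200) then advances base to (4000 + 200) & 0xfff =
 * 104 and leaves len = 0, so cbuf_empty() returns true.
 */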

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	void (*orig_error_report)(struct sock *sk);
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}
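
/*
 * For example, with CONN_HASH_SIZE = 32, nodeids 5 and 37 both hash to
 * bucket 5 (37 & 31 == 5); a cluster with small sequential nodeids gets
 * at most one node per bucket.
 */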

static struct connection *__find_con(int nodeid)
{
	int r;
	struct connection *con;

	r = nodeid_hash(nodeid);

	hlist_for_each_entry(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}
	return NULL;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con = NULL;
	int r;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kmem_cache_zalloc(con_cache, alloc);
	if (!con)
		return NULL;

	r = nodeid_hash(nodeid);
	hlist_add_head(&con->list, &connection_hash[r]);

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);

	/* Set up action pointers for child sockets */
	if (con->nodeid) {
		struct connection *zerocon = __find_con(0);

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i;
	struct hlist_node *n;
	struct connection *con;

	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
			conn_func(con);
	}
}

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con;

	mutex_lock(&connections_lock);
	con = __nodeid2con(nodeid, allocation);
	mutex_unlock(&connections_lock);

	return con;
}

static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}

static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count) {
		memcpy(&sas, na->addr[na->curr_addr_index],
		       sizeof(struct sockaddr_storage));

		if (try_new_addr) {
			na->curr_addr_index++;
			if (na->curr_addr_index == na->addr_count)
				na->curr_addr_index = 0;
		}
	}
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;
	int addr_i;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
			if (addr_compare(na->addr[addr_i], addr)) {
				*nodeid = na->nodeid;
				rv = 0;
				goto unlock;
			}
		}
	}
unlock:
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!con)
		return;

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here. Also, it
	 * doesn't switch socket state when entering shutdown, so we
	 * skip the write in that case.
	 */
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}

static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct sockaddr_storage saddr;

	if (nodeid_to_addr(con->nodeid, &saddr, NULL, false)) {
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, dlm_config.ci_tcp_port,
				   sk->sk_err, sk->sk_err_soft);
		return;
	} else if (saddr.ss_family == AF_INET) {
		struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sin4->sin_addr.s_addr,
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	} else {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sin6->sin6_addr,
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	}
	con->orig_error_report(sk);
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;
	con->sock->sk->sk_user_data = con;
	con->sock->sk->sk_allocation = GFP_NOFS;
	con->orig_error_report = con->sock->sk->sk_error_report;
	con->sock->sk->sk_error_report = lowcomms_error_report;
}

/* Add the port number to an IPv6 or IPv4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
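
/*
 * Example (values illustrative): for an AF_INET cluster using the default
 * DLM port 21064, make_sockaddr(&addr, 21064, &len) sets sin_port =
 * cpu_to_be16(21064) and len = sizeof(struct sockaddr_in), and zeroes the
 * tail of the sockaddr_storage so that whole-structure comparisons and
 * hex dumps of it behave predictably.
 */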

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	clear_bit(CF_CONNECT_PENDING, &con->flags);
	clear_bit(CF_WRITE_PENDING, &con->flags);
	if (tx && cancel_work_sync(&con->swork))
		log_print("canceled swork for node %d", con->nodeid);
	if (rx && cancel_work_sync(&con->rwork))
		log_print("canceled rwork for node %d", con->nodeid);

	mutex_lock(&con->sock_mutex);
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, true, true);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg = {};
	struct kvec iov[2];
	unsigned len;
	int r;
	int call_again_soon = 0;
	int nvec;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}
	if (con->nodeid == 0) {
		ret = -EINVAL;
		goto out_close;
	}

	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;
	nvec = 1;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		nvec = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret <= 0)
		goto out_close;
	else if (ret == len)
		call_again_soon = 1;

	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
			  page_address(con->rx_page), con->cb.base,
			  con->cb.len, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	if (call_again_soon)
		goto out_resched;
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		close_connection(con, false, true, false);
		/* Reconnect when there is something to send */
	}
	/* Don't return success if we really got EOF */
	if (ret == 0)
		ret = -EAGAIN;

	return ret;
}

/* Listening socket is busy, accept a connection */
static int tcp_accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;

	mutex_lock(&connections_lock);
	if (!dlm_allow_conn) {
		mutex_unlock(&connections_lock);
		return -1;
	}
	mutex_unlock(&connections_lock);

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	mutex_lock_nested(&con->sock_mutex, 0);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid)) {
		unsigned char *b = (unsigned char *)&peeraddr;
		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		mutex_unlock(&con->sock_mutex);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/*  Check to see if we already have a connection to this node. This
	 *  could happen if the two nodes initiate a connection at roughly
	 *  the same time and the connections cross on the wire.
	 *  In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	mutex_lock_nested(&newcon->sock_mutex, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		}
		if (!othercon->sock) {
			newcon->othercon = othercon;
			othercon->sock = newsock;
			newsock->sk->sk_user_data = othercon;
			add_sock(newsock, othercon);
			addcon = othercon;
		} else {
			printk("Extra connection from node %d attempted\n", nodeid);
			result = -EAGAIN;
			mutex_unlock(&newcon->sock_mutex);
			goto accept_err;
		}
	} else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}

static int sctp_accept_from_sock(struct connection *con)
{
	/* Check that the new node is in the lockspace */
	struct sctp_prim prim;
	int nodeid;
	int prim_len, ret;
	int addr_len;
	struct connection *newcon;
	struct connection *addcon;
	struct socket *newsock = NULL;

	mutex_lock(&connections_lock);
	if (!dlm_allow_conn) {
		mutex_unlock(&connections_lock);
		return -1;
	}
	mutex_unlock(&connections_lock);

	mutex_lock_nested(&con->sock_mutex, 0);

	ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
	if (ret < 0)
		goto accept_err;

	memset(&prim, 0, sizeof(struct sctp_prim));
	prim_len = sizeof(struct sctp_prim);

	ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
				(char *)&prim, &prim_len);
	if (ret < 0) {
		log_print("getsockopt/sctp_primary_addr failed: %d", ret);
		goto accept_err;
	}

	make_sockaddr(&prim.ssp_addr, 0, &addr_len);
	if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
		unsigned char *b = (unsigned char *)&prim.ssp_addr;

		log_print("reject connect from unknown addr");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		goto accept_err;
	}

	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		ret = -ENOMEM;
		goto accept_err;
	}

	mutex_lock_nested(&newcon->sock_mutex, 1);

	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				ret = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		}
		if (!othercon->sock) {
			newcon->othercon = othercon;
			othercon->sock = newsock;
			newsock->sk->sk_user_data = othercon;
			add_sock(newsock, othercon);
			addcon = othercon;
		} else {
			printk("Extra connection from node %d attempted\n", nodeid);
			ret = -EAGAIN;
			mutex_unlock(&newcon->sock_mutex);
			goto accept_err;
		}
	} else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	log_print("connected to %d", nodeid);

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	if (newsock)
		sock_release(newsock);
	if (ret != -EAGAIN)
		log_print("error accepting connection from node: %d", ret);

	return ret;
}

static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;

	if (e->len == 0 && e->users == 0) {
		list_del(&e->list);
		free_entry(e);
	}
}
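
/*
 * E.g. after a partial send that moved 400 of an entry's 1000 bytes,
 * writequeue_entry_complete(e, 400) advances offset by 400 and leaves
 * len = 600 with the entry still queued; the entry is only freed once
 * both len and users have dropped to zero.
 */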

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct connection *con, uint16_t port)
{
	struct sockaddr_storage localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(con->sock,
					     (struct sockaddr *)&localaddr,
					     addr_len);
		else
			result = kernel_setsockopt(con->sock, SOL_SCTP,
						   SCTP_SOCKOPT_BINDX_ADD,
						   (char *)&localaddr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}
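
/*
 * Note that only the first local address goes through kernel_bind();
 * every further address is attached to the already-bound socket with the
 * SCTP_SOCKOPT_BINDX_ADD socket option, which is how SCTP multi-homing
 * is expressed to the kernel.
 */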

/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage daddr;
	int one = 1;
	int result;
	int addr_len;
	struct socket *sock;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		goto out;
	}

	memset(&daddr, 0, sizeof(daddr));
	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0)
		goto socket_err;

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = sctp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, 0))
		goto bind_err;

	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm; this is an SCTP socket, so use
	   SCTP_NODELAY rather than TCP_NODELAY */
	kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
			  sizeof(one));

	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

bind_err:
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
	set_bit(CF_WRITE_PENDING, &con->flags);
}

/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	int one = 1;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to our cluster-known address when connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	set_bit(CF_WRITE_PENDING, &con->flags);
}

static struct socket *tcp_create_listen_sock(struct connection *con,
					     struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				   (char *)&one, sizeof(one));
	if (result < 0)
		log_print("Failed to set SO_REUSEADDR on socket: %d", result);

	con->rx_action = tcp_accept_from_sock;
	con->connect_action = tcp_connect_to_sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}
	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				 (char *)&one, sizeof(one));
	if (result < 0)
		log_print("Set keepalive failed: %d", result);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas, *addr;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		addr = kmalloc(sizeof(*addr), GFP_NOFS);
		if (!addr)
			break;
		memcpy(addr, &sas, sizeof(*addr));
		dlm_local_addr[dlm_local_count++] = addr;
	}
}

/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
	struct socket *sock = NULL;
	int result = -EINVAL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int bufsize = NEEDED_RMEM;
	int one = 1;

	if (!con)
		return -ENOMEM;

	log_print("Using SCTP for communications");

	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
				   (char *)&bufsize, sizeof(bufsize));
	if (result)
		log_print("Error increasing buffer space on socket %d", result);

	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
				   sizeof(one));
	if (result < 0)
		log_print("Could not set SCTP NODELAY error %d", result);

	/* Init con struct */
	sock->sk->sk_user_data = con;
	con->sock = sock;
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->rx_action = sctp_accept_from_sock;
	con->connect_action = sctp_connect_to_sock;

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
		goto create_delsock;

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	sock_release(sock);
	con->sock = NULL;
out:
	return result;
}

static int tcp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	if (dlm_local_addr[1] != NULL) {
		log_print("TCP protocol can't handle multi-homed hosts, "
			  "try SCTP");
		return -EINVAL;
	}

	log_print("Using TCP for communications");

	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	} else {
		result = -EADDRINUSE;
	}

	return result;
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
	return;

out:
	spin_unlock(&con->writequeue_lock);
}
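
/*
 * Typical caller pattern (sketch; 'nodeid', 'len' and the message body
 * are the caller's own):
 *
 *	char *p;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &p);
 *	if (!mh)
 *		return -ENOBUFS;
 *	... build 'len' bytes of message at 'p' ...
 *	dlm_lowcomms_commit_buffer(mh);
 *
 * get_buffer reserves space on the tail of the node's writequeue (or a
 * new page) and commit_buffer marks it ready, kicking the send workqueue
 * once the last concurrent user of the entry has committed.
 */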

/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false, false, true);
	lowcomms_connect_sock(con);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	lowcomms_connect_sock(con);
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		set_bit(CF_CLOSE, &con->flags);
		close_connection(con, true, true, true);
		clean_one_writequeue(con);
	}

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = con->rx_action(con);
	} while (!err);
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags))
		con->connect_action(con);
	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
		send_to_sock(con);
}

/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	foreach_conn(clean_one_writequeue);
}

static void work_stop(void)
{
	destroy_workqueue(recv_workqueue);
	destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
	recv_workqueue = alloc_workqueue("dlm_recv",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_workqueue("dlm_send",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		return -ENOMEM;
	}

	return 0;
}

static void stop_conn(struct connection *con)
{
	con->flags |= 0x0F;
	if (con->sock && con->sock->sk)
		con->sock->sk->sk_user_data = NULL;
}

static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
	if (con->othercon)
		kmem_cache_free(con_cache, con->othercon);
	hlist_del(&con->list);
	kmem_cache_free(con_cache, con);
}

void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	mutex_lock(&connections_lock);
	dlm_allow_conn = 0;
	foreach_conn(stop_conn);
	mutex_unlock(&connections_lock);

	work_stop();

	mutex_lock(&connections_lock);
	clean_writequeues();

	foreach_conn(free_conn);

	mutex_unlock(&connections_lock);
	kmem_cache_destroy(con_cache);
}

int dlm_lowcomms_start(void)
{
	int error = -EINVAL;
	struct connection *con;
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = -ENOMEM;
	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL);
	if (!con_cache)
		goto fail;

	error = work_start();
	if (error)
		goto fail_destroy;

	dlm_allow_conn = 1;

	/* Start listening */
	if (dlm_config.ci_protocol == 0)
		error = tcp_listen_for_all();
	else
		error = sctp_listen_for_all();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	dlm_allow_conn = 0;
	con = nodeid2con(0, 0);
	if (con) {
		close_connection(con, false, true, true);
		kmem_cache_free(con_cache, con);
	}
fail_destroy:
	kmem_cache_destroy(con_cache);
fail:
	return error;
}

void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *na, *safe;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);
}