/*
 * Intel MIC Platform Software Stack (MPSS)
 *
 * Copyright(c) 2014 Intel Corporation.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2, as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * Intel SCIF driver.
 *
 */
#include "../bus/scif_bus.h"
#include "scif_peer_bus.h"
#include "scif_main.h"
#include "scif_nodeqp.h"
#include "scif_map.h"

/*
 ************************************************************************
 * SCIF node Queue Pair (QP) setup flow:
 *
 * 1) SCIF driver gets probed with a scif_hw_dev via the scif_hw_bus
 * 2) scif_setup_qp(..) allocates the local qp and calls
 *	scif_setup_qp_connect(..) which allocates and maps the local
 *	buffer for the inbound QP
 * 3) The local node updates the device page with the DMA address of the QP
 * 4) A delayed work is scheduled (qp_dwork) which periodically reads if
 *	the peer node has updated its QP DMA address
 * 5) Once a valid non-zero address is found in the QP DMA address field
 *	in the device page, the local node maps the remote node's QP,
 *	updates its outbound QP and sends a SCIF_INIT message to the peer
 * 6) The SCIF_INIT message is received by the peer node's QP interrupt
 *	bottom half handler, which calls scif_init(..)
 * 7) scif_init(..) registers a new SCIF peer node by calling
 *	scif_peer_register_device(..) which signifies the addition of a new
 *	SCIF node
 * 8) On the mgmt node, P2P network setup/teardown is initiated via
 *	scif_p2p_setup(..) once all the remote nodes are online
 * 9) For P2P setup, the host maps the remote nodes' aperture and memory
 *	bars and sends a SCIF_NODE_ADD message to both nodes
 * 10) As part of scif_node_add(..), both nodes set up their local inbound
 *	QPs and send a SCIF_NODE_ADD_ACK to the mgmt node
 * 11) As part of scif_node_add_ack(..) the mgmt node forwards the
 *	SCIF_NODE_ADD_ACK to the remote nodes
 * 12) As part of scif_node_add_ack(..) the remote nodes update their
 *	outbound QPs, make sure they can access memory on the remote node
 *	and then add a new SCIF peer node by calling
 *	scif_peer_register_device(..) which signifies the addition of a new
 *	SCIF node.
 * 13) The SCIF network is now established across all nodes.
 *
 ************************************************************************
 * SCIF node QP teardown flow (initiated by non-mgmt node):
 *
 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
 * 2) The device page QP DMA address field is updated with 0x0
 * 3) A non-mgmt node now cleans up all local data structures and sends a
 *	SCIF_EXIT message to the peer and waits for a SCIF_EXIT_ACK
 * 4) As part of scif_exit(..) handling scif_disconnect_node(..) is called
 * 5) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the
 *	peers and waits for a SCIF_NODE_REMOVE_ACK
 * 6) As part of scif_node_remove(..) a remote node unregisters the peer
 *	node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
 * 7) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
 *	it sends itself a node remove message whose handling cleans up local
 *	data structures and unregisters the peer node from the SCIF network
 * 8) The mgmt node sends a SCIF_EXIT_ACK
 * 9) Upon receipt of the SCIF_EXIT_ACK the node initiating the teardown
 *	completes the SCIF remove routine
 * 10) The SCIF network is now torn down for the node initiating the
 *	teardown sequence
 *
 ************************************************************************
 * SCIF node QP teardown flow (initiated by mgmt node):
 *
 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
 * 2) The device page QP DMA address field is updated with 0x0
 * 3) The mgmt node calls scif_disconnect_node(..)
 * 4) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the peers
 *	and waits for a SCIF_NODE_REMOVE_ACK
 * 5) As part of scif_node_remove(..) a remote node unregisters the peer
 *	node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
 * 6) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
 *	it unregisters the peer node from the SCIF network
 * 7) The mgmt node sends a SCIF_EXIT message and waits for a SCIF_EXIT_ACK.
 * 8) A non-mgmt node upon receipt of a SCIF_EXIT message calls scif_stop(..)
 *	which would clean up local data structures for all SCIF nodes and
 *	then send a SCIF_EXIT_ACK back to the mgmt node
 * 9) Upon receipt of the SCIF_EXIT_ACK the mgmt node sends itself a node
 *	remove message whose handling cleans up local data structures and
 *	destroys any P2P mappings.
 * 10) The SCIF hardware device for which a remove callback was received is now
 *	disconnected from the SCIF network.
 */
/*
 * Initializes "local" data structures for the QP. Allocates the QP
 * ring buffer (rb) and initializes the "inbound" queue.
 */
int scif_setup_qp_connect(struct scif_qp *qp, dma_addr_t *qp_offset,
			  int local_size, struct scif_dev *scifdev)
{
	void *local_q = qp->inbound_q.rb_base;
	int err = 0;
	u32 tmp_rd = 0;

	spin_lock_init(&qp->send_lock);
	spin_lock_init(&qp->recv_lock);

	/* Allocate rb only if not already allocated */
	if (!local_q) {
		local_q = kzalloc(local_size, GFP_KERNEL);
		if (!local_q)
			return -ENOMEM;
	}

	err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
	if (err)
		goto kfree;
	/*
	 * To setup the inbound_q, the buffer lives locally, the read pointer
	 * is remote and the write pointer is local.
	 */
	scif_rb_init(&qp->inbound_q,
		     &tmp_rd,
		     &qp->local_write,
		     local_q, get_count_order(local_size));
	/*
	 * The read pointer is NULL initially and it is unsafe to use the ring
	 * buffer until this changes!
	 */
	qp->inbound_q.read_ptr = NULL;
	err = scif_map_single(qp_offset, qp,
			      scifdev, sizeof(struct scif_qp));
	if (err)
		goto unmap;
	qp->local_qp = *qp_offset;
	return err;
unmap:
	scif_unmap_single(qp->local_buf, scifdev, local_size);
	qp->local_buf = 0;
kfree:
	kfree(local_q);
	return err;
}

/* When the other side has already done its allocation, this is called */
int scif_setup_qp_accept(struct scif_qp *qp, dma_addr_t *qp_offset,
			 dma_addr_t phys, int local_size,
			 struct scif_dev *scifdev)
{
	void *local_q;
	void *remote_q;
	struct scif_qp *remote_qp;
	int remote_size;
	int err = 0;

	spin_lock_init(&qp->send_lock);
	spin_lock_init(&qp->recv_lock);
	/* Start by figuring out where we need to point */
	remote_qp = scif_ioremap(phys, sizeof(struct scif_qp), scifdev);
	if (!remote_qp)
		return -EIO;
	qp->remote_qp = remote_qp;
	if (qp->remote_qp->magic != SCIFEP_MAGIC) {
		err = -EIO;
		goto iounmap;
	}
	qp->remote_buf = remote_qp->local_buf;
	remote_size = qp->remote_qp->inbound_q.size;
	remote_q = scif_ioremap(qp->remote_buf, remote_size, scifdev);
	if (!remote_q) {
		err = -EIO;
		goto iounmap;
	}
	qp->remote_qp->local_write = 0;
	/*
	 * To setup the outbound_q, the buffer lives in remote memory,
	 * the read pointer is local, the write pointer is remote
	 */
	scif_rb_init(&qp->outbound_q,
		     &qp->local_read,
		     &qp->remote_qp->local_write,
		     remote_q,
		     get_count_order(remote_size));
	local_q = kzalloc(local_size, GFP_KERNEL);
	if (!local_q) {
		err = -ENOMEM;
		goto iounmap_1;
	}
	err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
	if (err)
		goto kfree;
	qp->remote_qp->local_read = 0;
	/*
	 * To setup the inbound_q, the buffer lives locally, the read pointer
	 * is remote and the write pointer is local
	 */
	scif_rb_init(&qp->inbound_q,
		     &qp->remote_qp->local_read,
		     &qp->local_write,
		     local_q, get_count_order(local_size));
	err = scif_map_single(qp_offset, qp, scifdev,
			      sizeof(struct scif_qp));
	if (err)
		goto unmap;
	qp->local_qp = *qp_offset;
	return err;
unmap:
	scif_unmap_single(qp->local_buf, scifdev, local_size);
	qp->local_buf = 0;
kfree:
	kfree(local_q);
iounmap_1:
	scif_iounmap(remote_q, remote_size, scifdev);
	qp->outbound_q.rb_base = NULL;
iounmap:
	scif_iounmap(qp->remote_qp, sizeof(struct scif_qp), scifdev);
	qp->remote_qp = NULL;
	return err;
}

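/*
 * scif_setup_qp_connect_response() - Map the peer's QP and ring buffer
 * @scifdev: SCIF device for the peer node
 * @qp: Local queue pair whose outbound_q/inbound_q are initialized
 * @payload: DMA address of the peer's queue pair
 *
 * Called once the peer has published its QP DMA address. Maps the remote
 * QP and its ring buffer, sets up the outbound_q, and reinitializes the
 * inbound_q since its read pointer now lives in remote memory.
 */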
int scif_setup_qp_connect_response(struct scif_dev *scifdev,
				   struct scif_qp *qp, u64 payload)
{
	int err = 0;
	void *r_buf;
	int remote_size;
	phys_addr_t tmp_phys;

	qp->remote_qp = scif_ioremap(payload, sizeof(struct scif_qp), scifdev);

	if (!qp->remote_qp) {
		err = -ENOMEM;
		goto error;
	}

	if (qp->remote_qp->magic != SCIFEP_MAGIC) {
		dev_err(&scifdev->sdev->dev,
			"SCIFEP_MAGIC mismatch between self %d remote %d\n",
			scif_dev[scif_info.nodeid].node, scifdev->node);
		err = -ENODEV;
		goto error;
	}

	tmp_phys = qp->remote_qp->local_buf;
	remote_size = qp->remote_qp->inbound_q.size;
	r_buf = scif_ioremap(tmp_phys, remote_size, scifdev);

	if (!r_buf)
		return -EIO;

	qp->local_read = 0;
	scif_rb_init(&qp->outbound_q,
		     &qp->local_read,
		     &qp->remote_qp->local_write,
		     r_buf,
		     get_count_order(remote_size));
	/*
	 * Because the node QP may already be processing an INIT message, set
	 * the read pointer so the cached read offset isn't lost
	 */
	qp->remote_qp->local_read = qp->inbound_q.current_read_offset;
	/*
	 * Resetup the inbound_q now that we know where the
	 * inbound_read really is.
	 */
	scif_rb_init(&qp->inbound_q,
		     &qp->remote_qp->local_read,
		     &qp->local_write,
		     qp->inbound_q.rb_base,
		     get_count_order(qp->inbound_q.size));
error:
	return err;
}

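/* Ring the peer's doorbell, using the P2P interrupt path for P2P devices */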
static __always_inline void
scif_send_msg_intr(struct scif_dev *scifdev)
{
	struct scif_hw_dev *sdev = scifdev->sdev;

	if (scifdev_is_p2p(scifdev))
		sdev->hw_ops->send_p2p_intr(sdev, scifdev->rdb, &scifdev->mmio);
	else
		sdev->hw_ops->send_intr(sdev, scifdev->rdb);
}

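/*
 * Map the peer QP published at @phys and, on success, send a SCIF_INIT
 * message to tell the peer that the local queue pair is ready.
 */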
int scif_qp_response(phys_addr_t phys, struct scif_dev *scifdev)
{
	int err = 0;
	struct scifmsg msg;

	err = scif_setup_qp_connect_response(scifdev, scifdev->qpairs, phys);
	if (!err) {
		/*
		 * Now that everything is setup and mapped, we're ready
		 * to tell the peer about our queue's location
		 */
		msg.uop = SCIF_INIT;
		msg.dst.node = scifdev->node;
		err = scif_nodeqp_send(scifdev, &msg);
	}
	return err;
}

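/*
 * Send a SCIF_EXIT message to the peer and wait, with a timeout, for the
 * matching SCIF_EXIT_ACK before marking the exit flow idle again.
 */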
void scif_send_exit(struct scif_dev *scifdev)
{
	struct scifmsg msg;
	int ret;

	scifdev->exit = OP_IN_PROGRESS;
	msg.uop = SCIF_EXIT;
	msg.src.node = scif_info.nodeid;
	msg.dst.node = scifdev->node;
	ret = scif_nodeqp_send(scifdev, &msg);
	if (ret)
		goto done;
	/* Wait for a SCIF_EXIT_ACK message */
	wait_event_timeout(scif_info.exitwq, scifdev->exit == OP_COMPLETED,
			   SCIF_NODE_ALIVE_TIMEOUT);
done:
	scifdev->exit = OP_IDLE;
}

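/*
 * Allocate the local queue pair and set up its inbound_q. The outbound_q
 * only becomes usable once the peer publishes its QP DMA address.
 */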
int scif_setup_qp(struct scif_dev *scifdev)
{
	int err = 0;
	int local_size;
	struct scif_qp *qp;

	local_size = SCIF_NODE_QP_SIZE;

	qp = kzalloc(sizeof(*qp), GFP_KERNEL);
	if (!qp)
		return -ENOMEM;
	qp->magic = SCIFEP_MAGIC;
	scifdev->qpairs = qp;
	err = scif_setup_qp_connect(qp, &scifdev->qp_dma_addr,
				    local_size, scifdev);
	if (err)
		goto free_qp;
	/*
	 * We're as set up as we can be. The inbound_q is set up, without a
	 * usable outbound_q. When we get a message, the read_ptr will be
	 * updated, and we will pull the message.
	 */
	return err;
free_qp:
	kfree(scifdev->qpairs);
	scifdev->qpairs = NULL;
	return err;
}

static void scif_p2p_freesg(struct scatterlist *sg)
{
	kfree(sg);
}

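/*
 * Build a scatterlist describing page_cnt chunks of page_size bytes
 * starting at physical address pa.
 */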
static struct scatterlist *
scif_p2p_setsg(phys_addr_t pa, int page_size, int page_cnt)
{
	struct scatterlist *sg;
	struct page *page;
	int i;

	sg = kcalloc(page_cnt, sizeof(struct scatterlist), GFP_KERNEL);
	if (!sg)
		return NULL;
	sg_init_table(sg, page_cnt);
	for (i = 0; i < page_cnt; i++) {
		page = pfn_to_page(pa >> PAGE_SHIFT);
		sg_set_page(&sg[i], page, page_size, 0);
		pa += page_size;
	}
	return sg;
}

/* Init p2p mappings required to access peerdev from scifdev */
static struct scif_p2p_info *
scif_init_p2p_info(struct scif_dev *scifdev, struct scif_dev *peerdev)
{
	struct scif_p2p_info *p2p;
	int num_mmio_pages, num_aper_pages, sg_page_shift, err, num_aper_chunks;
	struct scif_hw_dev *psdev = peerdev->sdev;
	struct scif_hw_dev *sdev = scifdev->sdev;

	num_mmio_pages = psdev->mmio->len >> PAGE_SHIFT;
	num_aper_pages = psdev->aper->len >> PAGE_SHIFT;

	p2p = kzalloc(sizeof(*p2p), GFP_KERNEL);
	if (!p2p)
		return NULL;
	p2p->ppi_sg[SCIF_PPI_MMIO] = scif_p2p_setsg(psdev->mmio->pa,
						    PAGE_SIZE, num_mmio_pages);
	if (!p2p->ppi_sg[SCIF_PPI_MMIO])
		goto free_p2p;
	p2p->sg_nentries[SCIF_PPI_MMIO] = num_mmio_pages;
	sg_page_shift = get_order(min(psdev->aper->len, (u64)(1 << 30)));
	num_aper_chunks = num_aper_pages >> (sg_page_shift - PAGE_SHIFT);
	p2p->ppi_sg[SCIF_PPI_APER] = scif_p2p_setsg(psdev->aper->pa,
						    1 << sg_page_shift,
						    num_aper_chunks);
	if (!p2p->ppi_sg[SCIF_PPI_APER])
		goto scif_p2p_free;
	p2p->sg_nentries[SCIF_PPI_APER] = num_aper_chunks;
	err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
			 num_mmio_pages, DMA_BIDIRECTIONAL);
	if (err != num_mmio_pages)
		goto scif_p2p_free;
	err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
			 num_aper_chunks, DMA_BIDIRECTIONAL);
	if (err != num_aper_chunks)
		goto dma_unmap;
	p2p->ppi_da[SCIF_PPI_MMIO] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_MMIO]);
	p2p->ppi_da[SCIF_PPI_APER] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_APER]);
	p2p->ppi_len[SCIF_PPI_MMIO] = num_mmio_pages;
	p2p->ppi_len[SCIF_PPI_APER] = num_aper_pages;
	p2p->ppi_peer_id = peerdev->node;
	return p2p;
dma_unmap:
	dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
		     p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
scif_p2p_free:
	scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
	scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
free_p2p:
	kfree(p2p);
	return NULL;
}

437
438/* Uninitialize and release resources from a p2p mapping */
439static void scif_deinit_p2p_info(struct scif_dev *scifdev,
440				 struct scif_p2p_info *p2p)
441{
442	struct scif_hw_dev *sdev = scifdev->sdev;
443
444	dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
445		     p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
446	dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
447		     p2p->sg_nentries[SCIF_PPI_APER], DMA_BIDIRECTIONAL);
448	scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
449	scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
450	kfree(p2p);
451}
452
/**
 * scif_node_connect() - Respond to SCIF_NODE_CONNECT interrupt message
 * @scifdev: SCIF device on the mgmt node
 * @dst: Destination node
 *
 * Connect the src and dst node by setting up the p2p connection
 * between them. The management node here acts like a proxy.
 */
static void scif_node_connect(struct scif_dev *scifdev, int dst)
{
	struct scif_dev *dev_j = scifdev;
	struct scif_dev *dev_i = NULL;
	struct scif_p2p_info *p2p_ij = NULL;    /* bus addr for j from i */
	struct scif_p2p_info *p2p_ji = NULL;    /* bus addr for i from j */
	struct scif_p2p_info *p2p;
	struct list_head *pos, *tmp;
	struct scifmsg msg;
	int err;
	u64 tmppayload;

	if (dst < 1 || dst > scif_info.maxid)
		return;

	dev_i = &scif_dev[dst];

	if (!_scifdev_alive(dev_i))
		return;
	/*
	 * If the p2p connection is already setup or in the process of setting
	 * up then just ignore this request. The requested node will get
	 * informed by SCIF_NODE_ADD_ACK or SCIF_NODE_ADD_NACK
	 */
	if (!list_empty(&dev_i->p2p)) {
		list_for_each_safe(pos, tmp, &dev_i->p2p) {
			p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
			if (p2p->ppi_peer_id == dev_j->node)
				return;
		}
	}
	p2p_ij = scif_init_p2p_info(dev_i, dev_j);
	if (!p2p_ij)
		return;
	p2p_ji = scif_init_p2p_info(dev_j, dev_i);
	if (!p2p_ji) {
		scif_deinit_p2p_info(dev_i, p2p_ij);
		return;
	}
	list_add_tail(&p2p_ij->ppi_list, &dev_i->p2p);
	list_add_tail(&p2p_ji->ppi_list, &dev_j->p2p);

	/*
	 * Send a SCIF_NODE_ADD to dev_i, pass it its bus address
	 * as seen from dev_j
	 */
	msg.uop = SCIF_NODE_ADD;
	msg.src.node = dev_j->node;
	msg.dst.node = dev_i->node;

	msg.payload[0] = p2p_ji->ppi_da[SCIF_PPI_APER];
	msg.payload[1] = p2p_ij->ppi_da[SCIF_PPI_MMIO];
	msg.payload[2] = p2p_ij->ppi_da[SCIF_PPI_APER];
	msg.payload[3] = p2p_ij->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;

	err = scif_nodeqp_send(dev_i, &msg);
	if (err) {
		dev_err(&scifdev->sdev->dev,
			"%s %d error %d\n", __func__, __LINE__, err);
		return;
	}

	/* Same as above but to dev_j */
	msg.uop = SCIF_NODE_ADD;
	msg.src.node = dev_i->node;
	msg.dst.node = dev_j->node;

	tmppayload = msg.payload[0];
	msg.payload[0] = msg.payload[2];
	msg.payload[2] = tmppayload;
	msg.payload[1] = p2p_ji->ppi_da[SCIF_PPI_MMIO];
	msg.payload[3] = p2p_ji->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;

	scif_nodeqp_send(dev_j, &msg);
}

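/*
 * Set up P2P connections between every pair of remote nodes, provided
 * P2P is enabled and all the remote nodes are alive.
 */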
static void scif_p2p_setup(void)
{
	int i, j;

	if (!scif_info.p2p_enable)
		return;

	for (i = 1; i <= scif_info.maxid; i++)
		if (!_scifdev_alive(&scif_dev[i]))
			return;

	for (i = 1; i <= scif_info.maxid; i++) {
		for (j = 1; j <= scif_info.maxid; j++) {
			struct scif_dev *scifdev = &scif_dev[i];

			if (i == j)
				continue;
			scif_node_connect(scifdev, j);
		}
	}
}

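/* Printable names for the node QP messages, indexed by message uop */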
static char *message_types[] = {"BAD",
				"INIT",
				"EXIT",
				"SCIF_EXIT_ACK",
				"SCIF_NODE_ADD",
				"SCIF_NODE_ADD_ACK",
				"SCIF_NODE_ADD_NACK",
				"REMOVE_NODE",
				"REMOVE_NODE_ACK",
				"CNCT_REQ",
				"CNCT_GNT",
				"CNCT_GNTACK",
				"CNCT_GNTNACK",
				"CNCT_REJ",
				"DISCNCT",
				"DISCNT_ACK",
				"CLIENT_SENT",
				"CLIENT_RCVD",
				"SCIF_GET_NODE_INFO",
				"REGISTER",
				"REGISTER_ACK",
				"REGISTER_NACK",
				"UNREGISTER",
				"UNREGISTER_ACK",
				"UNREGISTER_NACK",
				"ALLOC_REQ",
				"ALLOC_GNT",
				"ALLOC_REJ",
				"FREE_PHYS",
				"FREE_VIRT",
				"MUNMAP",
				"MARK",
				"MARK_ACK",
				"MARK_NACK",
				"WAIT",
				"WAIT_ACK",
				"WAIT_NACK",
				"SIGNAL_LOCAL",
				"SIGNAL_REMOTE",
				"SIG_ACK",
				"SIG_NACK"};

static void
scif_display_message(struct scif_dev *scifdev, struct scifmsg *msg,
		     const char *label)
{
	if (!scif_info.en_msg_log)
		return;
	if (msg->uop > SCIF_MAX_MSG) {
		dev_err(&scifdev->sdev->dev,
			"%s: unknown msg type %d\n", label, msg->uop);
		return;
	}
	dev_info(&scifdev->sdev->dev,
		 "%s: msg type %s, src %d:%d, dest %d:%d payload 0x%llx:0x%llx:0x%llx:0x%llx\n",
		 label, message_types[msg->uop], msg->src.node, msg->src.port,
		 msg->dst.node, msg->dst.port, msg->payload[0], msg->payload[1],
		 msg->payload[2], msg->payload[3]);
}

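/*
 * Low level send helper: writes msg into the outbound ring buffer,
 * retrying for up to SCIF_NODEQP_SEND_TO_MSEC if the ring is full, then
 * notifies the peer (or queues loopback work for the self node).
 */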
int _scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_qp *qp = scifdev->qpairs;
	int err = -ENOMEM, loop_cnt = 0;

	scif_display_message(scifdev, msg, "Sent");
	if (!qp) {
		err = -EINVAL;
		goto error;
	}
	spin_lock(&qp->send_lock);

	while ((err = scif_rb_write(&qp->outbound_q,
				    msg, sizeof(struct scifmsg)))) {
		mdelay(1);
#define SCIF_NODEQP_SEND_TO_MSEC (3 * 1000)
		if (loop_cnt++ > (SCIF_NODEQP_SEND_TO_MSEC)) {
			err = -ENODEV;
			break;
		}
	}
	if (!err)
		scif_rb_commit(&qp->outbound_q);
	spin_unlock(&qp->send_lock);
	if (!err) {
		if (scifdev_self(scifdev))
			/*
			 * For loopback we need to emulate an interrupt by
			 * queuing work for the queue handling real node
			 * Qp interrupts.
			 */
			queue_work(scifdev->intr_wq, &scifdev->intr_bh);
		else
			scif_send_msg_intr(scifdev);
	}
error:
	if (err)
		dev_dbg(&scifdev->sdev->dev,
			"%s %d error %d uop %d\n",
			 __func__, __LINE__, err, msg->uop);
	return err;
}

/**
 * scif_nodeqp_send - Send a message on the node queue pair
 * @scifdev: SCIF device.
 * @msg: The message to be sent.
 */
int scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
{
	int err;
	struct device *spdev = NULL;

	if (msg->uop > SCIF_EXIT_ACK) {
		/* Don't send messages once the exit flow has begun */
		if (OP_IDLE != scifdev->exit)
			return -ENODEV;
		spdev = scif_get_peer_dev(scifdev);
		if (IS_ERR(spdev)) {
			err = PTR_ERR(spdev);
			return err;
		}
	}
	err = _scif_nodeqp_send(scifdev, msg);
	if (msg->uop > SCIF_EXIT_ACK)
		scif_put_peer_dev(spdev);
	return err;
}

/*
 * scif_misc_handler:
 *
 * Work queue handler for servicing miscellaneous SCIF tasks.
 * Examples include:
 * 1) Remote fence requests.
 * 2) Destruction of temporary registered windows
 *    created during scif_vreadfrom()/scif_vwriteto().
 * 3) Cleanup of zombie endpoints.
 */
void scif_misc_handler(struct work_struct *work)
{
	scif_rma_handle_remote_fences();
	scif_rma_destroy_windows();
	scif_rma_destroy_tcw_invalid();
	scif_cleanup_zombie_epd();
}

/**
 * scif_init() - Respond to SCIF_INIT interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 */
static __always_inline void
scif_init(struct scif_dev *scifdev, struct scifmsg *msg)
{
	/*
	 * Allow the thread waiting for device page updates for the peer QP DMA
	 * address to complete initializing the inbound_q.
	 */
	flush_delayed_work(&scifdev->qp_dwork);

	scif_peer_register_device(scifdev);

	if (scif_is_mgmt_node()) {
		mutex_lock(&scif_info.conflock);
		scif_p2p_setup();
		mutex_unlock(&scif_info.conflock);
	}
}

/**
 * scif_exit() - Respond to SCIF_EXIT interrupt message
 * @scifdev:    Remote SCIF device node
 * @unused:     Interrupt message (unused)
 *
 * This function stops the SCIF interface for the node which sent
 * the SCIF_EXIT message and starts waiting for that node to set up
 * the queue pair again.
 */
static __always_inline void
scif_exit(struct scif_dev *scifdev, struct scifmsg *unused)
{
	scifdev->exit_ack_pending = true;
	if (scif_is_mgmt_node())
		scif_disconnect_node(scifdev->node, false);
	else
		scif_stop(scifdev);
	schedule_delayed_work(&scifdev->qp_dwork,
			      msecs_to_jiffies(1000));
}

/**
 * scif_exit_ack() - Respond to SCIF_EXIT_ACK interrupt message
 * @scifdev:    Remote SCIF device node
 * @unused:     Interrupt message (unused)
 */
static __always_inline void
scif_exit_ack(struct scif_dev *scifdev, struct scifmsg *unused)
{
	scifdev->exit = OP_COMPLETED;
	wake_up(&scif_info.exitwq);
}

/**
 * scif_node_add() - Respond to SCIF_NODE_ADD interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * When the mgmt node driver has finished initializing a MIC node queue pair it
 * marks the node as online. It then looks for all currently online MIC cards
 * and sends a SCIF_NODE_ADD message to identify the ID of the new card for
 * peer to peer initialization.
 *
 * The local node allocates its incoming queue and sends its address in the
 * SCIF_NODE_ADD_ACK message back to the mgmt node, and the mgmt node
 * "reflects" this message to the new node.
 */
static __always_inline void
scif_node_add(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_dev *newdev;
	dma_addr_t qp_offset;
	int qp_connect;
	struct scif_hw_dev *sdev;

	dev_dbg(&scifdev->sdev->dev,
		"Scifdev %d:%d received NODE_ADD msg for node %d\n",
		scifdev->node, msg->dst.node, msg->src.node);
	dev_dbg(&scifdev->sdev->dev,
		"Remote address for this node's aperture %llx\n",
		msg->payload[0]);
	newdev = &scif_dev[msg->src.node];
	newdev->node = msg->src.node;
	newdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
	sdev = newdev->sdev;

	if (scif_setup_intr_wq(newdev)) {
		dev_err(&scifdev->sdev->dev,
			"failed to setup interrupts for %d\n", msg->src.node);
		goto interrupt_setup_error;
	}
	newdev->mmio.va = ioremap_nocache(msg->payload[1], sdev->mmio->len);
	if (!newdev->mmio.va) {
		dev_err(&scifdev->sdev->dev,
			"failed to map mmio for %d\n", msg->src.node);
		goto mmio_map_error;
	}
	newdev->qpairs = kzalloc(sizeof(*newdev->qpairs), GFP_KERNEL);
	if (!newdev->qpairs)
		goto qp_alloc_error;
	/*
	 * Set the base address of the remote node's memory since it gets
	 * added to qp_offset
	 */
	newdev->base_addr = msg->payload[0];

	qp_connect = scif_setup_qp_connect(newdev->qpairs, &qp_offset,
					   SCIF_NODE_QP_SIZE, newdev);
	if (qp_connect) {
		dev_err(&scifdev->sdev->dev,
			"failed to setup qp_connect %d\n", qp_connect);
		goto qp_connect_error;
	}

	newdev->db = sdev->hw_ops->next_db(sdev);
	newdev->cookie = sdev->hw_ops->request_irq(sdev, scif_intr_handler,
						   "SCIF_INTR", newdev,
						   newdev->db);
	if (IS_ERR(newdev->cookie))
		goto qp_connect_error;
	newdev->qpairs->magic = SCIFEP_MAGIC;
	newdev->qpairs->qp_state = SCIF_QP_OFFLINE;

	msg->uop = SCIF_NODE_ADD_ACK;
	msg->dst.node = msg->src.node;
	msg->src.node = scif_info.nodeid;
	msg->payload[0] = qp_offset;
	msg->payload[2] = newdev->db;
	scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
	return;
qp_connect_error:
	kfree(newdev->qpairs);
	newdev->qpairs = NULL;
qp_alloc_error:
	iounmap(newdev->mmio.va);
	newdev->mmio.va = NULL;
mmio_map_error:
interrupt_setup_error:
	dev_err(&scifdev->sdev->dev,
		"node add failed for node %d\n", msg->src.node);
	msg->uop = SCIF_NODE_ADD_NACK;
	msg->dst.node = msg->src.node;
	msg->src.node = scif_info.nodeid;
	scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
}

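/*
 * Delayed work which polls until both ends of a P2P queue pair report
 * SCIF_QP_ONLINE, tearing down the peer after SCIF_NODE_QP_RETRY
 * unsuccessful checks.
 */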
void scif_poll_qp_state(struct work_struct *work)
{
#define SCIF_NODE_QP_RETRY 100
#define SCIF_NODE_QP_TIMEOUT 100
	struct scif_dev *peerdev = container_of(work, struct scif_dev,
						p2p_dwork.work);
	struct scif_qp *qp = &peerdev->qpairs[0];

	if (qp->qp_state != SCIF_QP_ONLINE ||
	    qp->remote_qp->qp_state != SCIF_QP_ONLINE) {
		if (peerdev->p2p_retry++ == SCIF_NODE_QP_RETRY) {
			dev_err(&peerdev->sdev->dev,
				"Warning: QP check timeout with state %d\n",
				qp->qp_state);
			goto timeout;
		}
		schedule_delayed_work(&peerdev->p2p_dwork,
				      msecs_to_jiffies(SCIF_NODE_QP_TIMEOUT));
		return;
	}
	return;
timeout:
	dev_err(&peerdev->sdev->dev,
		"%s %d remote node %d offline, state = 0x%x\n",
		__func__, __LINE__, peerdev->node, qp->qp_state);
	qp->remote_qp->qp_state = SCIF_QP_OFFLINE;
	scif_peer_unregister_device(peerdev);
	scif_cleanup_scifdev(peerdev);
}

/**
 * scif_node_add_ack() - Respond to SCIF_NODE_ADD_ACK interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * After a MIC node receives the SCIF_NODE_ADD_ACK message it sends this
 * message to the mgmt node to confirm the sequence is finished.
 */
static __always_inline void
scif_node_add_ack(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_dev *peerdev;
	struct scif_qp *qp;
	struct scif_dev *dst_dev = &scif_dev[msg->dst.node];

	dev_dbg(&scifdev->sdev->dev,
		"Scifdev %d received SCIF_NODE_ADD_ACK msg src %d dst %d\n",
		scifdev->node, msg->src.node, msg->dst.node);
	dev_dbg(&scifdev->sdev->dev,
		"payload %llx %llx %llx %llx\n", msg->payload[0],
		msg->payload[1], msg->payload[2], msg->payload[3]);
	if (scif_is_mgmt_node()) {
		/*
		 * The lock serializes with scif_qp_response_ack. The mgmt node
		 * is forwarding the NODE_ADD_ACK message from src to dst, and
		 * we need to make sure that dst has already received a
		 * NODE_ADD for src and set up its end of the qp to dst.
		 */
		mutex_lock(&scif_info.conflock);
		msg->payload[1] = scif_info.maxid;
		scif_nodeqp_send(dst_dev, msg);
		mutex_unlock(&scif_info.conflock);
		return;
	}
	peerdev = &scif_dev[msg->src.node];
	peerdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
	peerdev->node = msg->src.node;

	qp = &peerdev->qpairs[0];

	if (scif_setup_qp_connect_response(peerdev, &peerdev->qpairs[0],
					   msg->payload[0]))
		goto local_error;
	peerdev->rdb = msg->payload[2];
	qp->remote_qp->qp_state = SCIF_QP_ONLINE;

	scif_peer_register_device(peerdev);

	schedule_delayed_work(&peerdev->p2p_dwork, 0);
	return;
local_error:
	scif_cleanup_scifdev(peerdev);
}

/**
 * scif_node_add_nack() - Respond to SCIF_NODE_ADD_NACK interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * SCIF_NODE_ADD failed, so inform the waiting wq.
 */
static __always_inline void
scif_node_add_nack(struct scif_dev *scifdev, struct scifmsg *msg)
{
	if (scif_is_mgmt_node()) {
		struct scif_dev *dst_dev = &scif_dev[msg->dst.node];

		dev_dbg(&scifdev->sdev->dev,
			"SCIF_NODE_ADD_NACK received from %d\n", scifdev->node);
		scif_nodeqp_send(dst_dev, msg);
	}
}

/*
 * scif_node_remove() - Handle SCIF_NODE_REMOVE message
 * @scifdev: Remote SCIF device node
 * @msg: Interrupt message
 *
 * Handle node removal.
 */
static __always_inline void
scif_node_remove(struct scif_dev *scifdev, struct scifmsg *msg)
{
	int node = msg->payload[0];
	struct scif_dev *scdev = &scif_dev[node];

	scdev->node_remove_ack_pending = true;
	scif_handle_remove_node(node);
}

/*
 * scif_node_remove_ack() - Handle SCIF_NODE_REMOVE_ACK message
 * @scifdev: Remote SCIF device node
 * @msg: Interrupt message
 *
 * The peer has acked a SCIF_NODE_REMOVE message.
 */
static __always_inline void
scif_node_remove_ack(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_dev *sdev = &scif_dev[msg->payload[0]];

	atomic_inc(&sdev->disconn_rescnt);
	wake_up(&sdev->disconn_wq);
}

/**
 * scif_get_node_info_resp() - Respond to SCIF_GET_NODE_INFO interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * Retrieve node info, i.e. maxid and total, from the mgmt node.
 */
static __always_inline void
scif_get_node_info_resp(struct scif_dev *scifdev, struct scifmsg *msg)
{
	if (scif_is_mgmt_node()) {
		swap(msg->dst.node, msg->src.node);
		mutex_lock(&scif_info.conflock);
		msg->payload[1] = scif_info.maxid;
		msg->payload[2] = scif_info.total;
		mutex_unlock(&scif_info.conflock);
		scif_nodeqp_send(scifdev, msg);
	} else {
		struct completion *node_info =
			(struct completion *)msg->payload[3];

		mutex_lock(&scif_info.conflock);
		scif_info.maxid = msg->payload[1];
		scif_info.total = msg->payload[2];
		complete_all(node_info);
		mutex_unlock(&scif_info.conflock);
	}
}

static void
scif_msg_unknown(struct scif_dev *scifdev, struct scifmsg *msg)
{
	/* Bogus Node Qp Message? */
	dev_err(&scifdev->sdev->dev,
		"Unknown message 0x%x, scifdev->node 0x%x\n",
		msg->uop, scifdev->node);
}

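/* Dispatch table mapping each node QP message uop to its handler */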
static void (*scif_intr_func[SCIF_MAX_MSG + 1])
	    (struct scif_dev *, struct scifmsg *msg) = {
	scif_msg_unknown,	/* Error */
	scif_init,		/* SCIF_INIT */
	scif_exit,		/* SCIF_EXIT */
	scif_exit_ack,		/* SCIF_EXIT_ACK */
	scif_node_add,		/* SCIF_NODE_ADD */
	scif_node_add_ack,	/* SCIF_NODE_ADD_ACK */
	scif_node_add_nack,	/* SCIF_NODE_ADD_NACK */
	scif_node_remove,	/* SCIF_NODE_REMOVE */
	scif_node_remove_ack,	/* SCIF_NODE_REMOVE_ACK */
	scif_cnctreq,		/* SCIF_CNCT_REQ */
	scif_cnctgnt,		/* SCIF_CNCT_GNT */
	scif_cnctgnt_ack,	/* SCIF_CNCT_GNTACK */
	scif_cnctgnt_nack,	/* SCIF_CNCT_GNTNACK */
	scif_cnctrej,		/* SCIF_CNCT_REJ */
	scif_discnct,		/* SCIF_DISCNCT */
	scif_discnt_ack,	/* SCIF_DISCNT_ACK */
	scif_clientsend,	/* SCIF_CLIENT_SENT */
	scif_clientrcvd,	/* SCIF_CLIENT_RCVD */
	scif_get_node_info_resp,/* SCIF_GET_NODE_INFO */
	scif_recv_reg,		/* SCIF_REGISTER */
	scif_recv_reg_ack,	/* SCIF_REGISTER_ACK */
	scif_recv_reg_nack,	/* SCIF_REGISTER_NACK */
	scif_recv_unreg,	/* SCIF_UNREGISTER */
	scif_recv_unreg_ack,	/* SCIF_UNREGISTER_ACK */
	scif_recv_unreg_nack,	/* SCIF_UNREGISTER_NACK */
	scif_alloc_req,		/* SCIF_ALLOC_REQ */
	scif_alloc_gnt_rej,	/* SCIF_ALLOC_GNT */
	scif_alloc_gnt_rej,	/* SCIF_ALLOC_REJ */
	scif_free_virt,		/* SCIF_FREE_VIRT */
	scif_recv_munmap,	/* SCIF_MUNMAP */
	scif_recv_mark,		/* SCIF_MARK */
	scif_recv_mark_resp,	/* SCIF_MARK_ACK */
	scif_recv_mark_resp,	/* SCIF_MARK_NACK */
	scif_recv_wait,		/* SCIF_WAIT */
	scif_recv_wait_resp,	/* SCIF_WAIT_ACK */
	scif_recv_wait_resp,	/* SCIF_WAIT_NACK */
	scif_recv_sig_local,	/* SCIF_SIG_LOCAL */
	scif_recv_sig_remote,	/* SCIF_SIG_REMOTE */
	scif_recv_sig_resp,	/* SCIF_SIG_ACK */
	scif_recv_sig_resp,	/* SCIF_SIG_NACK */
};

static int scif_max_msg_id = SCIF_MAX_MSG;

/**
 * scif_nodeqp_msg_handler() - Common handler for node messages
 * @scifdev: Remote device to respond to
 * @qp: Remote memory pointer
 * @msg: The message to be handled.
 *
 * This routine calls the appropriate routine to handle a Node Qp
 * message receipt
 */
static void
scif_nodeqp_msg_handler(struct scif_dev *scifdev,
			struct scif_qp *qp, struct scifmsg *msg)
{
	scif_display_message(scifdev, msg, "Rcvd");

	if (msg->uop > (u32)scif_max_msg_id) {
		/* Bogus Node Qp Message? */
		dev_err(&scifdev->sdev->dev,
			"Unknown message 0x%x, scifdev->node 0x%x\n",
			msg->uop, scifdev->node);
		return;
	}

	scif_intr_func[msg->uop](scifdev, msg);
}

/**
 * scif_nodeqp_intrhandler() - Interrupt handler for node messages
 * @scifdev:    Remote device to respond to
 * @qp:         Remote memory pointer
 *
 * This routine is triggered by the interrupt mechanism. It reads
 * messages from the node queue RB and calls the Node QP Message handling
 * routine.
 */
void scif_nodeqp_intrhandler(struct scif_dev *scifdev, struct scif_qp *qp)
{
	struct scifmsg msg;
	int read_size;

	do {
		read_size = scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg));
		if (!read_size)
			break;
		scif_nodeqp_msg_handler(scifdev, qp, &msg);
		/*
		 * The node queue pair is unmapped so skip the read pointer
		 * update after receipt of a SCIF_EXIT_ACK
		 */
		if (SCIF_EXIT_ACK == msg.uop)
			break;
		scif_rb_update_read_ptr(&qp->inbound_q);
	} while (1);
}

/**
 * scif_loopb_wq_handler - Loopback Workqueue Handler.
 * @unused: loop back work
 *
 * This work queue routine is invoked by the loopback work queue handler.
 * It grabs the recv lock, dequeues any available messages from the head
 * of the loopback message list, calls the node QP message handler,
 * waits for it to return, then frees up this message and dequeues more
 * elements of the list if available.
 */
static void scif_loopb_wq_handler(struct work_struct *unused)
{
	struct scif_dev *scifdev = scif_info.loopb_dev;
	struct scif_qp *qp = scifdev->qpairs;
	struct scif_loopb_msg *msg;

	do {
		msg = NULL;
		spin_lock(&qp->recv_lock);
		if (!list_empty(&scif_info.loopb_recv_q)) {
			msg = list_first_entry(&scif_info.loopb_recv_q,
					       struct scif_loopb_msg,
					       list);
			list_del(&msg->list);
		}
		spin_unlock(&qp->recv_lock);

		if (msg) {
			scif_nodeqp_msg_handler(scifdev, qp, &msg->msg);
			kfree(msg);
		}
	} while (msg);
}

/**
 * scif_loopb_msg_handler() - Workqueue handler for loopback messages.
 * @scifdev: SCIF device
 * @qp: Queue pair.
 *
 * This work queue routine is triggered when a loopback message is received.
 *
 * We need special handling for receiving Node Qp messages on a loopback SCIF
 * device via two workqueues for receiving messages.
 *
 * The reason we need the extra workqueue which is not required with *normal*
 * non-loopback SCIF devices is the potential classic deadlock described below:
 *
 * Thread A tries to send a message on a loopback SCIF device and blocks since
 * there is no space in the RB while it has the send_lock held or another
 * lock called lock X for example.
 *
 * Thread B: The Loopback Node QP message receive workqueue receives the message
 * and tries to send a message (e.g. an ACK) to the loopback SCIF device. It
 * tries to grab the send lock again or lock X and deadlocks with Thread A. The
 * RB cannot be drained any further due to this classic deadlock.
 *
 * In order to avoid deadlocks as mentioned above we have an extra level of
 * indirection achieved by having two workqueues.
 * 1) The first workqueue whose handler is scif_loopb_msg_handler reads
 * messages from the Node QP RB, adds them to a list and queues work for the
 * second workqueue.
 *
 * 2) The second workqueue whose handler is scif_loopb_wq_handler dequeues
 * messages from the list, handles them, frees up the memory and dequeues
 * more elements from the list if possible.
 */
int
scif_loopb_msg_handler(struct scif_dev *scifdev, struct scif_qp *qp)
{
	int read_size;
	struct scif_loopb_msg *msg;

	do {
		msg = kmalloc(sizeof(*msg), GFP_KERNEL);
		if (!msg)
			return -ENOMEM;
		read_size = scif_rb_get_next(&qp->inbound_q, &msg->msg,
					     sizeof(struct scifmsg));
		if (read_size != sizeof(struct scifmsg)) {
			kfree(msg);
			scif_rb_update_read_ptr(&qp->inbound_q);
			break;
		}
		spin_lock(&qp->recv_lock);
		list_add_tail(&msg->list, &scif_info.loopb_recv_q);
		spin_unlock(&qp->recv_lock);
		queue_work(scif_info.loopb_wq, &scif_info.loopb_work);
		scif_rb_update_read_ptr(&qp->inbound_q);
	} while (read_size == sizeof(struct scifmsg));
	return read_size;
}

/**
 * scif_setup_loopback_qp - One time setup work for Loopback Node Qp.
 * @scifdev: SCIF device
 *
 * Sets up the required loopback workqueues, queue pairs and ring buffers
 */
int scif_setup_loopback_qp(struct scif_dev *scifdev)
{
	int err = 0;
	void *local_q;
	struct scif_qp *qp;

	err = scif_setup_intr_wq(scifdev);
	if (err)
		goto exit;
	INIT_LIST_HEAD(&scif_info.loopb_recv_q);
	snprintf(scif_info.loopb_wqname, sizeof(scif_info.loopb_wqname),
		 "SCIF LOOPB %d", scifdev->node);
	scif_info.loopb_wq =
		alloc_ordered_workqueue(scif_info.loopb_wqname, 0);
	if (!scif_info.loopb_wq) {
		err = -ENOMEM;
		goto destroy_intr;
	}
	INIT_WORK(&scif_info.loopb_work, scif_loopb_wq_handler);
	/* Allocate Self Qpair */
	scifdev->qpairs = kzalloc(sizeof(*scifdev->qpairs), GFP_KERNEL);
	if (!scifdev->qpairs) {
		err = -ENOMEM;
		goto destroy_loopb_wq;
	}

	qp = scifdev->qpairs;
	qp->magic = SCIFEP_MAGIC;
	spin_lock_init(&qp->send_lock);
	spin_lock_init(&qp->recv_lock);

	local_q = kzalloc(SCIF_NODE_QP_SIZE, GFP_KERNEL);
	if (!local_q) {
		err = -ENOMEM;
		goto free_qpairs;
	}
	/*
	 * For loopback the inbound_q and outbound_q are essentially the same
	 * since the Node sends a message on the loopback interface to the
	 * outbound_q which is then received on the inbound_q.
	 */
	scif_rb_init(&qp->outbound_q,
		     &qp->local_read,
		     &qp->local_write,
		     local_q, get_count_order(SCIF_NODE_QP_SIZE));

	scif_rb_init(&qp->inbound_q,
		     &qp->local_read,
		     &qp->local_write,
		     local_q, get_count_order(SCIF_NODE_QP_SIZE));
	scif_info.nodeid = scifdev->node;

	scif_peer_register_device(scifdev);

	scif_info.loopb_dev = scifdev;
	return err;
free_qpairs:
	kfree(scifdev->qpairs);
destroy_loopb_wq:
	destroy_workqueue(scif_info.loopb_wq);
destroy_intr:
	scif_destroy_intr_wq(scifdev);
exit:
	return err;
}

/**
 * scif_destroy_loopback_qp - One time uninit work for Loopback Node Qp
 * @scifdev: SCIF device
 *
 * Destroys the workqueues and frees up the Ring Buffer and Queue Pair memory.
 */
int scif_destroy_loopback_qp(struct scif_dev *scifdev)
{
	scif_peer_unregister_device(scifdev);
	destroy_workqueue(scif_info.loopb_wq);
	scif_destroy_intr_wq(scifdev);
	kfree(scifdev->qpairs->outbound_q.rb_base);
	kfree(scifdev->qpairs);
	scifdev->sdev = NULL;
	scif_info.loopb_dev = NULL;
	return 0;
}

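/*
 * Tear down all P2P mappings involving scifdev: both the mappings this
 * node holds for its peers and the mappings peers hold for this node.
 */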
void scif_destroy_p2p(struct scif_dev *scifdev)
{
	struct scif_dev *peer_dev;
	struct scif_p2p_info *p2p;
	struct list_head *pos, *tmp;
	int bd;

	mutex_lock(&scif_info.conflock);
	/* Free P2P mappings in the given node for all its peer nodes */
	list_for_each_safe(pos, tmp, &scifdev->p2p) {
		p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
		dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
			     p2p->sg_nentries[SCIF_PPI_MMIO],
			     DMA_BIDIRECTIONAL);
		dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
			     p2p->sg_nentries[SCIF_PPI_APER],
			     DMA_BIDIRECTIONAL);
		scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
		scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
		list_del(pos);
		kfree(p2p);
	}

	/* Free P2P mapping created in the peer nodes for the given node */
	for (bd = SCIF_MGMT_NODE + 1; bd <= scif_info.maxid; bd++) {
		peer_dev = &scif_dev[bd];
		list_for_each_safe(pos, tmp, &peer_dev->p2p) {
			p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
			if (p2p->ppi_peer_id == scifdev->node) {
				dma_unmap_sg(&peer_dev->sdev->dev,
					     p2p->ppi_sg[SCIF_PPI_MMIO],
					     p2p->sg_nentries[SCIF_PPI_MMIO],
					     DMA_BIDIRECTIONAL);
				dma_unmap_sg(&peer_dev->sdev->dev,
					     p2p->ppi_sg[SCIF_PPI_APER],
					     p2p->sg_nentries[SCIF_PPI_APER],
					     DMA_BIDIRECTIONAL);
				scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
				scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
				list_del(pos);
				kfree(p2p);
			}
		}
	}
	mutex_unlock(&scif_info.conflock);
}