root/drivers/misc/mic/scif/scif_nodeqp.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. scif_setup_qp_connect
  2. scif_setup_qp_accept
  3. scif_setup_qp_connect_response
  4. scif_send_msg_intr
  5. scif_qp_response
  6. scif_send_exit
  7. scif_setup_qp
  8. scif_p2p_freesg
  9. scif_p2p_setsg
  10. scif_init_p2p_info
  11. scif_deinit_p2p_info
  12. scif_node_connect
  13. scif_p2p_setup
  14. scif_display_message
  15. _scif_nodeqp_send
  16. scif_nodeqp_send
  17. scif_misc_handler
  18. scif_init
  19. scif_exit
  20. scif_exit_ack
  21. scif_node_add
  22. scif_poll_qp_state
  23. scif_node_add_ack
  24. scif_node_add_nack
  25. scif_node_remove
  26. scif_node_remove_ack
  27. scif_get_node_info_resp
  28. scif_msg_unknown
  29. scif_nodeqp_msg_handler
  30. scif_nodeqp_intrhandler
  31. scif_loopb_wq_handler
  32. scif_loopb_msg_handler
  33. scif_setup_loopback_qp
  34. scif_destroy_loopback_qp
  35. scif_destroy_p2p

   1 // SPDX-License-Identifier: GPL-2.0-only
   2 /*
   3  * Intel MIC Platform Software Stack (MPSS)
   4  *
   5  * Copyright(c) 2014 Intel Corporation.
   6  *
   7  * Intel SCIF driver.
   8  */
   9 #include "../bus/scif_bus.h"
  10 #include "scif_peer_bus.h"
  11 #include "scif_main.h"
  12 #include "scif_nodeqp.h"
  13 #include "scif_map.h"
  14 
  15 /*
  16  ************************************************************************
  17  * SCIF node Queue Pair (QP) setup flow:
  18  *
  19  * 1) SCIF driver gets probed with a scif_hw_dev via the scif_hw_bus
  20  * 2) scif_setup_qp(..) allocates the local qp and calls
  21  *      scif_setup_qp_connect(..) which allocates and maps the local
  22  *      buffer for the inbound QP
  23  * 3) The local node updates the device page with the DMA address of the QP
  24  * 4) A delayed work is scheduled (qp_dwork) which periodically reads if
  25  *      the peer node has updated its QP DMA address
  26  * 5) Once a valid non zero address is found in the QP DMA address field
  27  *      in the device page, the local node maps the remote node's QP,
  28  *      updates its outbound QP and sends a SCIF_INIT message to the peer
  29  * 6) The SCIF_INIT message is received by the peer node QP interrupt bottom
  30  *      half handler by calling scif_init(..)
  31  * 7) scif_init(..) registers a new SCIF peer node by calling
  32  *      scif_peer_register_device(..) which signifies the addition of a new
  33  *      SCIF node
  34  * 8) On the mgmt node, P2P network setup/teardown is initiated if all the
  35  *      remote nodes are online via scif_p2p_setup(..)
  36  * 9) For P2P setup, the host maps the remote nodes' aperture and memory
  37  *      bars and sends a SCIF_NODE_ADD message to both nodes
  38  * 10) As part of scif_nodeadd, both nodes set up their local inbound
  39  *      QPs and send a SCIF_NODE_ADD_ACK to the mgmt node
  40  * 11) As part of scif_node_add_ack(..) the mgmt node forwards the
  41  *      SCIF_NODE_ADD_ACK to the remote nodes
  42  * 12) As part of scif_node_add_ack(..) the remote nodes update their
  43  *      outbound QPs, make sure they can access memory on the remote node
  44  *      and then add a new SCIF peer node by calling
  45  *      scif_peer_register_device(..) which signifies the addition of a new
  46  *      SCIF node.
  47  * 13) The SCIF network is now established across all nodes.
  48  *
  49  ************************************************************************
  50  * SCIF node QP teardown flow (initiated by non mgmt node):
  51  *
  52  * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
  53  * 2) The device page QP DMA address field is updated with 0x0
  54  * 3) A non mgmt node now cleans up all local data structures and sends a
  55  *      SCIF_EXIT message to the peer and waits for a SCIF_EXIT_ACK
  56  * 4) As part of scif_exit(..) handling scif_disconnect_node(..) is called
  57  * 5) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the
  58  *      peers and waits for a SCIF_NODE_REMOVE_ACK
  59  * 6) As part of scif_node_remove(..) a remote node unregisters the peer
  60  *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
  61  * 7) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
  62  *      it sends itself a node remove message whose handling cleans up local
  63  *      data structures and unregisters the peer node from the SCIF network
  64  * 8) The mgmt node sends a SCIF_EXIT_ACK
  65  * 9) Upon receipt of the SCIF_EXIT_ACK the node initiating the teardown
  66  *      completes the SCIF remove routine
  67  * 10) The SCIF network is now torn down for the node initiating the
  68  *      teardown sequence
  69  *
  70  ************************************************************************
  71  * SCIF node QP teardown flow (initiated by mgmt node):
  72  *
  73  * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
  74  * 2) The device page QP DMA address field is updated with 0x0
  75  * 3) The mgmt node calls scif_disconnect_node(..)
  76  * 4) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the peers
  77  *      and waits for a SCIF_NODE_REMOVE_ACK
  78  * 5) As part of scif_node_remove(..) a remote node unregisters the peer
  79  *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
  80  * 6) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
  81  *      it unregisters the peer node from the SCIF network
  82  * 7) The mgmt node sends a SCIF_EXIT message and waits for a SCIF_EXIT_ACK.
  83  * 8) A non mgmt node upon receipt of a SCIF_EXIT message calls scif_stop(..)
  84  *      which would clean up local data structures for all SCIF nodes and
  85  *      then send a SCIF_EXIT_ACK back to the mgmt node
  86  * 9) Upon receipt of the SCIF_EXIT_ACK the the mgmt node sends itself a node
  87  *      remove message whose handling cleans up local data structures and
  88  *      destroys any P2P mappings.
  89  * 10) The SCIF hardware device for which a remove callback was received is now
  90  *      disconnected from the SCIF network.
  91  */
  92 /*
  93  * Initializes "local" data structures for the QP. Allocates the QP
  94  * ring buffer (rb) and initializes the "in bound" queue.
  95  */
  96 int scif_setup_qp_connect(struct scif_qp *qp, dma_addr_t *qp_offset,
  97                           int local_size, struct scif_dev *scifdev)
  98 {
  99         void *local_q = qp->inbound_q.rb_base;
 100         int err = 0;
 101         u32 tmp_rd = 0;
 102 
 103         spin_lock_init(&qp->send_lock);
 104         spin_lock_init(&qp->recv_lock);
 105 
 106         /* Allocate rb only if not already allocated */
 107         if (!local_q) {
 108                 local_q = kzalloc(local_size, GFP_KERNEL);
 109                 if (!local_q) {
 110                         err = -ENOMEM;
 111                         return err;
 112                 }
 113         }
 114 
 115         err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
 116         if (err)
 117                 goto kfree;
 118         /*
 119          * To setup the inbound_q, the buffer lives locally, the read pointer
 120          * is remote and the write pointer is local.
 121          */
 122         scif_rb_init(&qp->inbound_q,
 123                      &tmp_rd,
 124                      &qp->local_write,
 125                      local_q, get_count_order(local_size));
 126         /*
 127          * The read pointer is NULL initially and it is unsafe to use the ring
 128          * buffer til this changes!
 129          */
 130         qp->inbound_q.read_ptr = NULL;
 131         err = scif_map_single(qp_offset, qp,
 132                               scifdev, sizeof(struct scif_qp));
 133         if (err)
 134                 goto unmap;
 135         qp->local_qp = *qp_offset;
 136         return err;
 137 unmap:
 138         scif_unmap_single(qp->local_buf, scifdev, local_size);
 139         qp->local_buf = 0;
 140 kfree:
 141         kfree(local_q);
 142         return err;
 143 }
 144 
 145 /* When the other side has already done it's allocation, this is called */
 146 int scif_setup_qp_accept(struct scif_qp *qp, dma_addr_t *qp_offset,
 147                          dma_addr_t phys, int local_size,
 148                          struct scif_dev *scifdev)
 149 {
 150         void *local_q;
 151         void *remote_q;
 152         struct scif_qp *remote_qp;
 153         int remote_size;
 154         int err = 0;
 155 
 156         spin_lock_init(&qp->send_lock);
 157         spin_lock_init(&qp->recv_lock);
 158         /* Start by figuring out where we need to point */
 159         remote_qp = scif_ioremap(phys, sizeof(struct scif_qp), scifdev);
 160         if (!remote_qp)
 161                 return -EIO;
 162         qp->remote_qp = remote_qp;
 163         if (qp->remote_qp->magic != SCIFEP_MAGIC) {
 164                 err = -EIO;
 165                 goto iounmap;
 166         }
 167         qp->remote_buf = remote_qp->local_buf;
 168         remote_size = qp->remote_qp->inbound_q.size;
 169         remote_q = scif_ioremap(qp->remote_buf, remote_size, scifdev);
 170         if (!remote_q) {
 171                 err = -EIO;
 172                 goto iounmap;
 173         }
 174         qp->remote_qp->local_write = 0;
 175         /*
 176          * To setup the outbound_q, the buffer lives in remote memory,
 177          * the read pointer is local, the write pointer is remote
 178          */
 179         scif_rb_init(&qp->outbound_q,
 180                      &qp->local_read,
 181                      &qp->remote_qp->local_write,
 182                      remote_q,
 183                      get_count_order(remote_size));
 184         local_q = kzalloc(local_size, GFP_KERNEL);
 185         if (!local_q) {
 186                 err = -ENOMEM;
 187                 goto iounmap_1;
 188         }
 189         err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
 190         if (err)
 191                 goto kfree;
 192         qp->remote_qp->local_read = 0;
 193         /*
 194          * To setup the inbound_q, the buffer lives locally, the read pointer
 195          * is remote and the write pointer is local
 196          */
 197         scif_rb_init(&qp->inbound_q,
 198                      &qp->remote_qp->local_read,
 199                      &qp->local_write,
 200                      local_q, get_count_order(local_size));
 201         err = scif_map_single(qp_offset, qp, scifdev,
 202                               sizeof(struct scif_qp));
 203         if (err)
 204                 goto unmap;
 205         qp->local_qp = *qp_offset;
 206         return err;
 207 unmap:
 208         scif_unmap_single(qp->local_buf, scifdev, local_size);
 209         qp->local_buf = 0;
 210 kfree:
 211         kfree(local_q);
 212 iounmap_1:
 213         scif_iounmap(remote_q, remote_size, scifdev);
 214         qp->outbound_q.rb_base = NULL;
 215 iounmap:
 216         scif_iounmap(qp->remote_qp, sizeof(struct scif_qp), scifdev);
 217         qp->remote_qp = NULL;
 218         return err;
 219 }
 220 
 221 int scif_setup_qp_connect_response(struct scif_dev *scifdev,
 222                                    struct scif_qp *qp, u64 payload)
 223 {
 224         int err = 0;
 225         void *r_buf;
 226         int remote_size;
 227         phys_addr_t tmp_phys;
 228 
 229         qp->remote_qp = scif_ioremap(payload, sizeof(struct scif_qp), scifdev);
 230 
 231         if (!qp->remote_qp) {
 232                 err = -ENOMEM;
 233                 goto error;
 234         }
 235 
 236         if (qp->remote_qp->magic != SCIFEP_MAGIC) {
 237                 dev_err(&scifdev->sdev->dev,
 238                         "SCIFEP_MAGIC mismatch between self %d remote %d\n",
 239                         scif_dev[scif_info.nodeid].node, scifdev->node);
 240                 err = -ENODEV;
 241                 goto error;
 242         }
 243 
 244         tmp_phys = qp->remote_qp->local_buf;
 245         remote_size = qp->remote_qp->inbound_q.size;
 246         r_buf = scif_ioremap(tmp_phys, remote_size, scifdev);
 247 
 248         if (!r_buf)
 249                 return -EIO;
 250 
 251         qp->local_read = 0;
 252         scif_rb_init(&qp->outbound_q,
 253                      &qp->local_read,
 254                      &qp->remote_qp->local_write,
 255                      r_buf,
 256                      get_count_order(remote_size));
 257         /*
 258          * Because the node QP may already be processing an INIT message, set
 259          * the read pointer so the cached read offset isn't lost
 260          */
 261         qp->remote_qp->local_read = qp->inbound_q.current_read_offset;
 262         /*
 263          * resetup the inbound_q now that we know where the
 264          * inbound_read really is.
 265          */
 266         scif_rb_init(&qp->inbound_q,
 267                      &qp->remote_qp->local_read,
 268                      &qp->local_write,
 269                      qp->inbound_q.rb_base,
 270                      get_count_order(qp->inbound_q.size));
 271 error:
 272         return err;
 273 }
 274 
 275 static __always_inline void
 276 scif_send_msg_intr(struct scif_dev *scifdev)
 277 {
 278         struct scif_hw_dev *sdev = scifdev->sdev;
 279 
 280         if (scifdev_is_p2p(scifdev))
 281                 sdev->hw_ops->send_p2p_intr(sdev, scifdev->rdb, &scifdev->mmio);
 282         else
 283                 sdev->hw_ops->send_intr(sdev, scifdev->rdb);
 284 }
 285 
 286 int scif_qp_response(phys_addr_t phys, struct scif_dev *scifdev)
 287 {
 288         int err = 0;
 289         struct scifmsg msg;
 290 
 291         err = scif_setup_qp_connect_response(scifdev, scifdev->qpairs, phys);
 292         if (!err) {
 293                 /*
 294                  * Now that everything is setup and mapped, we're ready
 295                  * to tell the peer about our queue's location
 296                  */
 297                 msg.uop = SCIF_INIT;
 298                 msg.dst.node = scifdev->node;
 299                 err = scif_nodeqp_send(scifdev, &msg);
 300         }
 301         return err;
 302 }
 303 
 304 void scif_send_exit(struct scif_dev *scifdev)
 305 {
 306         struct scifmsg msg;
 307         int ret;
 308 
 309         scifdev->exit = OP_IN_PROGRESS;
 310         msg.uop = SCIF_EXIT;
 311         msg.src.node = scif_info.nodeid;
 312         msg.dst.node = scifdev->node;
 313         ret = scif_nodeqp_send(scifdev, &msg);
 314         if (ret)
 315                 goto done;
 316         /* Wait for a SCIF_EXIT_ACK message */
 317         wait_event_timeout(scif_info.exitwq, scifdev->exit == OP_COMPLETED,
 318                            SCIF_NODE_ALIVE_TIMEOUT);
 319 done:
 320         scifdev->exit = OP_IDLE;
 321 }
 322 
 323 int scif_setup_qp(struct scif_dev *scifdev)
 324 {
 325         int err = 0;
 326         int local_size;
 327         struct scif_qp *qp;
 328 
 329         local_size = SCIF_NODE_QP_SIZE;
 330 
 331         qp = kzalloc(sizeof(*qp), GFP_KERNEL);
 332         if (!qp) {
 333                 err = -ENOMEM;
 334                 return err;
 335         }
 336         qp->magic = SCIFEP_MAGIC;
 337         scifdev->qpairs = qp;
 338         err = scif_setup_qp_connect(qp, &scifdev->qp_dma_addr,
 339                                     local_size, scifdev);
 340         if (err)
 341                 goto free_qp;
 342         /*
 343          * We're as setup as we can be. The inbound_q is setup, w/o a usable
 344          * outbound q.  When we get a message, the read_ptr will be updated,
 345          * and we will pull the message.
 346          */
 347         return err;
 348 free_qp:
 349         kfree(scifdev->qpairs);
 350         scifdev->qpairs = NULL;
 351         return err;
 352 }
 353 
 354 static void scif_p2p_freesg(struct scatterlist *sg)
 355 {
 356         kfree(sg);
 357 }
 358 
 359 static struct scatterlist *
 360 scif_p2p_setsg(phys_addr_t pa, int page_size, int page_cnt)
 361 {
 362         struct scatterlist *sg;
 363         struct page *page;
 364         int i;
 365 
 366         sg = kcalloc(page_cnt, sizeof(struct scatterlist), GFP_KERNEL);
 367         if (!sg)
 368                 return NULL;
 369         sg_init_table(sg, page_cnt);
 370         for (i = 0; i < page_cnt; i++) {
 371                 page = pfn_to_page(pa >> PAGE_SHIFT);
 372                 sg_set_page(&sg[i], page, page_size, 0);
 373                 pa += page_size;
 374         }
 375         return sg;
 376 }
 377 
 378 /* Init p2p mappings required to access peerdev from scifdev */
 379 static struct scif_p2p_info *
 380 scif_init_p2p_info(struct scif_dev *scifdev, struct scif_dev *peerdev)
 381 {
 382         struct scif_p2p_info *p2p;
 383         int num_mmio_pages, num_aper_pages, sg_page_shift, err, num_aper_chunks;
 384         struct scif_hw_dev *psdev = peerdev->sdev;
 385         struct scif_hw_dev *sdev = scifdev->sdev;
 386 
 387         num_mmio_pages = psdev->mmio->len >> PAGE_SHIFT;
 388         num_aper_pages = psdev->aper->len >> PAGE_SHIFT;
 389 
 390         p2p = kzalloc(sizeof(*p2p), GFP_KERNEL);
 391         if (!p2p)
 392                 return NULL;
 393         p2p->ppi_sg[SCIF_PPI_MMIO] = scif_p2p_setsg(psdev->mmio->pa,
 394                                                     PAGE_SIZE, num_mmio_pages);
 395         if (!p2p->ppi_sg[SCIF_PPI_MMIO])
 396                 goto free_p2p;
 397         p2p->sg_nentries[SCIF_PPI_MMIO] = num_mmio_pages;
 398         sg_page_shift = get_order(min(psdev->aper->len, (u64)(1 << 30)));
 399         num_aper_chunks = num_aper_pages >> (sg_page_shift - PAGE_SHIFT);
 400         p2p->ppi_sg[SCIF_PPI_APER] = scif_p2p_setsg(psdev->aper->pa,
 401                                                     1 << sg_page_shift,
 402                                                     num_aper_chunks);
 403         p2p->sg_nentries[SCIF_PPI_APER] = num_aper_chunks;
 404         err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
 405                          num_mmio_pages, PCI_DMA_BIDIRECTIONAL);
 406         if (err != num_mmio_pages)
 407                 goto scif_p2p_free;
 408         err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
 409                          num_aper_chunks, PCI_DMA_BIDIRECTIONAL);
 410         if (err != num_aper_chunks)
 411                 goto dma_unmap;
 412         p2p->ppi_da[SCIF_PPI_MMIO] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_MMIO]);
 413         p2p->ppi_da[SCIF_PPI_APER] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_APER]);
 414         p2p->ppi_len[SCIF_PPI_MMIO] = num_mmio_pages;
 415         p2p->ppi_len[SCIF_PPI_APER] = num_aper_pages;
 416         p2p->ppi_peer_id = peerdev->node;
 417         return p2p;
 418 dma_unmap:
 419         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
 420                      p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
 421 scif_p2p_free:
 422         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
 423         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
 424 free_p2p:
 425         kfree(p2p);
 426         return NULL;
 427 }
 428 
 429 /* Uninitialize and release resources from a p2p mapping */
 430 static void scif_deinit_p2p_info(struct scif_dev *scifdev,
 431                                  struct scif_p2p_info *p2p)
 432 {
 433         struct scif_hw_dev *sdev = scifdev->sdev;
 434 
 435         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
 436                      p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
 437         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
 438                      p2p->sg_nentries[SCIF_PPI_APER], DMA_BIDIRECTIONAL);
 439         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
 440         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
 441         kfree(p2p);
 442 }
 443 
 444 /**
 445  * scif_node_connect: Respond to SCIF_NODE_CONNECT interrupt message
 446  * @dst: Destination node
 447  *
 448  * Connect the src and dst node by setting up the p2p connection
 449  * between them. Management node here acts like a proxy.
 450  */
 451 static void scif_node_connect(struct scif_dev *scifdev, int dst)
 452 {
 453         struct scif_dev *dev_j = scifdev;
 454         struct scif_dev *dev_i = NULL;
 455         struct scif_p2p_info *p2p_ij = NULL;    /* bus addr for j from i */
 456         struct scif_p2p_info *p2p_ji = NULL;    /* bus addr for i from j */
 457         struct scif_p2p_info *p2p;
 458         struct list_head *pos, *tmp;
 459         struct scifmsg msg;
 460         int err;
 461         u64 tmppayload;
 462 
 463         if (dst < 1 || dst > scif_info.maxid)
 464                 return;
 465 
 466         dev_i = &scif_dev[dst];
 467 
 468         if (!_scifdev_alive(dev_i))
 469                 return;
 470         /*
 471          * If the p2p connection is already setup or in the process of setting
 472          * up then just ignore this request. The requested node will get
 473          * informed by SCIF_NODE_ADD_ACK or SCIF_NODE_ADD_NACK
 474          */
 475         if (!list_empty(&dev_i->p2p)) {
 476                 list_for_each_safe(pos, tmp, &dev_i->p2p) {
 477                         p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
 478                         if (p2p->ppi_peer_id == dev_j->node)
 479                                 return;
 480                 }
 481         }
 482         p2p_ij = scif_init_p2p_info(dev_i, dev_j);
 483         if (!p2p_ij)
 484                 return;
 485         p2p_ji = scif_init_p2p_info(dev_j, dev_i);
 486         if (!p2p_ji) {
 487                 scif_deinit_p2p_info(dev_i, p2p_ij);
 488                 return;
 489         }
 490         list_add_tail(&p2p_ij->ppi_list, &dev_i->p2p);
 491         list_add_tail(&p2p_ji->ppi_list, &dev_j->p2p);
 492 
 493         /*
 494          * Send a SCIF_NODE_ADD to dev_i, pass it its bus address
 495          * as seen from dev_j
 496          */
 497         msg.uop = SCIF_NODE_ADD;
 498         msg.src.node = dev_j->node;
 499         msg.dst.node = dev_i->node;
 500 
 501         msg.payload[0] = p2p_ji->ppi_da[SCIF_PPI_APER];
 502         msg.payload[1] = p2p_ij->ppi_da[SCIF_PPI_MMIO];
 503         msg.payload[2] = p2p_ij->ppi_da[SCIF_PPI_APER];
 504         msg.payload[3] = p2p_ij->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
 505 
 506         err = scif_nodeqp_send(dev_i,  &msg);
 507         if (err) {
 508                 dev_err(&scifdev->sdev->dev,
 509                         "%s %d error %d\n", __func__, __LINE__, err);
 510                 return;
 511         }
 512 
 513         /* Same as above but to dev_j */
 514         msg.uop = SCIF_NODE_ADD;
 515         msg.src.node = dev_i->node;
 516         msg.dst.node = dev_j->node;
 517 
 518         tmppayload = msg.payload[0];
 519         msg.payload[0] = msg.payload[2];
 520         msg.payload[2] = tmppayload;
 521         msg.payload[1] = p2p_ji->ppi_da[SCIF_PPI_MMIO];
 522         msg.payload[3] = p2p_ji->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
 523 
 524         scif_nodeqp_send(dev_j, &msg);
 525 }
 526 
 527 static void scif_p2p_setup(void)
 528 {
 529         int i, j;
 530 
 531         if (!scif_info.p2p_enable)
 532                 return;
 533 
 534         for (i = 1; i <= scif_info.maxid; i++)
 535                 if (!_scifdev_alive(&scif_dev[i]))
 536                         return;
 537 
 538         for (i = 1; i <= scif_info.maxid; i++) {
 539                 for (j = 1; j <= scif_info.maxid; j++) {
 540                         struct scif_dev *scifdev = &scif_dev[i];
 541 
 542                         if (i == j)
 543                                 continue;
 544                         scif_node_connect(scifdev, j);
 545                 }
 546         }
 547 }
 548 
 549 static char *message_types[] = {"BAD",
 550                                 "INIT",
 551                                 "EXIT",
 552                                 "SCIF_EXIT_ACK",
 553                                 "SCIF_NODE_ADD",
 554                                 "SCIF_NODE_ADD_ACK",
 555                                 "SCIF_NODE_ADD_NACK",
 556                                 "REMOVE_NODE",
 557                                 "REMOVE_NODE_ACK",
 558                                 "CNCT_REQ",
 559                                 "CNCT_GNT",
 560                                 "CNCT_GNTACK",
 561                                 "CNCT_GNTNACK",
 562                                 "CNCT_REJ",
 563                                 "DISCNCT",
 564                                 "DISCNT_ACK",
 565                                 "CLIENT_SENT",
 566                                 "CLIENT_RCVD",
 567                                 "SCIF_GET_NODE_INFO",
 568                                 "REGISTER",
 569                                 "REGISTER_ACK",
 570                                 "REGISTER_NACK",
 571                                 "UNREGISTER",
 572                                 "UNREGISTER_ACK",
 573                                 "UNREGISTER_NACK",
 574                                 "ALLOC_REQ",
 575                                 "ALLOC_GNT",
 576                                 "ALLOC_REJ",
 577                                 "FREE_PHYS",
 578                                 "FREE_VIRT",
 579                                 "MUNMAP",
 580                                 "MARK",
 581                                 "MARK_ACK",
 582                                 "MARK_NACK",
 583                                 "WAIT",
 584                                 "WAIT_ACK",
 585                                 "WAIT_NACK",
 586                                 "SIGNAL_LOCAL",
 587                                 "SIGNAL_REMOTE",
 588                                 "SIG_ACK",
 589                                 "SIG_NACK"};
 590 
 591 static void
 592 scif_display_message(struct scif_dev *scifdev, struct scifmsg *msg,
 593                      const char *label)
 594 {
 595         if (!scif_info.en_msg_log)
 596                 return;
 597         if (msg->uop > SCIF_MAX_MSG) {
 598                 dev_err(&scifdev->sdev->dev,
 599                         "%s: unknown msg type %d\n", label, msg->uop);
 600                 return;
 601         }
 602         dev_info(&scifdev->sdev->dev,
 603                  "%s: msg type %s, src %d:%d, dest %d:%d payload 0x%llx:0x%llx:0x%llx:0x%llx\n",
 604                  label, message_types[msg->uop], msg->src.node, msg->src.port,
 605                  msg->dst.node, msg->dst.port, msg->payload[0], msg->payload[1],
 606                  msg->payload[2], msg->payload[3]);
 607 }
 608 
 609 int _scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
 610 {
 611         struct scif_qp *qp = scifdev->qpairs;
 612         int err = -ENOMEM, loop_cnt = 0;
 613 
 614         scif_display_message(scifdev, msg, "Sent");
 615         if (!qp) {
 616                 err = -EINVAL;
 617                 goto error;
 618         }
 619         spin_lock(&qp->send_lock);
 620 
 621         while ((err = scif_rb_write(&qp->outbound_q,
 622                                     msg, sizeof(struct scifmsg)))) {
 623                 mdelay(1);
 624 #define SCIF_NODEQP_SEND_TO_MSEC (3 * 1000)
 625                 if (loop_cnt++ > (SCIF_NODEQP_SEND_TO_MSEC)) {
 626                         err = -ENODEV;
 627                         break;
 628                 }
 629         }
 630         if (!err)
 631                 scif_rb_commit(&qp->outbound_q);
 632         spin_unlock(&qp->send_lock);
 633         if (!err) {
 634                 if (scifdev_self(scifdev))
 635                         /*
 636                          * For loopback we need to emulate an interrupt by
 637                          * queuing work for the queue handling real node
 638                          * Qp interrupts.
 639                          */
 640                         queue_work(scifdev->intr_wq, &scifdev->intr_bh);
 641                 else
 642                         scif_send_msg_intr(scifdev);
 643         }
 644 error:
 645         if (err)
 646                 dev_dbg(&scifdev->sdev->dev,
 647                         "%s %d error %d uop %d\n",
 648                          __func__, __LINE__, err, msg->uop);
 649         return err;
 650 }
 651 
 652 /**
 653  * scif_nodeqp_send - Send a message on the node queue pair
 654  * @scifdev: Scif Device.
 655  * @msg: The message to be sent.
 656  */
 657 int scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
 658 {
 659         int err;
 660         struct device *spdev = NULL;
 661 
 662         if (msg->uop > SCIF_EXIT_ACK) {
 663                 /* Dont send messages once the exit flow has begun */
 664                 if (OP_IDLE != scifdev->exit)
 665                         return -ENODEV;
 666                 spdev = scif_get_peer_dev(scifdev);
 667                 if (IS_ERR(spdev)) {
 668                         err = PTR_ERR(spdev);
 669                         return err;
 670                 }
 671         }
 672         err = _scif_nodeqp_send(scifdev, msg);
 673         if (msg->uop > SCIF_EXIT_ACK)
 674                 scif_put_peer_dev(spdev);
 675         return err;
 676 }
 677 
 678 /*
 679  * scif_misc_handler:
 680  *
 681  * Work queue handler for servicing miscellaneous SCIF tasks.
 682  * Examples include:
 683  * 1) Remote fence requests.
 684  * 2) Destruction of temporary registered windows
 685  *    created during scif_vreadfrom()/scif_vwriteto().
 686  * 3) Cleanup of zombie endpoints.
 687  */
 688 void scif_misc_handler(struct work_struct *work)
 689 {
 690         scif_rma_handle_remote_fences();
 691         scif_rma_destroy_windows();
 692         scif_rma_destroy_tcw_invalid();
 693         scif_cleanup_zombie_epd();
 694 }
 695 
 696 /**
 697  * scif_init() - Respond to SCIF_INIT interrupt message
 698  * @scifdev:    Remote SCIF device node
 699  * @msg:        Interrupt message
 700  */
 701 static __always_inline void
 702 scif_init(struct scif_dev *scifdev, struct scifmsg *msg)
 703 {
 704         /*
 705          * Allow the thread waiting for device page updates for the peer QP DMA
 706          * address to complete initializing the inbound_q.
 707          */
 708         flush_delayed_work(&scifdev->qp_dwork);
 709 
 710         scif_peer_register_device(scifdev);
 711 
 712         if (scif_is_mgmt_node()) {
 713                 mutex_lock(&scif_info.conflock);
 714                 scif_p2p_setup();
 715                 mutex_unlock(&scif_info.conflock);
 716         }
 717 }
 718 
 719 /**
 720  * scif_exit() - Respond to SCIF_EXIT interrupt message
 721  * @scifdev:    Remote SCIF device node
 722  * @msg:        Interrupt message
 723  *
 724  * This function stops the SCIF interface for the node which sent
 725  * the SCIF_EXIT message and starts waiting for that node to
 726  * resetup the queue pair again.
 727  */
 728 static __always_inline void
 729 scif_exit(struct scif_dev *scifdev, struct scifmsg *unused)
 730 {
 731         scifdev->exit_ack_pending = true;
 732         if (scif_is_mgmt_node())
 733                 scif_disconnect_node(scifdev->node, false);
 734         else
 735                 scif_stop(scifdev);
 736         schedule_delayed_work(&scifdev->qp_dwork,
 737                               msecs_to_jiffies(1000));
 738 }
 739 
 740 /**
 741  * scif_exitack() - Respond to SCIF_EXIT_ACK interrupt message
 742  * @scifdev:    Remote SCIF device node
 743  * @msg:        Interrupt message
 744  *
 745  */
 746 static __always_inline void
 747 scif_exit_ack(struct scif_dev *scifdev, struct scifmsg *unused)
 748 {
 749         scifdev->exit = OP_COMPLETED;
 750         wake_up(&scif_info.exitwq);
 751 }
 752 
 753 /**
 754  * scif_node_add() - Respond to SCIF_NODE_ADD interrupt message
 755  * @scifdev:    Remote SCIF device node
 756  * @msg:        Interrupt message
 757  *
 758  * When the mgmt node driver has finished initializing a MIC node queue pair it
 759  * marks the node as online. It then looks for all currently online MIC cards
 760  * and send a SCIF_NODE_ADD message to identify the ID of the new card for
 761  * peer to peer initialization
 762  *
 763  * The local node allocates its incoming queue and sends its address in the
 764  * SCIF_NODE_ADD_ACK message back to the mgmt node, the mgmt node "reflects"
 765  * this message to the new node
 766  */
 767 static __always_inline void
 768 scif_node_add(struct scif_dev *scifdev, struct scifmsg *msg)
 769 {
 770         struct scif_dev *newdev;
 771         dma_addr_t qp_offset;
 772         int qp_connect;
 773         struct scif_hw_dev *sdev;
 774 
 775         dev_dbg(&scifdev->sdev->dev,
 776                 "Scifdev %d:%d received NODE_ADD msg for node %d\n",
 777                 scifdev->node, msg->dst.node, msg->src.node);
 778         dev_dbg(&scifdev->sdev->dev,
 779                 "Remote address for this node's aperture %llx\n",
 780                 msg->payload[0]);
 781         newdev = &scif_dev[msg->src.node];
 782         newdev->node = msg->src.node;
 783         newdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
 784         sdev = newdev->sdev;
 785 
 786         if (scif_setup_intr_wq(newdev)) {
 787                 dev_err(&scifdev->sdev->dev,
 788                         "failed to setup interrupts for %d\n", msg->src.node);
 789                 goto interrupt_setup_error;
 790         }
 791         newdev->mmio.va = ioremap_nocache(msg->payload[1], sdev->mmio->len);
 792         if (!newdev->mmio.va) {
 793                 dev_err(&scifdev->sdev->dev,
 794                         "failed to map mmio for %d\n", msg->src.node);
 795                 goto mmio_map_error;
 796         }
 797         newdev->qpairs = kzalloc(sizeof(*newdev->qpairs), GFP_KERNEL);
 798         if (!newdev->qpairs)
 799                 goto qp_alloc_error;
 800         /*
 801          * Set the base address of the remote node's memory since it gets
 802          * added to qp_offset
 803          */
 804         newdev->base_addr = msg->payload[0];
 805 
 806         qp_connect = scif_setup_qp_connect(newdev->qpairs, &qp_offset,
 807                                            SCIF_NODE_QP_SIZE, newdev);
 808         if (qp_connect) {
 809                 dev_err(&scifdev->sdev->dev,
 810                         "failed to setup qp_connect %d\n", qp_connect);
 811                 goto qp_connect_error;
 812         }
 813 
 814         newdev->db = sdev->hw_ops->next_db(sdev);
 815         newdev->cookie = sdev->hw_ops->request_irq(sdev, scif_intr_handler,
 816                                                    "SCIF_INTR", newdev,
 817                                                    newdev->db);
 818         if (IS_ERR(newdev->cookie))
 819                 goto qp_connect_error;
 820         newdev->qpairs->magic = SCIFEP_MAGIC;
 821         newdev->qpairs->qp_state = SCIF_QP_OFFLINE;
 822 
 823         msg->uop = SCIF_NODE_ADD_ACK;
 824         msg->dst.node = msg->src.node;
 825         msg->src.node = scif_info.nodeid;
 826         msg->payload[0] = qp_offset;
 827         msg->payload[2] = newdev->db;
 828         scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
 829         return;
 830 qp_connect_error:
 831         kfree(newdev->qpairs);
 832         newdev->qpairs = NULL;
 833 qp_alloc_error:
 834         iounmap(newdev->mmio.va);
 835         newdev->mmio.va = NULL;
 836 mmio_map_error:
 837 interrupt_setup_error:
 838         dev_err(&scifdev->sdev->dev,
 839                 "node add failed for node %d\n", msg->src.node);
 840         msg->uop = SCIF_NODE_ADD_NACK;
 841         msg->dst.node = msg->src.node;
 842         msg->src.node = scif_info.nodeid;
 843         scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
 844 }
 845 
 846 void scif_poll_qp_state(struct work_struct *work)
 847 {
 848 #define SCIF_NODE_QP_RETRY 100
 849 #define SCIF_NODE_QP_TIMEOUT 100
 850         struct scif_dev *peerdev = container_of(work, struct scif_dev,
 851                                                         p2p_dwork.work);
 852         struct scif_qp *qp = &peerdev->qpairs[0];
 853 
 854         if (qp->qp_state != SCIF_QP_ONLINE ||
 855             qp->remote_qp->qp_state != SCIF_QP_ONLINE) {
 856                 if (peerdev->p2p_retry++ == SCIF_NODE_QP_RETRY) {
 857                         dev_err(&peerdev->sdev->dev,
 858                                 "Warning: QP check timeout with state %d\n",
 859                                 qp->qp_state);
 860                         goto timeout;
 861                 }
 862                 schedule_delayed_work(&peerdev->p2p_dwork,
 863                                       msecs_to_jiffies(SCIF_NODE_QP_TIMEOUT));
 864                 return;
 865         }
 866         return;
 867 timeout:
 868         dev_err(&peerdev->sdev->dev,
 869                 "%s %d remote node %d offline,  state = 0x%x\n",
 870                 __func__, __LINE__, peerdev->node, qp->qp_state);
 871         qp->remote_qp->qp_state = SCIF_QP_OFFLINE;
 872         scif_peer_unregister_device(peerdev);
 873         scif_cleanup_scifdev(peerdev);
 874 }
 875 
 876 /**
 877  * scif_node_add_ack() - Respond to SCIF_NODE_ADD_ACK interrupt message
 878  * @scifdev:    Remote SCIF device node
 879  * @msg:        Interrupt message
 880  *
 881  * After a MIC node receives the SCIF_NODE_ADD_ACK message it send this
 882  * message to the mgmt node to confirm the sequence is finished.
 883  *
 884  */
 885 static __always_inline void
 886 scif_node_add_ack(struct scif_dev *scifdev, struct scifmsg *msg)
 887 {
 888         struct scif_dev *peerdev;
 889         struct scif_qp *qp;
 890         struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
 891 
 892         dev_dbg(&scifdev->sdev->dev,
 893                 "Scifdev %d received SCIF_NODE_ADD_ACK msg src %d dst %d\n",
 894                 scifdev->node, msg->src.node, msg->dst.node);
 895         dev_dbg(&scifdev->sdev->dev,
 896                 "payload %llx %llx %llx %llx\n", msg->payload[0],
 897                 msg->payload[1], msg->payload[2], msg->payload[3]);
 898         if (scif_is_mgmt_node()) {
 899                 /*
 900                  * the lock serializes with scif_qp_response_ack. The mgmt node
 901                  * is forwarding the NODE_ADD_ACK message from src to dst we
 902                  * need to make sure that the dst has already received a
 903                  * NODE_ADD for src and setup its end of the qp to dst
 904                  */
 905                 mutex_lock(&scif_info.conflock);
 906                 msg->payload[1] = scif_info.maxid;
 907                 scif_nodeqp_send(dst_dev, msg);
 908                 mutex_unlock(&scif_info.conflock);
 909                 return;
 910         }
 911         peerdev = &scif_dev[msg->src.node];
 912         peerdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
 913         peerdev->node = msg->src.node;
 914 
 915         qp = &peerdev->qpairs[0];
 916 
 917         if ((scif_setup_qp_connect_response(peerdev, &peerdev->qpairs[0],
 918                                             msg->payload[0])))
 919                 goto local_error;
 920         peerdev->rdb = msg->payload[2];
 921         qp->remote_qp->qp_state = SCIF_QP_ONLINE;
 922 
 923         scif_peer_register_device(peerdev);
 924 
 925         schedule_delayed_work(&peerdev->p2p_dwork, 0);
 926         return;
 927 local_error:
 928         scif_cleanup_scifdev(peerdev);
 929 }
 930 
 931 /**
 932  * scif_node_add_nack: Respond to SCIF_NODE_ADD_NACK interrupt message
 933  * @msg:        Interrupt message
 934  *
 935  * SCIF_NODE_ADD failed, so inform the waiting wq.
 936  */
 937 static __always_inline void
 938 scif_node_add_nack(struct scif_dev *scifdev, struct scifmsg *msg)
 939 {
 940         if (scif_is_mgmt_node()) {
 941                 struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
 942 
 943                 dev_dbg(&scifdev->sdev->dev,
 944                         "SCIF_NODE_ADD_NACK received from %d\n", scifdev->node);
 945                 scif_nodeqp_send(dst_dev, msg);
 946         }
 947 }
 948 
 949 /*
 950  * scif_node_remove: Handle SCIF_NODE_REMOVE message
 951  * @msg: Interrupt message
 952  *
 953  * Handle node removal.
 954  */
 955 static __always_inline void
 956 scif_node_remove(struct scif_dev *scifdev, struct scifmsg *msg)
 957 {
 958         int node = msg->payload[0];
 959         struct scif_dev *scdev = &scif_dev[node];
 960 
 961         scdev->node_remove_ack_pending = true;
 962         scif_handle_remove_node(node);
 963 }
 964 
 965 /*
 966  * scif_node_remove_ack: Handle SCIF_NODE_REMOVE_ACK message
 967  * @msg: Interrupt message
 968  *
 969  * The peer has acked a SCIF_NODE_REMOVE message.
 970  */
 971 static __always_inline void
 972 scif_node_remove_ack(struct scif_dev *scifdev, struct scifmsg *msg)
 973 {
 974         struct scif_dev *sdev = &scif_dev[msg->payload[0]];
 975 
 976         atomic_inc(&sdev->disconn_rescnt);
 977         wake_up(&sdev->disconn_wq);
 978 }
 979 
 980 /**
 981  * scif_get_node_info: Respond to SCIF_GET_NODE_INFO interrupt message
 982  * @msg:        Interrupt message
 983  *
 984  * Retrieve node info i.e maxid and total from the mgmt node.
 985  */
 986 static __always_inline void
 987 scif_get_node_info_resp(struct scif_dev *scifdev, struct scifmsg *msg)
 988 {
 989         if (scif_is_mgmt_node()) {
 990                 swap(msg->dst.node, msg->src.node);
 991                 mutex_lock(&scif_info.conflock);
 992                 msg->payload[1] = scif_info.maxid;
 993                 msg->payload[2] = scif_info.total;
 994                 mutex_unlock(&scif_info.conflock);
 995                 scif_nodeqp_send(scifdev, msg);
 996         } else {
 997                 struct completion *node_info =
 998                         (struct completion *)msg->payload[3];
 999 
1000                 mutex_lock(&scif_info.conflock);
1001                 scif_info.maxid = msg->payload[1];
1002                 scif_info.total = msg->payload[2];
1003                 complete_all(node_info);
1004                 mutex_unlock(&scif_info.conflock);
1005         }
1006 }
1007 
1008 static void
1009 scif_msg_unknown(struct scif_dev *scifdev, struct scifmsg *msg)
1010 {
1011         /* Bogus Node Qp Message? */
1012         dev_err(&scifdev->sdev->dev,
1013                 "Unknown message 0x%xn scifdev->node 0x%x\n",
1014                 msg->uop, scifdev->node);
1015 }
1016 
1017 static void (*scif_intr_func[SCIF_MAX_MSG + 1])
1018             (struct scif_dev *, struct scifmsg *msg) = {
1019         scif_msg_unknown,       /* Error */
1020         scif_init,              /* SCIF_INIT */
1021         scif_exit,              /* SCIF_EXIT */
1022         scif_exit_ack,          /* SCIF_EXIT_ACK */
1023         scif_node_add,          /* SCIF_NODE_ADD */
1024         scif_node_add_ack,      /* SCIF_NODE_ADD_ACK */
1025         scif_node_add_nack,     /* SCIF_NODE_ADD_NACK */
1026         scif_node_remove,       /* SCIF_NODE_REMOVE */
1027         scif_node_remove_ack,   /* SCIF_NODE_REMOVE_ACK */
1028         scif_cnctreq,           /* SCIF_CNCT_REQ */
1029         scif_cnctgnt,           /* SCIF_CNCT_GNT */
1030         scif_cnctgnt_ack,       /* SCIF_CNCT_GNTACK */
1031         scif_cnctgnt_nack,      /* SCIF_CNCT_GNTNACK */
1032         scif_cnctrej,           /* SCIF_CNCT_REJ */
1033         scif_discnct,           /* SCIF_DISCNCT */
1034         scif_discnt_ack,        /* SCIF_DISCNT_ACK */
1035         scif_clientsend,        /* SCIF_CLIENT_SENT */
1036         scif_clientrcvd,        /* SCIF_CLIENT_RCVD */
1037         scif_get_node_info_resp,/* SCIF_GET_NODE_INFO */
1038         scif_recv_reg,          /* SCIF_REGISTER */
1039         scif_recv_reg_ack,      /* SCIF_REGISTER_ACK */
1040         scif_recv_reg_nack,     /* SCIF_REGISTER_NACK */
1041         scif_recv_unreg,        /* SCIF_UNREGISTER */
1042         scif_recv_unreg_ack,    /* SCIF_UNREGISTER_ACK */
1043         scif_recv_unreg_nack,   /* SCIF_UNREGISTER_NACK */
1044         scif_alloc_req,         /* SCIF_ALLOC_REQ */
1045         scif_alloc_gnt_rej,     /* SCIF_ALLOC_GNT */
1046         scif_alloc_gnt_rej,     /* SCIF_ALLOC_REJ */
1047         scif_free_virt,         /* SCIF_FREE_VIRT */
1048         scif_recv_munmap,       /* SCIF_MUNMAP */
1049         scif_recv_mark,         /* SCIF_MARK */
1050         scif_recv_mark_resp,    /* SCIF_MARK_ACK */
1051         scif_recv_mark_resp,    /* SCIF_MARK_NACK */
1052         scif_recv_wait,         /* SCIF_WAIT */
1053         scif_recv_wait_resp,    /* SCIF_WAIT_ACK */
1054         scif_recv_wait_resp,    /* SCIF_WAIT_NACK */
1055         scif_recv_sig_local,    /* SCIF_SIG_LOCAL */
1056         scif_recv_sig_remote,   /* SCIF_SIG_REMOTE */
1057         scif_recv_sig_resp,     /* SCIF_SIG_ACK */
1058         scif_recv_sig_resp,     /* SCIF_SIG_NACK */
1059 };
1060 
1061 /**
1062  * scif_nodeqp_msg_handler() - Common handler for node messages
1063  * @scifdev: Remote device to respond to
1064  * @qp: Remote memory pointer
1065  * @msg: The message to be handled.
1066  *
1067  * This routine calls the appropriate routine to handle a Node Qp
1068  * message receipt
1069  */
1070 static int scif_max_msg_id = SCIF_MAX_MSG;
1071 
1072 static void
1073 scif_nodeqp_msg_handler(struct scif_dev *scifdev,
1074                         struct scif_qp *qp, struct scifmsg *msg)
1075 {
1076         scif_display_message(scifdev, msg, "Rcvd");
1077 
1078         if (msg->uop > (u32)scif_max_msg_id) {
1079                 /* Bogus Node Qp Message? */
1080                 dev_err(&scifdev->sdev->dev,
1081                         "Unknown message 0x%xn scifdev->node 0x%x\n",
1082                         msg->uop, scifdev->node);
1083                 return;
1084         }
1085 
1086         scif_intr_func[msg->uop](scifdev, msg);
1087 }
1088 
1089 /**
1090  * scif_nodeqp_intrhandler() - Interrupt handler for node messages
1091  * @scifdev:    Remote device to respond to
1092  * @qp:         Remote memory pointer
1093  *
1094  * This routine is triggered by the interrupt mechanism.  It reads
1095  * messages from the node queue RB and calls the Node QP Message handling
1096  * routine.
1097  */
1098 void scif_nodeqp_intrhandler(struct scif_dev *scifdev, struct scif_qp *qp)
1099 {
1100         struct scifmsg msg;
1101         int read_size;
1102 
1103         do {
1104                 read_size = scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg));
1105                 if (!read_size)
1106                         break;
1107                 scif_nodeqp_msg_handler(scifdev, qp, &msg);
1108                 /*
1109                  * The node queue pair is unmapped so skip the read pointer
1110                  * update after receipt of a SCIF_EXIT_ACK
1111                  */
1112                 if (SCIF_EXIT_ACK == msg.uop)
1113                         break;
1114                 scif_rb_update_read_ptr(&qp->inbound_q);
1115         } while (1);
1116 }
1117 
1118 /**
1119  * scif_loopb_wq_handler - Loopback Workqueue Handler.
1120  * @work: loop back work
1121  *
1122  * This work queue routine is invoked by the loopback work queue handler.
1123  * It grabs the recv lock, dequeues any available messages from the head
1124  * of the loopback message list, calls the node QP message handler,
1125  * waits for it to return, then frees up this message and dequeues more
1126  * elements of the list if available.
1127  */
1128 static void scif_loopb_wq_handler(struct work_struct *unused)
1129 {
1130         struct scif_dev *scifdev = scif_info.loopb_dev;
1131         struct scif_qp *qp = scifdev->qpairs;
1132         struct scif_loopb_msg *msg;
1133 
1134         do {
1135                 msg = NULL;
1136                 spin_lock(&qp->recv_lock);
1137                 if (!list_empty(&scif_info.loopb_recv_q)) {
1138                         msg = list_first_entry(&scif_info.loopb_recv_q,
1139                                                struct scif_loopb_msg,
1140                                                list);
1141                         list_del(&msg->list);
1142                 }
1143                 spin_unlock(&qp->recv_lock);
1144 
1145                 if (msg) {
1146                         scif_nodeqp_msg_handler(scifdev, qp, &msg->msg);
1147                         kfree(msg);
1148                 }
1149         } while (msg);
1150 }
1151 
1152 /**
1153  * scif_loopb_msg_handler() - Workqueue handler for loopback messages.
1154  * @scifdev: SCIF device
1155  * @qp: Queue pair.
1156  *
1157  * This work queue routine is triggered when a loopback message is received.
1158  *
1159  * We need special handling for receiving Node Qp messages on a loopback SCIF
1160  * device via two workqueues for receiving messages.
1161  *
1162  * The reason we need the extra workqueue which is not required with *normal*
1163  * non-loopback SCIF devices is the potential classic deadlock described below:
1164  *
1165  * Thread A tries to send a message on a loopback SCIF device and blocks since
1166  * there is no space in the RB while it has the send_lock held or another
1167  * lock called lock X for example.
1168  *
1169  * Thread B: The Loopback Node QP message receive workqueue receives the message
1170  * and tries to send a message (eg an ACK) to the loopback SCIF device. It tries
1171  * to grab the send lock again or lock X and deadlocks with Thread A. The RB
1172  * cannot be drained any further due to this classic deadlock.
1173  *
1174  * In order to avoid deadlocks as mentioned above we have an extra level of
1175  * indirection achieved by having two workqueues.
1176  * 1) The first workqueue whose handler is scif_loopb_msg_handler reads
1177  * messages from the Node QP RB, adds them to a list and queues work for the
1178  * second workqueue.
1179  *
1180  * 2) The second workqueue whose handler is scif_loopb_wq_handler dequeues
1181  * messages from the list, handles them, frees up the memory and dequeues
1182  * more elements from the list if possible.
1183  */
1184 int
1185 scif_loopb_msg_handler(struct scif_dev *scifdev, struct scif_qp *qp)
1186 {
1187         int read_size;
1188         struct scif_loopb_msg *msg;
1189 
1190         do {
1191                 msg = kmalloc(sizeof(*msg), GFP_KERNEL);
1192                 if (!msg)
1193                         return -ENOMEM;
1194                 read_size = scif_rb_get_next(&qp->inbound_q, &msg->msg,
1195                                              sizeof(struct scifmsg));
1196                 if (read_size != sizeof(struct scifmsg)) {
1197                         kfree(msg);
1198                         scif_rb_update_read_ptr(&qp->inbound_q);
1199                         break;
1200                 }
1201                 spin_lock(&qp->recv_lock);
1202                 list_add_tail(&msg->list, &scif_info.loopb_recv_q);
1203                 spin_unlock(&qp->recv_lock);
1204                 queue_work(scif_info.loopb_wq, &scif_info.loopb_work);
1205                 scif_rb_update_read_ptr(&qp->inbound_q);
1206         } while (read_size == sizeof(struct scifmsg));
1207         return read_size;
1208 }
1209 
1210 /**
1211  * scif_setup_loopback_qp - One time setup work for Loopback Node Qp.
1212  * @scifdev: SCIF device
1213  *
1214  * Sets up the required loopback workqueues, queue pairs and ring buffers
1215  */
1216 int scif_setup_loopback_qp(struct scif_dev *scifdev)
1217 {
1218         int err = 0;
1219         void *local_q;
1220         struct scif_qp *qp;
1221 
1222         err = scif_setup_intr_wq(scifdev);
1223         if (err)
1224                 goto exit;
1225         INIT_LIST_HEAD(&scif_info.loopb_recv_q);
1226         snprintf(scif_info.loopb_wqname, sizeof(scif_info.loopb_wqname),
1227                  "SCIF LOOPB %d", scifdev->node);
1228         scif_info.loopb_wq =
1229                 alloc_ordered_workqueue(scif_info.loopb_wqname, 0);
1230         if (!scif_info.loopb_wq) {
1231                 err = -ENOMEM;
1232                 goto destroy_intr;
1233         }
1234         INIT_WORK(&scif_info.loopb_work, scif_loopb_wq_handler);
1235         /* Allocate Self Qpair */
1236         scifdev->qpairs = kzalloc(sizeof(*scifdev->qpairs), GFP_KERNEL);
1237         if (!scifdev->qpairs) {
1238                 err = -ENOMEM;
1239                 goto destroy_loopb_wq;
1240         }
1241 
1242         qp = scifdev->qpairs;
1243         qp->magic = SCIFEP_MAGIC;
1244         spin_lock_init(&qp->send_lock);
1245         spin_lock_init(&qp->recv_lock);
1246 
1247         local_q = kzalloc(SCIF_NODE_QP_SIZE, GFP_KERNEL);
1248         if (!local_q) {
1249                 err = -ENOMEM;
1250                 goto free_qpairs;
1251         }
1252         /*
1253          * For loopback the inbound_q and outbound_q are essentially the same
1254          * since the Node sends a message on the loopback interface to the
1255          * outbound_q which is then received on the inbound_q.
1256          */
1257         scif_rb_init(&qp->outbound_q,
1258                      &qp->local_read,
1259                      &qp->local_write,
1260                      local_q, get_count_order(SCIF_NODE_QP_SIZE));
1261 
1262         scif_rb_init(&qp->inbound_q,
1263                      &qp->local_read,
1264                      &qp->local_write,
1265                      local_q, get_count_order(SCIF_NODE_QP_SIZE));
1266         scif_info.nodeid = scifdev->node;
1267 
1268         scif_peer_register_device(scifdev);
1269 
1270         scif_info.loopb_dev = scifdev;
1271         return err;
1272 free_qpairs:
1273         kfree(scifdev->qpairs);
1274 destroy_loopb_wq:
1275         destroy_workqueue(scif_info.loopb_wq);
1276 destroy_intr:
1277         scif_destroy_intr_wq(scifdev);
1278 exit:
1279         return err;
1280 }
1281 
1282 /**
1283  * scif_destroy_loopback_qp - One time uninit work for Loopback Node Qp
1284  * @scifdev: SCIF device
1285  *
1286  * Destroys the workqueues and frees up the Ring Buffer and Queue Pair memory.
1287  */
1288 int scif_destroy_loopback_qp(struct scif_dev *scifdev)
1289 {
1290         scif_peer_unregister_device(scifdev);
1291         destroy_workqueue(scif_info.loopb_wq);
1292         scif_destroy_intr_wq(scifdev);
1293         kfree(scifdev->qpairs->outbound_q.rb_base);
1294         kfree(scifdev->qpairs);
1295         scifdev->sdev = NULL;
1296         scif_info.loopb_dev = NULL;
1297         return 0;
1298 }
1299 
1300 void scif_destroy_p2p(struct scif_dev *scifdev)
1301 {
1302         struct scif_dev *peer_dev;
1303         struct scif_p2p_info *p2p;
1304         struct list_head *pos, *tmp;
1305         int bd;
1306 
1307         mutex_lock(&scif_info.conflock);
1308         /* Free P2P mappings in the given node for all its peer nodes */
1309         list_for_each_safe(pos, tmp, &scifdev->p2p) {
1310                 p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1311                 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
1312                              p2p->sg_nentries[SCIF_PPI_MMIO],
1313                              DMA_BIDIRECTIONAL);
1314                 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
1315                              p2p->sg_nentries[SCIF_PPI_APER],
1316                              DMA_BIDIRECTIONAL);
1317                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1318                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1319                 list_del(pos);
1320                 kfree(p2p);
1321         }
1322 
1323         /* Free P2P mapping created in the peer nodes for the given node */
1324         for (bd = SCIF_MGMT_NODE + 1; bd <= scif_info.maxid; bd++) {
1325                 peer_dev = &scif_dev[bd];
1326                 list_for_each_safe(pos, tmp, &peer_dev->p2p) {
1327                         p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1328                         if (p2p->ppi_peer_id == scifdev->node) {
1329                                 dma_unmap_sg(&peer_dev->sdev->dev,
1330                                              p2p->ppi_sg[SCIF_PPI_MMIO],
1331                                              p2p->sg_nentries[SCIF_PPI_MMIO],
1332                                              DMA_BIDIRECTIONAL);
1333                                 dma_unmap_sg(&peer_dev->sdev->dev,
1334                                              p2p->ppi_sg[SCIF_PPI_APER],
1335                                              p2p->sg_nentries[SCIF_PPI_APER],
1336                                              DMA_BIDIRECTIONAL);
1337                                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1338                                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1339                                 list_del(pos);
1340                                 kfree(p2p);
1341                         }
1342                 }
1343         }
1344         mutex_unlock(&scif_info.conflock);
1345 }

/* [<][>][^][v][top][bottom][index][help] */