Source code for port

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of the Cortix toolkit environment

from multiprocessing import Pipe

[docs]class Port: '''Provides a method of communication between modules. The Port class provides an interface for creating ports and connecting them to other ports for the purpose of data transfer. Data exchange takes place by send and/or receive calls on a given port. The concept of a port is that of a data transfer "interaction." This can be one- or two-way with sends and receives. A port is connected to only one other port; as two ends of a pipe are connected. '''
[docs] def __init__(self, name=None, use_mpi=False): '''Constructs a Port object Parameters ---------- name: str The name of the Port object use_mpi: bool True for MPI, False for Multiprocessing Attributes ---------- id: int name: string use_mpi: bool ''' = None = name self.use_mpi = use_mpi if self.use_mpi: from mpi4py import MPI self.comm = MPI.COMM_WORLD self.rank = None else: self.pipe = None self.connected_port = None
[docs] def connect(self, port): '''Connect this port to another port Ports must be connected for data to flow between them. Parameters ---------- port: Port A Port object to connect to ''' assert isinstance(port, Port), 'Connecting port must be of Port type' self.connected_port = port port.connected_port = self port.use_mpi = self.use_mpi if not port.use_mpi: (self.pipe, port.pipe) = Pipe()
[docs] def send(self, data, tag=None): '''Send data to the connected port. If the sending port is not connected do nothing. Parameters ---------- data: any This data must be pickleable tag: int, optional MPI tag used in sending data ''' if not tag: tag = if self.connected_port: if self.use_mpi: # This is an MPI blocking send self.comm.send(data, dest=self.connected_port.rank, tag=tag) else: self.pipe.send(data)
[docs] def recv(self): '''Receive data from the connected port. Warning ------- This function will block if no data has been sent yet. Returns -------- data: any ''' if self.connected_port: if self.use_mpi: # This is an MPI blocking receive return self.comm.recv(source=self.connected_port.rank, else: return self.pipe.recv()
[docs] def __eq__(self, other): '''Check for port equality''' return ==
[docs] def __repr__(self): '''Port name representation''' return
if __name__ == '__main__': # Create some ports p1 = Port('test1') p2 = Port('test2') # Connect the ports p1.connect(p2) # View connections print(p1) print(p2) print(p1.connected_port) print(p2.connected_port)