#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of the Cortix toolkit environment
# https://cortix.org
import os
import pickle
import logging
from cortix.src.port import Port
class Module:
    '''Cortix module super class.

    This class provides facilities for creating modules within the Cortix network.
    Cortix will map one object of this class to either a Multiprocessing or MPI
    process depending on the user's configuration.

    Note
    ----
    This class is to be inherited by every Cortix module.
    In order to execute, modules *must* override the `run` method, which will be
    executed during the simulation.
    '''

    def __init__(self):
        '''Module super class constructor.

        Note
        ----
        This constructor must be called explicitly in the constructor of every
        Cortix module like so:

            super().__init__()

        Attributes
        ----------
        name: str
            A name given to the instance. Default is the derived class name.
        port_names_expected: list(str), None
            A list of names of ports expected in the module. This will be compared
            to port names during runtime to check against the intended use of the
            module.
        state: any
            Any `pickle-able` data structure to be passed in a `multiprocessing.Queue`
            to the parent process or to be gathered in the root MPI process.
            Default is `None`.
        use_mpi: bool
            `True` for MPI, `False` for Multiprocessing
        use_multiprocessing: bool
            `False` for MPI, `True` for Multiprocessing
        ports: list(Port)
            A list of ports contained by the module
        log: logging.Logger
            The logger named `'cortix'`.
        save: bool
            When `True`, `run_and_save()` pickles the module to disk after `run()`
            returns. Default is `False`.
        id: int
            An integer set by the external network once a module is added to it.
            The `id` is the position of the module in the network list.
            Default: None.
        _network: Network
            An internal network inherited by the derived module for nested networks.
            Access it through the `network` property.
        '''
        self.name = self.__class__.__name__
        self.port_names_expected = None
        self.state = None

        self.use_mpi = False
        self.use_multiprocessing = True

        self.ports = []
        self.log = logging.getLogger('cortix')
        self.save = False

        self.id = None
        self._network = None

    def __resolve_port(self, port):
        '''Turn a port name or `Port` object into a `Port` owned by this module.

        Parameters
        ----------
        port: Port, str
            A Port object, or the string name of a port.

        Returns
        -------
        port: Port
            The resolved port; created on demand when given an unknown name.

        Raises
        ------
        TypeError
            If `port` is neither a `str` nor a `Port`.
        AssertionError
            If `port` is a `Port` that does not belong to this module.
        '''
        if isinstance(port, str):
            return self.get_port(port)
        if isinstance(port, Port):
            assert port in self.ports, "Unknown port!"
            return port
        raise TypeError("port must be of Port or String type")

    def send(self, data, port):
        '''Send data through a given port.

        Parameters
        ----------
        data: any
            The data being sent out - must be pickleable
        port: Port, str
            A Port object to send the data through, or its string name

        Raises
        ------
        TypeError
            If `port` is neither a `str` nor a `Port`.
        '''
        self.__resolve_port(port).send(data)

    def recv(self, port):
        '''Receive data from a given port.

        Warning
        -------
        This function will block until data is available

        Parameters
        ----------
        port: Port, str
            A Port object to receive the data from, or its string name

        Returns
        -------
        data: any
            The data received through the port

        Raises
        ------
        TypeError
            If `port` is neither a `str` nor a `Port`.
        '''
        return self.__resolve_port(port).recv()

    def get_port(self, name):
        '''Get port by name; if it does not exist, create one.

        Parameters
        ----------
        name: str
            The name of the port to get

        Returns
        -------
        port: Port
            The port object with the corresponding name
        '''
        assert isinstance(name, str), 'port name must be of type str'

        # Return the first existing port carrying this name, if any.
        for existing in self.ports:
            if existing.name == name:
                return existing

        # Unknown name: only create it if the module declared no expectations
        # or the name is among the expected ones.
        if self.port_names_expected:
            assert name in self.port_names_expected,\
                'port name: {}, not expected by module: {}'.format(name,self)

        port = Port(name, self.use_mpi)
        self.ports.append(port)
        return port

    def __set_network(self, n):
        # Imported here rather than at file level: this file does not import
        # Network, so the original check raised NameError. A function-scope
        # import also sidesteps a potential circular import.
        # NOTE(review): module path inferred from the sibling
        # `cortix.src.port` import - confirm.
        from cortix.src.network import Network

        assert isinstance(n,Network)

        # The nested network runs with the same parallel backend as its owner.
        n.use_mpi = self.use_mpi
        n.use_multiprocessing = self.use_multiprocessing
        self._network = n

    def __get_network(self):
        return self._network

    network = property(__get_network, __set_network, None,
                       'Internal (nested) network of this module, or `None`.')

    def run(self, *args):
        '''Module run function

        Run method with an option to pass data back to the parent process when running
        in Python multiprocessing mode. If the user does not want to share data with
        the parent process, this function can be overridden with `run(self)`
        or `run(self, *args)` as long as `self.state = None`.
        If `self.state` points to anything but `None`, the user must use
        `run(self, *args)`.

        Notes
        -----
        When in multiprocessing, `*args` has two elements: `comm_idx` and `comm_state`.
        To pass back the state of the module, the user should insert the provided
        index `comm_idx` and the `state` into the queue as follows:

            if self.use_multiprocessing:
                try:
                    pickle.dumps(self.state)
                except pickle.PicklingError:
                    args[1].put((args[0],None))
                else:
                    args[1].put((args[0],self.state))

        at the bottom of the user defined `run()` function.

        Warning
        -------
        This function must be overridden by all Cortix modules

        Parameters
        ----------
        args[0]: int
            Index of the state in the communication queue.
        args[1]: multiprocessing.Queue
            When using the Python `multiprocessing` library `state_comm` must have
            the module's `self.state` in it. That is,
            `state_comm.put((idx_comm,self.state))` must be the last command in the
            method before `return`. In addition, self.state must be `pickle-able`.

        Raises
        ------
        NotImplementedError
            Always, unless overridden by the derived module.
        '''
        raise NotImplementedError('Module must implement run()')

    def run_and_save(self, *args):
        '''Run the module and, if `self.save` is set, pickle it to disk.

        The pickle is written to `.ctx-saved/<ClassName>_<id>.pkl`, where `<id>`
        is the MPI rank when `self.use_mpi` is true and the process id otherwise.
        The directory is created if it does not exist.

        Warning
        -------
        When saving, `self.ports` is reset to an empty list and `self.log` is set
        to `None` on this instance before it is pickled; they are not restored.

        Parameters
        ----------
        *args:
            Forwarded unchanged to `run()`.
        '''
        self.run(*args)

        if not self.save:
            return

        save_dir = ".ctx-saved"
        file_name = os.path.join(save_dir, "{}_".format(self.__class__.__name__))
        if self.use_mpi:
            from mpi4py import MPI
            file_name += str(MPI.COMM_WORLD.rank)
        else:
            file_name += str(os.getpid())
        file_name += ".pkl"

        self.ports = []  # reset ports since they can't be pickled
        self.log = None

        # Opening the file fails if the target directory is missing.
        os.makedirs(save_dir, exist_ok=True)

        try:
            with open(file_name, "wb") as f:
                pickle.dump(self, f)
        except (pickle.PicklingError, TypeError, AttributeError):
            # Unpicklable members can surface as TypeError or AttributeError
            # as well as PicklingError; saving is best-effort in all cases.
            print("Unable to pickle {}!".format(file_name))