#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of the Cortix toolkit environment
# https://cortix.org
import os
import pickle
import logging
from multiprocessing import Process
from cortix.src.module import Module
from cortix.src.port import Port
[docs]class Network:
'''Cortix network.
Attributes
----------
num_networks: int
Number of instances of this class.
'''
num_networks = 0
[docs] def __init__(self):
'''Module super class constructor.
Attributes
----------
max_n_modules_for_data_copy_on_root: int
When using MPI the `network` will copy the data from all modules on the
root process. This can generate an `out of memory` condition. This variable
sets the maximum number of processes for which the data will be copied.
Default is 1000.
'''
self.id = Network.num_networks
self.name = 'network-'+str(self.id)
self.log = logging.getLogger('cortix')
self.max_n_modules_for_data_copy_on_root = 1000
self.modules = list()
self.gv_edges = list()
self.use_mpi = None
self.use_multiprocessing = None
Network.num_networks += 1
return
[docs] def module(self, m):
'''Add a module.
'''
assert isinstance(m, Module), 'm must be a module'
name = m.name
if not name:
name = m.__class__.__name__
if m not in self.modules:
m.use_mpi = self.use_mpi
m.use_multiprocessing = self.use_multiprocessing
self.modules.append(m)
m.id = len(self.modules)-1 # see module doc for module id
return
[docs] def add_module(self, m):
'''Alternative name to `module()`.
'''
self.module(m)
return
[docs] def connect(self, module_port_a, module_port_b, info=None):
'''Connect two modules using either their ports directly or inferred ports.
A connection always opens a channel for data communication in both ways.
That is, both sends and receives are allowed.
Note
----
The simplest form of usage is with arguments: (`module_a`, `module_b`).
In this case, a `port` with the name of `module_a` **must** exist in `module_b`,
and vice-versa (port names as `str` in lower case). In addition, the connect
must not be called again with these same two modules, else the underlying
connection will be overriden.
For more rigorous connection, the user is advised to fully specify the
module and the port in each list argument.
Parameters
----------
module_port_a: list([Module,Port]) or list([Module,str]) or Module
First `module`-`port` to connect.
module_port_b: list([Module,Port]) or list([Module,str]) or Module
Second `module`-`port` to connect.
info: str
Information on the directionality of the information flow. This is for
graph visualization purposes only. The default value will use the order
in the argument list to define the direction. Default: None. If set
to `bidiretional`, will create a double headed arrow in the graph figure.
'''
if info:
assert isinstance(info,str)
if isinstance(module_port_a, Module) and isinstance(module_port_b, Module):
module_a = module_port_a
module_b = module_port_b
assert module_a.name and module_b.name # sanity check
assert module_a in self.modules,'module %r not in network.'%module_a.name
assert module_b in self.modules,'module %r not in network.'%module_b.name
idx_a = self.modules.index(module_a)
idx_b = self.modules.index(module_b)
self.gv_edges.append( (str(idx_a),str(idx_b)) )
if info=='bidirectional':
self.gv_edges.append( (str(idx_b),str(idx_a)) )
port_a = module_a.get_port(module_b.name.lower())
port_b = module_b.get_port(module_a.name.lower())
port_a.connect(port_b)
elif isinstance(module_port_a, list) and isinstance(module_port_b, list):
assert len(module_port_a) == 2 and isinstance(module_port_a[0],Module) and\
(isinstance(module_port_a[1],str) or isinstance(module_port_a[1],Port))
assert len(module_port_b) == 2 and isinstance(module_port_b[0],Module) and\
(isinstance(module_port_b[1],str) or isinstance(module_port_b[1],Port))
module_a = module_port_a[0]
module_b = module_port_b[0]
assert module_a.name and module_b.name # sanity check
assert module_a in self.modules,'module %r not in network.'%module_a.name
assert module_b in self.modules,'module %r not in network.'%module_b.name
idx_a = self.modules.index(module_a)
idx_b = self.modules.index(module_b)
self.gv_edges.append( (str(idx_a),str(idx_b)) )
if info=='bidirectional':
self.gv_edges.append( (str(idx_b),str(idx_a)) )
if isinstance(module_port_a[1],str):
port_a = module_a.get_port(module_port_a[1])
elif isinstance(module_port_a[1],Port):
port_a = module_port_a[1]
else:
assert False, 'help!'
if isinstance(module_port_b[1],str):
port_b = module_b.get_port(module_port_b[1])
elif isinstance(module_port_b[1],Port):
port_b = module_port_b[1]
else:
assert False, 'help!'
port_a.connect(port_b)
else:
assert False,' not implemented.'
return
[docs] def run(self):
'''Run the network simulation.
This function concurrently executes the `cortix.src.module.run` function
for each module in the network. Modules are run using either MPI or
Multiprocessing, depending on the user configuration.
Note
----
When using multiprocessing, data from the modules state are copied to the master
process after the `run()` method of the modules is finished.
'''
assert len(self.modules) >= 1, 'the network must have a list of modules.'
# Remove the scratch file save directory
if self.rank == 0 or self.use_multiprocessing:
os.system('rm -rf .ctx-saved && mkdir .ctx-saved')
# Running under MPI
#------------------
if self.use_mpi:
# Synchronize in the beginning
assert self.size == len(self.modules) + 1,\
'Incorrect number of processes (Required %r, got %r)'%\
(len(self.modules) + 1, self.size)
self.comm.Barrier()
# Assign an mpi rank to all ports of a module using the module list index
# If a port has rank assignment from a previous run; leave it alone
for m in self.modules:
rank = self.modules.index(m)+1
for port in m.ports:
if port.rank is None:
port.rank = rank
# Assign a unique port id to all ports
# If a port has id assignment from a previous run; leave it alone
i = 0
for mod in self.modules:
for port in mod.ports:
port.use_mpi = self.use_mpi
if port.id is None:
port.id = i
i += 1
# Parallel run module in MPI
if self.rank != 0:
mod = self.modules[self.rank-1]
self.log.info('Launching Module {}'.format(mod))
mod.run_and_save()
# Sync here at the end
self.comm.Barrier()
# Running under Python multiprocessing
#-------------------------------------
else:
# Parallel run all modules in Python multiprocessing
processes = list()
count_states = 0
for mod in self.modules:
self.log.info('Launching Module {}'.format(mod))
p = Process( target=mod.run_and_save )
processes.append(p)
p.start()
# Synchronize at the end
for p in processes:
p.join()
# Reload saved modules
#---------------------
if self.use_mpi:
# make double sure all are in sync here before reading files from disk
self.comm.Barrier()
num_files = 0
for file_name in os.listdir('.ctx-saved'):
if file_name.endswith('.pkl'):
num_files += 1
file_name = os.path.join('.ctx-saved', file_name)
with open(file_name, 'rb') as f:
module = pickle.load(f)
# reintroduce logging
module.log = logging.getLogger('cortix')
self.modules[module.id] = module
if num_files and num_files != len(self.modules):
self.log.warn('Network::run(): not all modules reloaded from disk.')
if self.use_mpi:
# Make double sure all are in sync here before going forward
# this solves the problem of processes running behind reading files
# that do not exist anymore
self.comm.Barrier()
[docs] def draw( self, graph_attr=None, node_attr=None, engine='twopi', lr=False,
ports=False, node_shape='hexagon' ):
# Import here to avoid broken dependency. Only this method needs this.
from graphviz import Digraph
graph_attr = graph_attr if graph_attr else {'splines':'true','overlap':'scale',
'ranksep':'2.0'}
node_attr = node_attr if node_attr else {'shape':'hexagon','style':'filled'}
g = Digraph( name=self.name, comment='Network::graphviz',format='png',
graph_attr=graph_attr,node_attr=node_attr, engine=engine )
if lr: g.attr(rankdir='LR')
g.attr('node', shape=node_shape)
for id, m in enumerate(self.modules):
g.node(str(id), m.name)
for e in self.gv_edges:
g.edge(e[0], e[1])
g.render()
return g