The simple logging example can be run via:
- mpiexec -n 4 python mpi_logger.py
For the compute and logging example:
- mpiexec -n 4 python mock_execution.py
The MPI scheduler is provided by schwimmbad.
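For orientation, the scheduler/worker split that schwimmbad provides looks roughly like the sketch below. This is a minimal illustration with a placeholder task, not code from this repository; the full version with logging is mock_execution.py.

# sketch: minimal schwimmbad scheduler/worker pattern (placeholder task)
import schwimmbad

def double(value):
    # runs on the worker ranks (1 and higher)
    return value * 2

def main():
    # rank 0 continues past this call as the scheduler;
    # the worker ranks block inside the pool and wait for tasks
    pool = schwimmbad.choose_pool(mpi=True)
    results = list(pool.map(double, range(10)))
    pool.close()
    print(results)

if __name__ == "__main__":
    main()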
#!/usr/bin/env python

# mock_execution.py

from os import makedirs
from os.path import exists
import logging

from mpi4py import MPI
import schwimmbad
import structlog

from mpi_logger import MPIFileHandler, COMMON_PROCESSORS
from task import SomeTask


# configure structlog to log in a specific way
structlog.configure(
    processors=COMMON_PROCESSORS,
    logger_factory=structlog.stdlib.LoggerFactory()
)

# I/O handler
HANDLER = MPIFileHandler("execution.log")
FORMATTER = logging.Formatter('%(message)s')
HANDLER.setFormatter(FORMATTER)

# initialise a logger
LOGGER = logging.getLogger('status')
LOGGER.setLevel(logging.DEBUG)
LOGGER.addHandler(HANDLER)


def main():
    """
    Simulate the execution of a bunch of tasks using MPI,
    as well as setting up MPI logging.

    We should see all processor ranks logging similar events right up
    until `pool = schwimmbad.choose_pool(mpi=True)` is called.
    After that, only rank 0 will be logging events and executing lines
    of code.

    All tasks should contain the same value for rank as an argument,
    and that value will be zero.
    The tasks themselves will be carried out by the workers,
    ranks 1 and higher, while rank 0 acts as the scheduler.
    """
    # comm and processor info
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # bind the logs to include the rank and size
    log = structlog.get_logger('status').bind(rank=rank, size=size)
    log.info('checking rank and size')

    # a simple mechanism that tells the workers to wait for the scheduler's instructions
    log.info('about to initialise an MPI pool')
    pool = schwimmbad.choose_pool(mpi=True)

    # after the pool is created, only rank 0 executes the following lines
    log.info('MPI pool chosen')

    # change as required
    outdir = 'outdir'
    bands = range(1, 21)

    # the task class that calls some function
    task = SomeTask()
    log.info('task chosen')

    # define the list of tasks and the input parameters
    args = [(outdir, band, rank) for band in bands]

    # the rank arg should always be zero (remember we defined the MPI pool)
    log.info(args=args)

    if not exists(outdir):
        log.info('create output directory')
        makedirs(outdir)

    # map tasks across the pool of workers;
    # rank 0 acts only as a scheduler, ranks 1 and higher are workers
    pool.map(task, args)
    pool.close()

    log.info('finished processing')


if __name__ == "__main__":
    main()
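Because the handler writes one structlog event per line (JSON Lines), execution.log can be inspected with standard tooling once a run finishes. A minimal sketch, assuming the 'rank' key bound in mock_execution.py:

# sketch: count logged events per rank in execution.log (JSON Lines)
import json
from collections import Counter

counts = Counter()
with open("execution.log") as src:
    for line in src:
        record = json.loads(line)
        counts[record.get("rank")] += 1

print(counts)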
#!/usr/bin/env python

# mpi_logger.py

"""
Logging configuration for JSON Lines structured logging.

Defines structured logging for:

* Errors -- qualname error
* Status messages -- qualname status
"""

import logging

from mpi4py import MPI
import structlog
from structlog.processors import JSONRenderer


COMMON_PROCESSORS = [
    structlog.stdlib.add_log_level,
    structlog.processors.TimeStamper(fmt="ISO"),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.format_exc_info,
    JSONRenderer(sort_keys=True)
]


def get_wrapped_logger(logger_name: str = 'root', **kwargs):
    """ Returns a structlog equivalent for the named logger """
    return structlog.wrap_logger(
        logging.getLogger(logger_name),
        COMMON_PROCESSORS,
        **kwargs
    )


class FormatJSONL(logging.Formatter):
    """ Prevents printing of the stack trace to enable JSON Lines output """

    def formatException(self, ei):
        """ Disables printing separate stack traces """
        return


ERROR_LOGGER = get_wrapped_logger('error', stack_info=True)
STATUS_LOGGER = get_wrapped_logger('status')


class MPIIOStream(object):
    """
    A very basic MPI stream handler for synchronised I/O.
    """

    def __init__(self, filename, comm, mode):
        self._file = MPI.File.Open(comm, filename, mode)
        self._file.Set_atomicity(True)

    def write(self, msg):
        # encode str to bytes; if msg is already bytes, leave it as-is
        try:
            msg = msg.encode()
        except AttributeError:
            pass
        self._file.Write_shared(msg)

    def sync(self):
        """
        Synchronise the processes
        """
        self._file.Sync()

    def close(self):
        self.sync()
        self._file.Close()


class MPIFileHandler(logging.StreamHandler):
    """
    A basic MPI file handler for writing log files.
    Internally opens a synchronised MPI I/O stream via MPIIOStream.

    Ideas and some code from:

    * https://groups.google.com/forum/#!topic/mpi4py/SaNzc8bdj6U
    * https://gist.github.com/JohnCEarls/8172807
    * https://stackoverflow.com/questions/45680050/cannot-write-to-shared-mpi-file-with-mpi4py
    """

    def __init__(self, filename,
                 mode=MPI.MODE_WRONLY | MPI.MODE_CREATE, comm=MPI.COMM_WORLD):
        self.filename = filename
        self.mode = mode
        self.comm = comm

        super(MPIFileHandler, self).__init__(self._open())

    def _open(self):
        stream = MPIIOStream(self.filename, self.comm, self.mode)
        return stream

    def close(self):
        if self.stream:
            self.stream.close()
            self.stream = None

    def emit(self, record):
        """
        Emit a record.

        We have to override emit, as logging.StreamHandler makes two calls
        to 'write': the first for the message, and the second for the
        terminator. This posed a problem for MPI, where a second process
        could call 'write' in between these two calls and create a
        conjoined log message.
        """
        msg = self.format(record)
        self.stream.write('{}{}'.format(msg, self.terminator))
        self.flush()


def main():
    """
    A sample test run.
    """
    comm = MPI.COMM_WORLD
    logger = logging.getLogger("node[%i]" % comm.rank)
    # logger = logging.getLogger("func-status")  # another name example
    logger.setLevel(logging.DEBUG)

    mpi_handler = MPIFileHandler("test.log")
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    mpi_handler.setFormatter(formatter)
    logger.addHandler(mpi_handler)

    # sample log levels
    logger.debug('debug message')
    logger.info('info message')
    logger.warning('warning message')
    logger.error('error message')
    logger.critical('critical message')


if __name__ == "__main__":
    main()
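The main() above exercises plain stdlib logging; the wrapped loggers and FormatJSONL defined in this module can be combined with MPIFileHandler in the same way. A rough sketch (the file name status.log is an arbitrary choice, and it needs to be launched under mpiexec because the handler opens a shared MPI file on COMM_WORLD):

# sketch: route the wrapped 'status' logger to an MPI-synchronised JSON Lines file
import logging
from mpi_logger import MPIFileHandler, FormatJSONL, STATUS_LOGGER

handler = MPIFileHandler("status.log")
handler.setFormatter(FormatJSONL('%(message)s'))

# STATUS_LOGGER wraps logging.getLogger('status'), so attach the handler there
stdlib_logger = logging.getLogger('status')
stdlib_logger.setLevel(logging.INFO)
stdlib_logger.addHandler(handler)

# every rank can log; Write_shared plus atomicity keeps lines from interleaving
STATUS_LOGGER.info('status logger attached')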
#!/usr/bin/env python

# task.py

from os.path import join as pjoin

import numpy
import h5py
from mpi4py import MPI
import structlog

from mpi_logger import COMMON_PROCESSORS


# log file i/o is initialised elsewhere
LOG = structlog.get_logger('status')

# which rank worker will do the work (should always be 1 or higher)
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()


class SomeTask(object):
    """
    A simple class definition (similar to luigi) that defines
    a work method for custom functions.
    """

    def work(self, outdir, band, rank):
        # rank2 will be the process executing the work,
        # but the 'rank' arg will always be zero, as rank 0 defined the work
        log = LOG.bind(rank=rank, rank2=RANK)
        log.info("start processing band: {}".format(band))

        out_fname = pjoin(outdir, 'band-{}.h5'.format(band))
        with h5py.File(out_fname, 'w') as fid:
            data = numpy.random.randint(0, 10001, (4000, 4000))
            kwargs = {'data': data,
                      'compression': 'lzf',
                      'chunks': (1, 4000),
                      'shuffle': True}
            fid.create_dataset('data', **kwargs)

        log.info("finished processing band: {}".format(band))

    def __call__(self, args):
        self.work(*args)
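For a quick check outside MPI, the task callable can be invoked directly with a single args tuple, which mirrors what pool.map passes to each worker. A minimal single-process sketch (mpi4py must still be importable):

# sketch: run SomeTask once without a pool
from os import makedirs
from os.path import exists

from task import SomeTask

outdir = 'outdir'
if not exists(outdir):
    makedirs(outdir)

task = SomeTask()
task((outdir, 1, 0))  # (outdir, band, rank) -- the same tuple shape pool.map supplies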