""" provides sort-of generic parallel framework For debugging: import multistar.parallel multistar.parallel.PARALLELMODE = 'serial' IMPORTANT: This needs to be done before any modules that depend on the parallel features are imported as it us used to derive classes. You may also set the environment variable export MULTISTAR_PARALLELMODE=serial to ensure it is done first. """ from time import time from os import nice, getenv from itertools import product from re import compile, Pattern from pickle import load, dump from io import IOBase from uuid import uuid1 from multiprocessing import Process, cpu_count, set_start_method, \ Manager, ProcessError from pathlib import Path from types import FunctionType from copy import copy from queue import Empty from glob import glob from numpy import iterable, array as np_array, where from utils import cpickle, cload from human import time2human STARTMETHOD = 'spawn' PARALLELMODE = 'parallel' # 'parallel', 'pool', 'serial' def save_model(data, task, filename, path): # data = abu_format(data) if isinstance(filename, FunctionType): filename = filename(**data) elif isinstance(filename, str): filename = filename.format(**data) elif filename is Ellipsis: filename = '_'.join(f'{k}={v:<5}'.replace(' ','') for k,v in data.items()) +'.pickle.xz' if hasattr(task, 'save'): task.save(filename, path) else: if filename is None: filename = uuid1().hex + '.pickle.xz' if path is not None: filename = Path(path) / filename filename = Path(filename).expanduser() cpickle(task, filename) class ParallelProcess(Process): def __init__(self, qi, qo, nice=19, task=None, return_data=True, tags=('_sequence',)): super().__init__() self.qi = qi self.qo = qo self.nice = nice self.task = task self.return_data = return_data self.tags = tags def run(self): nice(self.nice) while True: data = self.qi.get() if data is None: self.qi.task_done() try: self.qo.close() except AttributeError: pass break path = data.pop('path', None) filename = data.pop('filename', None) return_data = data.pop('return_data', self.return_data) starttime = time() tags = {k:v for k in self.tags if (v := data.pop(k, None)) is not None} result = data.pop('task', self.task)(**data) data |= tags stoptime = time() data['_starttime'] = starttime data['_stoptime'] = stoptime if filename is not None or path is not None: save_model(data, result, filename, path) elif self.qo is not None: if return_data: self.qo.put((data, result)) else: self.qo.put(result) self.qi.task_done() class PoolFunction(object): def __init__(self, task=None, return_data=True): self.task = task self.return_data = return_data def __call__(self, data): path = data.pop('path', None) filename = data.pop('filename', None) return_data = data.pop('return_data', self.return_data) result = data.pop('task', self.task)(**data) if filename is not None or path is not None: save_model(data, result, filename, path) if return_data: return data, result else: return result class Result(object): def __init__(self, data, result): self.result = result self.data = data def __getattr__(self, attr): if attr != 'data' and hasattr(self, 'data'): if attr in self.data: return self.data[attr] raise AttributeError() def __iter__(self): yield self.data yield self.result def __repr__(self): return f'{self.__class__.__name__}({repr(self.data).replace(" ","")} : {repr(self.result)})' def __lt__(self, other): for k,v in self.data.items(): vo = other.data[k] try: if v < vo: return True elif v > vo: return False except: v = str(v) vo = str(vo) if v < vo: return True elif v > vo: return 
        return False


class Base(object):
    _pickle_exclude = (
        compile(r'_plot_.*_(?:ax|fig)\d*'),
    )
    _pickle_exclude_class = (
        'NucPlot',
        'AxesSubplot',
        'Figure',
    )

    def __getstate__(self):
        store = dict()
        for k, v in self.__dict__.items():
            exclude = False
            for r in self._pickle_exclude:
                if isinstance(r, str):
                    if k == r:
                        exclude = True
                        break
                elif r.fullmatch(k) is not None:
                    exclude = True
                    break
            for r in self._pickle_exclude_class:
                n = type(v).__name__
                if isinstance(r, str):
                    if n == r:
                        exclude = True
                        break
                elif isinstance(r, Pattern):
                    if r.fullmatch(n) is not None:
                        exclude = True
                        break
                elif isinstance(r, type):
                    if isinstance(v, r):
                        exclude = True
                        break
                else:
                    raise AttributeError(f'unknown exclude {r}')
            if not exclude:
                store[k] = v
        return store

    # these routines seem somewhat generic
    def save(self, filename=None, path=None):
        if isinstance(filename, IOBase):
            dump(self, filename)
        else:
            if filename is None:
                if not hasattr(self, 'uuid'):
                    self.uuid = uuid1()
                filename = f'{self.__class__.__name__}_{self.uuid.hex}.pickle.xz'
            # filename = eval(f"f'{filename}'")
            if path is not None:
                filename = Path(path) / filename
            cpickle(self, filename)

    def __setstate__(self, state):
        self.__dict__.update(state)
        if not hasattr(self, 'version'):
            self.version = -1
        else:
            print(f' [{self.__class__.__name__}] Version {self.version}')

    @classmethod
    def load(cls, filename):
        if isinstance(filename, IOBase):
            self = load(filename)
        else:
            self = cload(filename)
        if not isinstance(self, cls):
            print(f' [{self.__class__.__name__}] WARNING: "{filename}" may be wrong type.')
            # raise AttributeError(f'"{filename}" is wrong type.')
        return self


class Results(Base):
    VERSION = 10000

    def __init__(self, results=None, sort=lambda x: x.data['_sequence']):
        if results is None:
            results = list()
        if len(results) > 0 and not isinstance(results[0], Result):
            results = [Result(*r) for r in results]
        if sort is True:
            sort = None
        if sort is not False:
            results = sorted(results, key=sort)
        self.results = results
        self.version = self.VERSION

    def add(self, result):
        self.results.append(result)
        self.results.sort()

    def __add__(self, other):
        # TODO - eliminate duplicates
        assert other.__class__.__name__ == self.__class__.__name__
        results = self.results + other.results
        return self.__class__(results)

    def __iadd__(self, other):
        # TODO - eliminate duplicates
        assert other.__class__.__name__ == self.__class__.__name__
        self.results += other.results
        return self

    def __call__(self, **kwargs):
        results = list()
        for r in self.results:
            ok = True
            for k, v in kwargs.items():
                vr = r.data[k]
                try:
                    if vr != v:
                        ok = False
                        break
                except (TypeError, ValueError):
                    # fall back to string comparison for incomparable values
                    vr = str(vr)
                    v = str(v)
                    if vr != v:
                        ok = False
                        break
            if ok:
                results.append(r)
        return Results(results)

    def __iter__(self):
        for r in self.results:
            yield r

    def __getitem__(self, key):
        if isinstance(key, slice):
            return self.__class__(self.results[key])
        return self.results[key]

    def to_list(self):
        return self.results.copy()

    def data(self):
        return [r.data for r in self.results]

    def result(self, rslice=None):
        if rslice is None:
            rslice = slice(None)
        return [r.result for r in self.results[rslice]]

    def __len__(self):
        return len(self.results)

    def __repr__(self):
        return (
            f'{self.__class__.__name__}(\n' +
            ',\n'.join(repr(r) for r in self.results) +
            ')')

    def save_results(self, filename=None, path=None):
        """Bulk-save all completed results/tasks.

        If you provide a 'path' then output files will be written into
        that directory.

        'filename' should be a string with format instructions so that
        variables in there can be replaced from the keywords,
        e.g., 'an={an:g}_en={en:g}_i={i:g}_q={q:g}.xz'

        If no filename is provided, the UUID will be used.
        """
        for p, r in self.results:
            save_model(p, r, filename, path)
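
# Minimal usage sketch for Results (defined but not executed; the keyword
# 'Q' is illustrative): calling a Results instance filters on data
# keywords, and slicing returns a new Results object.
def _example_results():
    rr = Results([
        Result(dict(Q=1, _sequence=0), result='a'),
        Result(dict(Q=2, _sequence=1), result='b'),
    ])
    sel = rr(Q=2)                  # __call__ filters on data keywords
    assert sel.result() == ['b']   # result() collects the payloads
    return sel[:1]                 # slicing returns a Results, not a list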
class Processor(Base):
    """
    Interface for parallel execution of runs.
    """

    VERSION = 10001

    _pickle_exclude = Base._pickle_exclude + (
        '_tsaved',
        '_fragment',
    )

    def __setstate__(self, state):
        super().__setstate__(state)
        if self.version < 10001:
            self.uuid = uuid1().hex

    def __init__(
            self, nparallel=None, task=None, startmethod=STARTMETHOD,
            **kwargs):
        """Provide parameters for runs you want to loop over as
        iterable types (list, tuple, numpy array) in the corresponding
        keywords.

        Be sure to wrap iterable items you do *not* want to iterate
        over inside, e.g., a 1-list, 1-tuple, or 1-set.  By default,
        string and dictionary types are already dealt with
        automatically.

        If you provide a 'path' then output files will be written into
        that directory.

        'filename' - should be the base filename, e.g., 'filename.xz'.
            For fragments, provide a format string with an anonymous
            decimal formatter.

        'fragments' - after how many results to write a fragments file
            (default: inf)

        'fragment' - initial value for the fragment index (default: 0)

        'tsave' - after how much time to write
            - a full save file if 'fragments' is None
            - a fragments file if 'fragments' is not None (e.g., 0)
            (default: inf)

        If no filename is provided, but only a 'path', a UUID will be
        used.  If neither filename nor path are provided, the result
        objects will be returned and stored in the 'results' variable;
        the Results object can then be iterated over.

        'task' is an object (class or function) that executes the task
        when called and returns an object (as classes would naturally
        do) that has a save function.  If there is no save function, a
        UUID will be picked as filename.

        Examples
        --------
        from multistar.parallel import ParallelProcessor as P
        p = P(ymax=[1e6,1e7], Q=[1,2,3], abu=(dict(he3=1), dict(h1=1)))
        p.save(filename='Q{Q:g}_Mdot={ymax:05g}_{ABU}.xz')
        """
        self.tstart = time()
        try:
            set_start_method(startmethod)
        except RuntimeError:
            pass
        result = kwargs.pop('result', None)
        if result is None:
            result = Result
        results = kwargs.pop('results', None)
        if results is None:
            results = Results
        # pop processor options first so they do not leak into the job
        # dictionaries built from the remaining keywords below
        # ('path' is left in on purpose so workers can save models)
        self.filename = kwargs.pop('filename', None)
        self._save = kwargs.pop('save', None)
        self.fragments = kwargs.pop('fragments', None)
        self.tsave = kwargs.pop('tsave', 1.e99)
        self.fragment = kwargs.pop('fragment', 0)
        self.timeout = kwargs.pop('timeout', None)
        save_data = kwargs.pop('save_data', True)
        data = kwargs.pop('data', None)
        if isinstance(data, (str, Path)):
            data = self.load_data(data)
        if data is None:
            # wrap inherently iterable types in kwargs for the star
            # map operation below
            for k, v in kwargs.items():
                if isinstance(v, (str, dict, Path, FunctionType, type)):
                    kwargs[k] = (v,)
            # add one task dictionary per point of the parameter grid
            base = dict()
            data = list()
            values = list()
            keys = list()
            for k, v in kwargs.items():
                if iterable(v):
                    values.append(v)
                    keys.append(k)
                else:
                    base[k] = v
            for i, vec in enumerate(product(*values)):
                data.append(base | dict(zip(keys, vec)) | dict(_sequence=i))
        self.data = data
        if nparallel is None:
            nparallel = cpu_count()
        print(f' [{self.__class__.__name__}] Running {nparallel:g} processes.')
        self.nparallel = nparallel
        self.task = task
        self.version = self.VERSION
        self.result = result
        self.results = results
        self._tsaved = self.tstart
        if not hasattr(self, 'uuid'):
            self.uuid = uuid1().hex
        print(f' [{self.__class__.__name__}] Running {len(data):g} jobs.')
        if save_data:
            self.save_data()

    def save_data(self):
        assert hasattr(self, 'uuid')
        if getattr(self, 'filename', None) is not None:
            filename = str(self.filename).rsplit('.xz', 1)[0]
        else:
            filename = self.__class__.__name__
        filename += '_' + self.uuid + '.xz'
        cpickle(self.data, filename)

    def load_data(self, filename):
        raise NotImplementedError()

    def __iter__(self):
        for r in self.results:
            yield r

    def __add__(self, other):
        # TODO - eliminate duplicates
        assert type(other) == type(self)
        results = self.results + other.results
        new = copy(self)
        new.results = results
        return new

    def __iadd__(self, other):
        # TODO - eliminate duplicates
        assert type(other) == type(self)
        self.results += other.results
        return self

    def store_fragment(self, results, final=False):
        if len(results) == 0:
            return
        fragments = self.fragments
        n = len(results)
        if final:
            fragments = n
        if time() > self._tsaved + self.tsave:
            fragments = max(1, n)
        if fragments is None:
            return
        if n < fragments:
            return
        filename = self.filename
        if isinstance(filename, Path):
            filename = filename.as_posix()
        if filename is None:
            filename = f'{self.__class__.__name__}-{self.uuid}'
            if self.fragments is not None and final is False:
                filename += '-{:09d}'
            filename += '.xz'
            self.filename = filename
        filename = filename.format(self.fragment)
        self.fragment += n
        results_ = self.results
        self.results = self.results(results)
        self._tsaved = time()
        self.save(filename)
        if final:
            return
        self.results = results_
        if self.fragments is not None:
            results.clear()

    def closeout(self, results, fails=0):
        if self._save or self.fragments is not None:
            self.store_fragment(results, final=True)
        else:
            self.results = self.results(results)
        if fails > 0:
            print(f' [{self.__class__.__name__}] there were {fails} failures.')
        print(f' [{self.__class__.__name__}] generated in '
              f'{time2human(time() - self.tstart)}.')

    @classmethod
    def from_fragments(cls, path='.', complete=True):
        path = Path(path).expanduser()
        if path.is_dir():
            files = list(path.glob('*'))
        else:
            files = list(glob(str(path)))
        if len(files) == 0:
            print(' [from_fragments] No files found.')
            return
        files = sorted(files)
        self = cls.load(files[0])
        assert isinstance(self, cls)
        seq = dict()
        for f in files:
            s = cls.load(f)
            assert type(s) == cls
            if s.uuid in seq:
                seq[s.uuid] += s
            else:
                seq[s.uuid] = s
        print(f' [Processor][from_fragments] found {len(seq)} sequences.')
        results = list()
        for u, q in seq.items():
            rr = sorted(q.results, key=lambda r: r.data['_sequence'])
            if complete:
                assert rr[0].data['_sequence'] == 0
                s = np_array([r.data['_sequence'] for r in rr])
                ii = where(s[1:] > s[:-1] + 1)[0]
                if len(ii) > 0:
                    print(f' [Processor][from_fragments][{u}] retaining '
                          f'{ii[0]+1} data points, discarding {len(rr)-(ii[0]+1)}.')
                    rr = rr[:ii[0]+1]
                else:
                    print(f' [Processor][from_fragments][{u}] retaining '
                          f'{len(rr)} data points.')
            else:
                print(f' [Processor][from_fragments][{u}] loaded '
                      f'{len(rr)} data points.')
            results.extend(rr)
        print(f' [Processor][from_fragments] loaded {len(results)} data points total.')
        self.results = Results(results, sort=False)
        return self
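
# Sketch of the keyword expansion performed in Processor.__init__ (defined
# but not executed; the parameter names are illustrative): scalars are
# shared by all jobs, iterables are swept over, and every job dictionary
# gets a '_sequence' index.
def _example_sweep():
    kwargs = dict(ymax=[1.e6, 1.e7], Q=(1, 2), eta=0.1)  # eta is a shared scalar
    base = {k: v for k, v in kwargs.items() if not iterable(v)}
    keys = [k for k in kwargs if iterable(kwargs[k])]
    jobs = [base | dict(zip(keys, vec)) | dict(_sequence=i)
            for i, vec in enumerate(product(*(kwargs[k] for k in keys)))]
    assert len(jobs) == 4    # 2 ymax values x 2 Q values
    return jobs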
class PoolProcessor(Processor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        manager = Manager()
        try:
            manager.start()
        except ProcessError:
            pass
        function = PoolFunction(task=self.task)
        starttime = time()
        ndata = len(self.data)
        results = list()
        with manager.Pool(self.nparallel) as pool:
            for r in pool.imap_unordered(function, self.data):
                results.append(self.result(*r))
                now = time()
                runtime = now - starttime
                ndone = len(results)
                frac = ndone / ndata
                eta = runtime / ndone * (ndata - ndone)
                eta *= self.nparallel / min(self.nparallel, max(1, ndata - ndone))
                print(f' [{self.__class__.__name__}] done {ndone:_d} / {ndata:_d} '
                      f'({frac * 100:0.2f} %, {time2human(runtime)}) '
                      f'ETA: {time2human(eta)}.')
                self.store_fragment(results)
        manager.shutdown()
        self.closeout(results)
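
# Hedged example of driving a PoolProcessor with a plain task function
# (defined at module level so it pickles under the 'spawn' start method;
# the names are illustrative).  With neither 'filename' nor 'path' given,
# results stay in memory on p.results.  Call _example_pool() under an
# `if __name__ == '__main__':` guard when running this file directly.
def _example_square(x):
    return x * x

def _example_pool():
    p = PoolProcessor(
        task=_example_square,
        x=[0, 1, 2, 3],      # swept keyword -> four jobs
        nparallel=2,
        save_data=False,     # skip writing the job-list pickle
    )
    return p.results.result()    # [0, 1, 4, 9], in '_sequence' order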
class ParallelProcessor(Processor):
    @staticmethod
    def _make_process(qi, qo, task):
        p = ParallelProcess(qi, qo, task=task)
        p.daemon = False
        p.start()
        return p

    def __init__(self, *args, save=None, **kwargs):
        super().__init__(*args, save=save, **kwargs)
        manager = Manager()
        try:
            manager.start()
        except ProcessError:
            pass
        qi = manager.JoinableQueue()
        qo = manager.JoinableQueue()
        self.nparallel = min(self.nparallel, len(self.data))
        processes = list()
        for _ in range(self.nparallel):
            processes.append(self._make_process(qi, qo, self.task))
        for d in self.data:
            qi.put(d)
        # one sentinel per process to shut the workers down
        for _ in range(len(processes)):
            qi.put(None)
        try:
            qi.close()
        except AttributeError:
            pass

        # we collect up results
        timeout = self.timeout
        if timeout is None:
            timeout = 2**30
        starttime = time()
        ndata = len(self.data)
        results = list()
        fails = 0
        for _ in range(len(self.data)):
            try:
                results.append(self.result(*qo.get(True, timeout)))
            except Empty:
                fails += 1
                # presumably the task died.
                # We start a new one in case all runs stopped.
                processes.append(self._make_process(qi, qo, self.task))
            qo.task_done()
            now = time()
            runtime = now - starttime
            ndone = len(results) + fails
            frac = ndone / ndata
            eta = runtime / ndone * (ndata - ndone)
            eta *= self.nparallel / min(self.nparallel, max(1, ndata - ndone))
            print(f' [{self.__class__.__name__}] done {ndone:_d} / {ndata:_d} '
                  f'({frac * 100:0.2f} %, {time2human(runtime)}) '
                  f'ETA: {time2human(eta)}.')
            self.store_fragment(results)
        qi.join()
        qo.join()
        manager.shutdown()
        self.closeout(results, fails)
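
# Hedged sketch of fragment-wise operation with ParallelProcessor (defined
# but not executed; the glob pattern assumes the default fragment naming
# built in store_fragment): fragments are flushed every 10 results and can
# later be reassembled, e.g. in a fresh session, with from_fragments().
def _example_fragments():
    p = ParallelProcessor(
        task=_example_square,    # module-level task defined above
        x=range(100),
        nparallel=4,
        fragments=10,            # flush a fragment file every 10 results
        timeout=60,              # seconds to wait for each result
        save_data=False,
    )
    del p
    # fragment files are named '<Class>-<uuid>-<index>.xz' by store_fragment
    return ParallelProcessor.from_fragments('ParallelProcessor-*.xz')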
class SerialProcessor(Processor):
    """
    Debug plugin class
    """

    def __init__(self, *args, save=None, **kwargs):
        nparallel = kwargs.pop('nparallel', None)
        if nparallel is not None and nparallel != 1:
            for _ in range(3):
                print(f' [{self.__class__.__name__}] {"#"*40}')
            print(f' [{self.__class__.__name__}] Using serial mode, NOT {nparallel=}.')
            for _ in range(3):
                print(f' [{self.__class__.__name__}] {"#"*40}')
        kwargs['nparallel'] = 1
        self.nparallel = 1
        super().__init__(*args, save=save, **kwargs)

        # we collect up results, running each task in the current process
        function = PoolFunction(task=self.task)
        starttime = time()
        ndata = len(self.data)
        results = list()
        for d in self.data:
            results.append(self.result(*function(dict(d))))
            now = time()
            runtime = now - starttime
            ndone = len(results)
            frac = ndone / ndata
            eta = runtime / ndone * (ndata - ndone)
            eta *= self.nparallel / min(self.nparallel, max(1, ndata - ndone))
            print(f' [{self.__class__.__name__}] done {ndone:_d} / {ndata:_d} '
                  f'({frac * 100:0.2f} %, {time2human(runtime)}) '
                  f'ETA: {time2human(eta)}.')
            self.store_fragment(results)
        self.closeout(results)


_processors = dict(
    parallel = ParallelProcessor,
    pool = PoolProcessor,
    serial = SerialProcessor,
)

_parallel = getenv('MULTISTAR_PARALLELMODE')
if _parallel in _processors:
    PARALLELMODE = _parallel
    print(f' [multistar.parallel] Using {PARALLELMODE=}')
del _parallel


def __getattr__(attr):
    if attr == 'TheProcessor':
        return _processors[PARALLELMODE]
    raise AttributeError(f'[{__name__}] "{attr}" not found.')
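
# Hedged usage sketch for the module-level dispatch (defined but not
# executed; assumes the module is importable as multistar.parallel, per
# the module docstring): 'TheProcessor' resolves through the module
# __getattr__ above to whichever class PARALLELMODE currently selects.
def _example_dispatch():
    import multistar.parallel as mp
    mp.PARALLELMODE = 'serial'    # must be set before dependent imports
    return mp.TheProcessor        # -> SerialProcessor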