Source code for eon.communicator


from array import array
from eon.config import config
import os
import shutil
import logging
logger = logging.getLogger('communicator')

from time import sleep, time
from subprocess import Popen, PIPE
import tarfile
from io import StringIO
import pickle
import glob
import re
import numpy
import sys

def tryint(s):
    try:
        return int(s)
    except ValueError:
        return s

def alphanum_key(s):
    """ Turn a string into a list of string and number chunks.
        "z23a" -> ["z", 23, "a"]
    """
    return [ tryint(c) for c in re.split('([0-9]+)', s) ]

def sort_nicely(l):
    """ Sort the given list in the way that humans expect. """
    l.sort(key=alphanum_key)

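# Example of the natural ("human") ordering these helpers produce; shown as an
# illustration only, it is not part of the original module:
#
#   >>> alphanum_key("z23a")
#   ['z', 23, 'a']
#   >>> dirs = ["job_10", "job_2", "job_1"]
#   >>> sort_nicely(dirs)
#   >>> dirs
#   ['job_1', 'job_2', 'job_10']
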
def get_communicator():
    # This is an ugly hack to "remember" a communicator as it isn't possible to construct
    # the MPI communicator multiple times and it needs to remember its object level variables.
    if hasattr(get_communicator, 'comm'):
        return get_communicator.comm

    if config.comm_type == 'cluster':
        comm = Script(config.path_scratch,
                      config.comm_job_bundle_size,
                      config.comm_script_name_prefix,
                      config.comm_script_path,
                      config.comm_script_queued_jobs_cmd,
                      config.comm_script_cancel_job_cmd,
                      config.comm_script_submit_job_cmd)
    elif config.comm_type == 'local':
        comm = Local(config.path_scratch,
                     config.comm_local_client,
                     config.comm_local_ncpus,
                     config.comm_job_bundle_size)
    elif config.comm_type == 'mpi':
        comm = MPI(config.path_scratch, config.comm_job_bundle_size)
    else:
        logger.error(str(config.comm_type) + " is an unknown communicator.")
        raise ValueError()
    get_communicator.comm = comm
    return comm

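# Illustrative usage sketch (not part of the original module). It assumes the
# EON configuration has already been loaded, and that the caller builds `data`
# and `invariants` the way make_bundles() below expects: each entry of `data`
# is a dict with an 'id' key plus filename -> StringIO payloads, and each
# value of `invariants` is a (StringIO, permissions) tuple.
#
#   comm = get_communicator()
#   comm.submit_jobs(data, invariants)
#   for result in comm.get_results(results_path, keep_result):
#       process(result)            # hypothetical caller-side handler
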

class NotImplementedError(Exception):
    pass


class CommunicatorError(Exception):
    pass


class EONClientError(Exception):
    """An EON client finished without outputting results, it probably crashed."""
    pass


class Communicator:
    def __init__(self, scratchpath, bundle_size=1):
        if not os.path.isdir(scratchpath):
            # should probably log this event
            os.makedirs(scratchpath)
        self.scratchpath = scratchpath
        self.bundle_size = bundle_size

    def submit_jobs(self, data, invariants):
        '''Throws CommunicatorError if fails.'''
        raise NotImplementedError()

    def get_results(self, results):
        '''Returns a list of dictionaries containing the results.'''
        raise NotImplementedError()

    def get_queue_size(self):
        '''Returns the number of items waiting to run in the queue.'''
        raise NotImplementedError()

    def cancel_state(self, statenumber):
        '''Returns the number of workunits that were canceled.'''
        raise NotImplementedError()

    def get_bundle_size(self, job_path):
        if not isinstance(job_path, list):
            # List files in job_path.
            fnames = os.listdir(job_path)
        else:
            # job_path is already a list of filenames.
            fnames = job_path
        # Count results*.dat files.
        pattern = re.compile(r'results(?:_\d+)?\.dat$')
        size = sum(1 for fname in fnames if pattern.match(fname))
        is_bundle = not (size == 1 and "results.dat" in fnames)
        return size, is_bundle

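    # Naming convention the counting above relies on (illustrative sketch, not
    # part of the original module): a lone results.dat means a single job,
    # while results_0.dat, results_1.dat, ... mark a bundled job. Passing a
    # list of filenames skips the os.listdir() call.
    #
    #   >>> comm = Communicator('/tmp/eon_scratch')      # hypothetical path
    #   >>> comm.get_bundle_size(['results.dat', 'product.con'])
    #   (1, False)
    #   >>> comm.get_bundle_size(['results_0.dat', 'results_1.dat'])
    #   (2, True)
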
    def unbundle(self, resultpath, keep_result):
        '''This method unbundles multiple jobs into multiple single jobs so the
           akmc script can process them. If a job did not return results
           (probably because it crashed or was canceled), the error is logged
           and the job is skipped.'''

        # These are the job directories in the result directory that we keep.
        jobpaths = [ os.path.join(resultpath, d) for d in os.listdir(resultpath)
                     if os.path.isdir(os.path.join(resultpath, d)) ]

        regex = re.compile(r"(\w+)_(\d+)(\.\w+)")
        for jobpath in jobpaths:
            basename, dirname = os.path.split(jobpath)
            if not keep_result(dirname):
                continue

            # Need to figure out how many jobs were bundled together
            # and then create the new job directories with the split files.
            bundle_size, is_bundle = self.get_bundle_size(jobpath)

            if bundle_size == 0:
                logger.error("Client running in %s returned no results. "
                             "Check its output for errors." % jobpath)
                # GH: just log the error and continue instead of quitting
                #raise EONClientError("Client running in %s returned no results. "
                #                     "Check its output for errors." % jobpath)
                continue

            results = [{'name': dirname} for i in range(bundle_size)]

            if not is_bundle:
                # Only a single task inside this job, no need to unbundle.
                for filename in glob.glob(os.path.join(jobpath, "*.*")):
                    if not (filename.endswith(".con") or filename.endswith(".dat")):
                        continue
                    rootname, fname = os.path.split(filename)
                    f = open(filename, 'r')
                    filedata = StringIO(f.read())
                    f.close()
                    # add result to results
                    results[0][fname] = filedata
                    results[0]['number'] = 0
            else:
                # Several tasks bundled inside this job, we need to unbundle.
                filenames = glob.glob(os.path.join(jobpath, "*_[0-9]*.*"))
                for filename in filenames:
                    if not (filename.endswith(".con") or filename.endswith(".dat")):
                        continue
                    # parse filename
                    rootname, fname = os.path.split(filename)
                    match = regex.match(fname)
                    if not match:
                        continue
                    parts = match.groups()
                    index = int(parts[1])
                    key = parts[0] + parts[2]

                    # Load data into a StringIO object (should we just return filehandles?)
                    try:
                        f = open(filename, 'r')
                        filedata = StringIO(f.read())
                        f.close()
                    except (IOError, OSError):
                        logger.exception("Failed to read file %s" % filename)
                        continue
                    # add result to results
                    results[index][key] = filedata
                    results[index]['number'] = index

            # XXX: UGLY: We need a way to check if there are no results.
            if not any([ filename.startswith('results')
                         for filename in list(results[0].keys()) ]):
                logger.warning("Failed to find a results.dat file for %s", results[0]['name'])
                results = []

            yield results

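    # Shape of what unbundle() yields, based on the code above (the job name
    # and file set are hypothetical): one list per job directory, containing
    # one dict per bundled task.
    #
    #   [{'name': 'process_search_1023', 'number': 0,
    #     'results.dat': <StringIO>, 'product.con': <StringIO>},
    #    {'name': 'process_search_1023', 'number': 1, ...}]
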
    def make_bundles(self, data, invariants):
        '''This method is a generator that bundles together multiple jobs into a single job.
           Example usage:
               for jobpath in self.make_bundles(data, invariants):
                   do_stuff()'''

        # Split jobpaths into lists of size self.bundle_size.
        chunks = [ data[i:i+self.bundle_size] for i in range(0, len(data), self.bundle_size) ]
        for chunk in chunks:
            # create the bundle's directory
            job_path = os.path.join(self.scratchpath, chunk[0]['id'])
            os.mkdir(job_path)

            for filename in list(invariants.keys()):
                f = open(os.path.join(job_path, filename), 'w')
                file_contents, file_permissions = invariants[filename]
                # f.write(invariants[filename].getvalue())
                f.write(file_contents.getvalue())
                f.close()
                os.chmod(os.path.join(job_path, filename), file_permissions)

            # Concatenate all of the displacement and modes together.
            n = 0
            for job in chunk:
                for basename in list(job.keys()):
                    splitname = basename.rsplit(".", 1)
                    if len(splitname) != 2:
                        continue
                    if self.bundle_size == 1:
                        filename = basename
                    else:
                        filename = "%s_%d.%s" % (splitname[0], n, splitname[1])
                    f = open(os.path.join(job_path, filename), 'w')
                    f.write(job[basename].getvalue())
                    f.close()
                n += 1

            # Yield the jobpath of the new, bigger workunit.
            yield job_path

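    # Sketch of the scratch-directory layout make_bundles() writes for
    # bundle_size=2 (file names are illustrative, not taken from the module):
    #
    #   <scratchpath>/<id of first job in the chunk>/
    #       config.ini                          # invariant, shared by all tasks
    #       displacement_0.con  mode_0.dat      # task 0
    #       displacement_1.con  mode_1.dat      # task 1
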

class MPI(Communicator):
    def __init__(self, scratchpath, bundle_size):
        Communicator.__init__(self, scratchpath, bundle_size)

        from mpi4py.MPI import COMM_WORLD
        self.comm = COMM_WORLD

        self.client_ranks = [ int(r) for r in os.environ['EON_CLIENT_RANKS'].split(":") ]
        config.comm_job_buffer_size = len(self.client_ranks)

        self.resume_jobs = []
        if os.path.isdir(self.scratchpath):
            self.resume_jobs = [ d for d in os.listdir(self.scratchpath)
                                 if os.path.isdir(os.path.join(self.scratchpath, d)) ]
            logger.info("Found %i jobs to resume in %s", len(self.resume_jobs), self.scratchpath)

    def submit_jobs(self, data, invariants):
        ready_ranks = self.get_ready_ranks()
        for jobpath in self.make_bundles(data, invariants):
            rank = ready_ranks.pop()
            tmp = numpy.empty(1, dtype='i')
            self.comm.Recv(tmp, source=rank, tag=1)
            #buf = array('c', jobpath+'\0')
            buf = array('b')
            bufval = jobpath + '\0'
            buf.frombytes(bufval.encode())
            self.comm.Send(buf, rank)

    def run_resume_jobs(self):
        if len(self.resume_jobs) == 0:
            return
        ready_ranks = self.get_ready_ranks()
        while True:
            if len(self.resume_jobs) == 0:
                break
            if len(ready_ranks) == 0:
                break
            jobdir = self.resume_jobs.pop()
            rank = ready_ranks.pop()
            jobpath = os.path.join(self.scratchpath, jobdir)
            tmp = numpy.empty(1, dtype='i')
            self.comm.Recv(tmp, source=rank, tag=1)
            #buf = array('c', jobpath+'\0')
            buf = array('b')
            bufval = jobpath + '\0'
            buf.frombytes(bufval.encode())
            self.comm.Send(buf, rank)

    def get_ready_ranks(self):
        ready_ranks = []
        for rank in self.client_ranks:
            ready = self.comm.Iprobe(rank, tag=1)
            if ready:
                # logger.info("Rank %i is ready" % rank)
                ready_ranks.append(rank)
        return ready_ranks

    def get_queue_size(self):
        self.run_resume_jobs()
        nready = len(self.get_ready_ranks())
        nclients = len(self.client_ranks)
        qs = nclients - nready
        return qs

    def get_results(self, resultspath, keep_result):
        '''Moves work from scratchpath to results path.'''
        # from mpi4py.MPI import ANY_SOURCE, Status
        import mpi4py.MPI as MPI
        status = MPI.Status()
        while self.comm.Iprobe(source=MPI.ANY_SOURCE, tag=0, status=status):
            #buf = array('c', '\0'*1024)
            buf = numpy.array(['\0']*1024, dtype="S1")
            self.comm.Recv([buf, MPI.CHARACTER], source=status.source, tag=0)
            #print("buf: ", buf)
            strterm = numpy.where(buf == b'\0')
            strindex = strterm[0][0]
            #jobdir = buf[:buf.index('\0')].tostring()
            jobdir = buf[:strindex].tobytes()
            jobdir = os.path.split(jobdir)[1].decode()
            #print("jobdir: ", jobdir)
            if config.debug_keep_all_results:
                shutil.copytree(os.path.join(self.scratchpath, jobdir),
                                os.path.join(config.path_root, config.debug_results_path, jobdir))
            dest_dir = os.path.join(resultspath, jobdir)
            shutil.move(os.path.join(self.scratchpath, jobdir), dest_dir)
        for bundle in self.unbundle(resultspath, keep_result):
            for result in bundle:
                yield result

    def get_number_in_progress(self):
        return int(os.environ['EON_NUMBER_OF_CLIENTS'])

    def cancel_state(self, state):
        # XXX: how to support this...
        return 0


class Local(Communicator):
    def __init__(self, scratchpath, client, ncpus, bundle_size):
        Communicator.__init__(self, scratchpath, bundle_size)

        # number of cpus to use
        self.ncpus = ncpus

        # path to the client
        if '/' in client:
            self.client = os.path.abspath(client)
            if not os.path.isfile(self.client):
                logger.error("Can't find client: %s", client)
                raise CommunicatorError("Can't find client binary: %s" % client)
        else:
            # is the client in the local directory?
            if os.path.isfile(client):
                self.client = os.path.abspath(client)
            # is the client in the path?
            elif sum([ os.path.isfile(os.path.join(d, client))
                       for d in os.environ['PATH'].split(':') ]) != 0:
                self.client = client
            else:
                logger.error("Can't find client: %s", client)
                raise CommunicatorError("Can't find client binary: %s" % client)

        self.joblist = []

        import atexit
        # don't let clients hang around if the script dies
        atexit.register(self.cleanup)

    def cleanup(self):
        '''Kills the running eonclients.'''
        import signal
        for job in self.joblist:
            p = job[0]
            try:
                os.kill(p.pid, signal.SIGKILL)
            except OSError:
                pass

    def get_results(self, resultspath, keep_result):
        '''Moves work from scratchpath to results path.'''
        jobdirs = [ d for d in os.listdir(self.scratchpath)
                    if os.path.isdir(os.path.join(self.scratchpath, d)) ]

        for jobdir in jobdirs:
            if config.debug_keep_all_results:
                shutil.copytree(os.path.join(self.scratchpath, jobdir),
                                os.path.join(config.path_root, config.debug_results_path, jobdir))
            dest_dir = os.path.join(resultspath, jobdir)
            shutil.move(os.path.join(self.scratchpath, jobdir), dest_dir)
        for bundle in self.unbundle(resultspath, keep_result):
            for result in bundle:
                yield result

        # Clean out the scratch directory.
        for name in os.listdir(self.scratchpath):
            path_name = os.path.join(self.scratchpath, name)
            if not os.path.isdir(path_name):
                continue
            shutil.rmtree(path_name)

    def check_job(self, job):
        p, jobpath = job
        if p.returncode == 0:
            logger.info('Job finished: %s' % jobpath)
            return True
        else:
            stdout, stderr = p.communicate()
            errmsg = "job failed: %s: %s" % (jobpath, stderr)
            logger.warning(errmsg)
            return False

    def submit_jobs(self, data, invariants):
        '''Run up to ncpus clients to process the work in jobpaths.
           The job directories are moved to the scratch path before the
           calculation is run. This method doesn't return anything.'''

        for jobpath in self.make_bundles(data, invariants):
            # move the job directory to the scratch directory
            # update jobpath to be in the scratch directory
            fstdout = open(os.path.join(jobpath, "stdout.dat"), 'w')
            p = Popen(self.client, cwd=jobpath, stdout=fstdout, stderr=PIPE)
            #commands.getoutput("renice -n 20 -p %d" % p.pid)
            self.joblist.append((p, jobpath))

            while len(self.joblist) == self.ncpus:
                for i in range(len(self.joblist)):
                    p = self.joblist[i][0]
                    retval = p.poll()
                    if retval is None:
                        continue
                    else:
                        self.check_job(self.joblist[i])
                        self.joblist.pop(i)
                        break
                sleep(0.1)

        # wait for everything to finish
        for job in self.joblist:
            p = job[0]
            p.wait()
            self.check_job(job)

    def cancel_state(self, state):
        return 0

    def get_queue_size(self):
        return 0

    def get_number_in_progress(self):
        return 0


class Script(Communicator):
    def __init__(self, scratch_path, bundle_size, name_prefix, scripts_path,
                 queued_jobs_cmd, cancel_job_cmd, submit_job_cmd):
        Communicator.__init__(self, scratch_path, bundle_size)

        self.queued_jobs_cmd = os.path.join(scripts_path, queued_jobs_cmd)
        self.cancel_job_cmd = os.path.join(scripts_path, cancel_job_cmd)
        self.submit_job_cmd = os.path.join(scripts_path, submit_job_cmd)

        self.job_id_path = os.path.join(scratch_path, "script_job_ids")
        self.name_prefix = name_prefix

        # read in job ids
        try:
            # f = open(self.job_id_path, "r")
            f = open(self.job_id_path, "rb")
            self.jobids = pickle.load(f)
            f.close()
        except IOError:
            self.jobids = {}

    def save_jobids(self):
        # f = open(self.job_id_path, "w")
        f = open(self.job_id_path, "wb")
        pickle.dump(self.jobids, f)
        f.close()

    def get_results(self, resultspath, keep_result):
        '''Moves work from scratchpath to results path.'''
        # queued_jobs.sh prints the job ids that are still waiting or running;
        # any job we know about that is no longer queued has finished.
        queued_jobs = self.get_queued_jobs()
        finished_jobids = set(self.jobids.keys()) - set(queued_jobs)

        finished_eonids = []
        for jobid in finished_jobids:
            finished_eonids.append(int(self.jobids.pop(jobid)))

        jobdirs = [ d for d in os.listdir(self.scratchpath)
                    if os.path.isdir(os.path.join(self.scratchpath, d))
                    if int(d.rsplit('_', 1)[-1]) in finished_eonids ]

        # try to return jobs in order
        sort_nicely(jobdirs)

        for jobdir in jobdirs:
            if config.debug_keep_all_results:
                shutil.copytree(os.path.join(self.scratchpath, jobdir),
                                os.path.join(config.path_root, config.debug_results_path, jobdir))
            dest_dir = os.path.join(resultspath, jobdir)
            shutil.move(os.path.join(self.scratchpath, jobdir), dest_dir)
        for bundle in self.unbundle(resultspath, keep_result):
            for result in bundle:
                yield result

    def check_command(self, status, output, cmdname):
        if status != 0:
            logger.error(output)
            raise CommunicatorError("'%s' returned a non-zero exit status" % cmdname)

    def submit_jobs(self, data, invariants):
        for jobpath in self.make_bundles(data, invariants):
            # submit_job.sh jobname jobpath
            # should return a jobid
            # need to associate this jobid with our jobid
            jobpath = os.path.realpath(jobpath)
            jobname = "%s_%s" % (self.name_prefix, os.path.basename(jobpath))
            eon_jobid = jobname.rsplit('_', 1)[-1]
            cmd = "%s %s %s" % (self.submit_job_cmd, jobname, jobpath)
            # status, output = commands.getstatusoutput(cmd)
            p = Popen([self.submit_job_cmd, jobname, jobpath], stdout=PIPE, stderr=PIPE)
            output, error = p.communicate()
            output = output.decode()
            error = error.decode()
            status = p.returncode
            self.check_command(status, output, cmd)

            jobid = int(output.strip())
            self.jobids[jobid] = eon_jobid

            # XXX: It is probably slow to save after EVERY job submission,
            #      but is slow better than losing jobs?
            self.save_jobids()

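    # Interface submit_jobs() assumes for the user-supplied submission script
    # (a sketch; the actual script is site specific and provided by the user):
    #
    #   $ submit_job.sh <jobname> <jobpath>
    #   12345            # must print the scheduler job id on stdout
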
    def cancel_state(self, state):
        # cancel_job.sh jobid
        if len(list(self.jobids.keys())) == 0:
            return 0
        num_canceled = len(self.jobids)
        for job_id in list(self.jobids.keys()):
            cmd = "%s %i" % (self.cancel_job_cmd, job_id)
            job_id_string = "%s" % (job_id)
            # status, output = commands.getstatusoutput(cmd)
            p = Popen([self.cancel_job_cmd, job_id_string], stdout=PIPE, stderr=PIPE)
            output, error = p.communicate()
            output = output.decode()
            error = error.decode()
            status = p.returncode
            # check_command raises CommunicatorError if the cancel failed
            self.check_command(status, output, cmd)
        self.jobids = {}
        self.save_jobids()
        shutil.rmtree(config.path_scratch)
        os.makedirs(config.path_scratch)
        return num_canceled

    def get_queued_jobs(self):
        # status, output = commands.getstatusoutput(self.queued_jobs_cmd)
        p = Popen([self.queued_jobs_cmd, ''], stdout=PIPE, stderr=PIPE)
        output, error = p.communicate()
        output = output.decode()
        error = error.decode()
        status = p.returncode
        self.check_command(status, output, self.queued_jobs_cmd)

        queued_job_ids = []
        for line in output.split("\n"):
            try:
                queued_job_ids.append(int(line))
            except ValueError:
                continue

        return list(set(self.jobids).intersection(queued_job_ids))

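    # Interface get_queued_jobs() assumes for the user-supplied queue query
    # script (a sketch; output lines that are not integers are ignored):
    #
    #   $ queued_jobs.sh
    #   12345
    #   12346
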
    def get_number_in_progress(self):
        return 0

    def get_queue_size(self):
        return len(self.get_queued_jobs())