Source code for eon.fileio


'''
Con(figuration) i/o library
'''
import configparser
#from io import BytesIO as StringIO
from io import StringIO
import logging
logger = logging.getLogger('io')
import numpy
import os

import pickle as pickle

from eon import atoms
from eon.config import config

[docs] def save_prng_state(): state = numpy.random.get_state() fh = open('prng.pkl', 'wb') pickle.dump(state, fh, pickle.HIGHEST_PROTOCOL)
[docs] def get_prng_state(): fh = open('prng.pkl', 'rb') state = pickle.load(fh) numpy.random.set_state(state)
[docs] def length_angle_to_box(boxlengths, angles): box = numpy.zeros( (3,3) ) angles *= numpy.pi/180.0 box[0][0] = 1.0 box[1][0] = numpy.cos(angles[0]) box[1][1] = numpy.sin(angles[0]) box[2][0] = numpy.cos(angles[1]) box[2][1] = (numpy.cos(angles[2])-box[1][0]*box[2][0])/box[1][1] box[2][2] = numpy.sqrt(1.0-box[2][0]**2-box[2][1]**2) box[0,:]*=boxlengths[0] box[1,:]*=boxlengths[1] box[2,:]*=boxlengths[2] return box
[docs] def box_to_length_angle(box): lengths = numpy.zeros(3) lengths[0] = numpy.linalg.norm(box[0,:]) lengths[1] = numpy.linalg.norm(box[1,:]) lengths[2] = numpy.linalg.norm(box[2,:]) angles = numpy.zeros(3) angles[0] = numpy.arccos(numpy.dot(box[0,:]/lengths[0],box[1,:]/lengths[1])) angles[1] = numpy.arccos(numpy.dot(box[0,:]/lengths[0],box[2,:]/lengths[2])) angles[2] = numpy.arccos(numpy.dot(box[1,:]/lengths[1],box[2,:]/lengths[2])) angles *= 180.0/numpy.pi return lengths, angles
[docs] def loadcons(filename): filein = open(filename, 'r') p = [] while True: try: p.append(loadcon(filein, reset=False)) except: return p
[docs] def loadposcars(filename): filein = open(filename, 'r') p = [] while True: try: p.append(loadposcar(filein)) except: return p
[docs] def loadcon(filein, reset = True): ''' Load a con file filein: may be either a filename or a file-like object ''' if hasattr(filein, 'readline'): con = filein else: con = open(filein, 'r') con.readline() # line 1: comment con.readline() # line 2: comment # determine how many dimensions tmp = numpy.array(con.readline().split()) # line 3: Box lengths for i in range(len(tmp)): dim=i+1 try: float(tmp[i]) except: dim=i break # handle the box boxlengths=numpy.zeros(dim) for i in range(dim): boxlengths[i]=float(tmp[i]) boxangles=numpy.array([ float(f) for f in con.readline().split()[0:dim] ]) # line 4: Box angles boxtemp=numpy.zeros((dim,dim),'d') boxtemp = length_angle_to_box(boxlengths,boxangles) con.readline() # line 5: comment con.readline() # line 6: comment num_types = int(con.readline().split()[0]) # line 7: number of atom types num_each_type = con.readline().split() # line 8: number of each type of atom mass_of_type = con.readline().split() # line 9: mass of each type of atom num_atoms = 0 for i in range(num_types): num_each_type[i] = int(num_each_type[i]) mass_of_type[i] = float(mass_of_type[i]) num_atoms += num_each_type[i] a = atoms.Atoms(num_atoms) a.box = boxtemp index = 0 for i in range(num_types): name = con.readline().strip() if abs(1.0-mass_of_type[i]) < 1e-6 and name != "H": logger.warning("WARNING: Mass of %s set to 1.0", name) con.readline() # skip meaningless line for j in range(num_each_type[i]): vals = con.readline().split() for k in range(dim): a.r[index][k] = float(vals[k]) a.mass[index] = mass_of_type[i] a.names[index] = name if not int(vals[dim])==0: a.free[index]=0 index += 1 if reset: con.seek(0) return a
[docs] def savecon(fileout, p, w = 'w'): ''' Save a con file fileout: can be either a file name or a file-like object p: information (in the form of an atoms object) to save w: write/append flag ''' if hasattr(fileout, 'write'): con = fileout else: con = open(fileout, w) con.write("Generated by eOn\n\n") dim = len(p.r[0]) lengths, angles = box_to_length_angle(p.box) con.write(" ".join(['%.6f' % s for s in lengths])+"\n") con.write(" ".join(['%.6f' % s for s in angles])+"\n") con.write("\n\n") atom_count = {} name_order = [] for i in range(len(p)): name = p.names[i] if name not in name_order: name_order.append(name) if name in atom_count: atom_count[name] += 1 else: atom_count[name] = 1 con.write(str(len(name_order))+"\n") con.write(" ".join([str(atom_count[i]) for i in name_order])+"\n") printmasses = [] index = 0 for i in range(len(name_order)): printmasses.append(p.mass[index]) index += atom_count[name_order[i]] con.write(" ".join(["%.6f"% i for i in printmasses])+"\n") index = 0 for i in range(len(name_order)): con.write(name_order[i]+"\n") con.write("Coordinates of component %3i" %(i+1)+"\n") for j in range(atom_count[name_order[i]]): con.write("%.6f %.6f %.6f %d %d\n" %( p.r[index][0], p.r[index][1], p.r[index][2], int(not p.free[index]), index+1)) index += 1
[docs] def load_mode(modefilein): ''' Reads a mode.dat file into an N by 3 numpy array modefilein: may be either a file-like object of a filename ''' if hasattr(modefilein, 'readline'): f = modefilein else: f = open(modefilein, 'r') if len(f.readline().split()) == 3: f.seek(0); lines = f.readlines() mode = [] for line in lines: l = line.strip().split() for j in range(3): mode.append(float(l[j])) return numpy.array(mode).reshape(len(mode)//3, 3)
[docs] def save_mode(modefileout, displace_vector): ''' Saves an Nx3 numpy array into a mode.dat file. modefileout: may be either a filename or file-like object displace_vector: the mode (Nx3 numpy array) ''' if hasattr(modefileout, 'write'): f = modefileout else: f = open(modefileout, 'w') for i in range(len(displace_vector)): f.write("%.3f %.3f %.3f\n" % (displace_vector[i][0], displace_vector[i][1], displace_vector[i][2]))
[docs] def save_results_dat(fileout, results): ''' Saves a results.dat file from a dictionary ''' if hasattr(fileout, 'write'): f = fileout else: f = open(fileout, 'w') for key in results: #print(results[key], key, con) #GH: this made no sense to me - replaced with the following f.write(results[key], key)
[docs] def modify_config(config_path, changes): parser = configparser.ConfigParser() parser.read(config.config_path) for change in changes: parser.set(*change) config_str_io = StringIO() parser.write(config_str_io) config_str_io.seek(0) return config_str_io
[docs] def parse_results(filein): ''' Reads a results.dat file and gives a dictionary of the values contained therein ''' if hasattr(filein, 'readline'): f = filein f.seek(0) else: f = open(filein) results = {} for line in f: line = line.split() if len(line) < 2: continue if '.' in line[0]: try: results[line[1]] = float(line[0]) except ValueError: logger.warning("Couldn't parse float in results.dat: %s", line) else: try: results[line[1]] = int(line[0]) except ValueError: try: results[line[1]] = line[0] except ValueError: logger.warning("Couldn't parse string in results.dat: %s", line) return results
[docs] def loadposcar(filein): ''' Load the POSCAR file named filename and returns an atoms object ''' if hasattr(filein, 'readline'): f = filein else: f = open(filein, 'r') # Line 1: Atom types AtomTypes = f.readline().split() # Line 2: scaling of coordinates scale = float(f.readline()) # Lines 3-5: the box box = numpy.zeros((3, 3)) for i in range(3): line = f.readline().split() box[i] = numpy.array([float(line[0]), float(line[1]), float(line[2])]) * scale # Line 6: number of atoms of each type. line = f.readline().split() NumAtomsPerType = [] for l in line: NumAtomsPerType.append(int(l)) # Now have enough info to make the atoms object. num_atoms = sum(NumAtomsPerType) p = atoms.Atoms(num_atoms) # Fill in the box. p.box = box # Line 7: selective or cartesian sel = f.readline()[0] selective_flag = (sel == 's' or sel == 'S') if not selective_flag: car = sel else: car = f.readline()[0] direct_flag = not (car == 'c' or car == 'C' or car == 'k' or car == 'K') atom_index = 0 for i in range(len(NumAtomsPerType)): for j in range(NumAtomsPerType[i]): p.names[atom_index] = AtomTypes[i] line = f.readline().split() if(selective_flag): assert len(line) >= 6 else: assert len(line) >= 3 pos = line[0:3] if selective_flag: sel = line[3:7] if sel[0] == 'T' or sel[0] == 't': p.free[atom_index] = 1 elif sel[0] == 'F' or sel[0] == 'f': p.free[atom_index] = 0 p.r[atom_index] = numpy.array([float(q) for q in pos]) if direct_flag: p.r[atom_index] = numpy.dot(p.r[atom_index], p.box) else: p.r[atom_index] *= scale atom_index += 1 return p
[docs] def saveposcar(fileout, p, w='w', direct = False): ''' Save a POSCAR fileout: name to save it under point: atoms object to save w: write/append flag ''' if hasattr(fileout, 'write'): poscar = fileout else: poscar = open(fileout, w) atom_types = [] num_each_type = {} for name in p.names: if not name in atom_types: atom_types.append(name) num_each_type[name] = 1 else: num_each_type[name] += 1 poscar.write(" ".join(atom_types)+'\n') #Line 1: Atom type poscar.write("1.0\n") #Line 2: scaling for i in range(3): poscar.write(" ".join(['%20.14f' % s for s in p.box[i]])+'\n') #lines 3-5: box poscar.write(" ".join(atom_types)+'\n') #Line 6: Atom type poscar.write(" ".join(['%s' % num_each_type[key] for key in atom_types])+'\n') poscar.write('Selective Dynamics\n') #line 7: selective dynamics if direct: poscar.write('Direct\n') #line 8 cartesian coordinates ibox = numpy.linalg.inv(numpy.array(p.box)) p.r = numpy.dot(p.r, ibox) else: poscar.write('Cartesian\n') #line 8 cartesian coordinates for i in range(len(p)): posline = " ".join(['%20.14f' % s for s in p.r[i]]) + " " for j in range(3): if(p.free[i]): posline+=' T' else: posline+=' F' poscar.write(posline+'\n')
from configparser import ConfigParser as SCP
[docs] class ini(SCP): def __init__(self, filenames): self.loaded = False self.filenames = filenames SCP.__init__(self)
[docs] def read(self): self.loaded = True SCP.read(self, self.filenames)
[docs] def get(self, section, option, default="ini_no_default", **kwargs): # def get(self, section, option, default="ini_no_default"): if not self.loaded: self.read() try: SCP.read(self, self.filenames) #value = SCP.get(self, section, option, raw=True, **kwargs) value = SCP.get(self, section, option, **kwargs) #value = SCP.get(self, section, option, raw=True) #value = SCP.get(self, section, option) except: if default == "ini_no_default": raise NameError("Section or option missing, no default specified") return default try: return int(value) except ValueError: pass try: return float(value) except ValueError: pass if value.lower() == 'true': return True if value.lower() == 'false': return False return value
[docs] def getint(self, *args): raise NotImplementedError("Use the get function with this ConfigParser wrapper")
[docs] def getfloat(self, *args): raise NotImplementedError("Use the get function with this ConfigParser wrapper")
[docs] def getboolean(self, *args): raise NotImplementedError("Use the get function with this ConfigParser wrapper")
[docs] def set(self, section, option, value): if section not in self.sections(): self.add_section(section) SCP.set(self, section, option, str(value)) if type(self.filenames) == str: name = self.filenames else: name = self.filenames[-1] # configfile = open(name, 'wb') configfile = open(name, 'w') self.write(configfile) configfile.close()
[docs] class Dynamics: """ The Dynamics class handles I/O for the dynamics.txt file of an aKMC simulation. """ def __init__(self, filename): self.filename = filename if not os.path.exists(filename): f = open(self.filename, 'w') header = "%12s %12s %12s %12s %12s %12s %12s %12s %12s\n" % ('step-number', 'reactant-id', 'process-id', 'product-id', 'step-time', 'total-time', 'barrier', 'rate', 'energy') f.write(header) f.write("-" * len(header)) f.write("\n") f.close() self.next_step = 0 # read last lines of the file to determine iteration nr else: f = open(self.filename,'r') f.seek(0,2) #seek to EOF fsize = f.tell() # seek 1024 bytes back (or to beginning of file if fsize < 1024 ) # last line must be contained in this block f.seek( max( fsize - 1024 , 0 ) , 0) lines = f.readlines() self.next_step = int ( lines[-1].split()[0] ) + 1 # determine iteration nr of next step
[docs] def append(self, reactant_id, process_id, product_id, step_time, total_time, barrier, rate, energy): f = open(self.filename, 'a') f.write("%12d %12d %12d %12d %12e %12e %12f %12e %12f\n" % (self.next_step, reactant_id, process_id, product_id, step_time, total_time, barrier, rate, energy)) f.close() self.next_step += 1
[docs] def append_sb(self, reactant_id, process_id, product_id, step_time, total_time, basin_id, rate, energy): f = open(self.filename, 'a') f.write("%12d %12d %12d %12d %12e %12e %12d %12e %12f\n" % (self.next_step, reactant_id, process_id, product_id, step_time, total_time, basin_id, rate, energy)) f.close() self.next_step += 1
[docs] def get(self): f = open(self.filename, 'r') lines = f.readlines()[2:] f.close() data = [] for line in lines: split = line.split() data.append({"reactant": int(split[1]), "process": int(split[2]), "product": int(split[3]), "steptime": float(split[4]), "totaltime": float(split[5]), "barrier": float(split[6]), "prefactor": float(split[7])}) return data
[docs] def load_potfiles(pot_dir): ret = {} if os.path.isdir(pot_dir): for name in os.listdir(pot_dir): if os.path.isdir(name): continue a = open(os.path.join(pot_dir, name), 'r') b = StringIO("".join(a.readlines())) c = os.stat(os.path.join(pot_dir, name)).st_mode ret[name] = (b,c) return ret
[docs] class TableException(Exception): pass
[docs] class Table: """ A class that provides a nice io abstraction for table like data. The data is saved in a pretty printed format. Also provides nice data retrival methods. >>> t = Table("sample.tbl", ['id', 'name', 'age' ]) >>> t.eagerwrite = False >>> t.add_row({'id':0,'name':"Sam","age":24}) >>> t.add_row({'id':1,'name':"David","age":50}) >>> t.add_row({'id':2,'name':"Anna","age":21}) >>> t #doctest: +NORMALIZE_WHITESPACE id name age -- ----- --- 0 Sam 24 1 David 50 2 Anna 21 Rows can be accessed directly: >>> t.rows[1] #doctest: +SKIP {'age': 50, 'id': 1, 'name': 'David'} >>> t.max_value('age') #doctest: +NORMALIZE_WHITESPACE 50 >>> t.min_row('age') #doctest: +NORMALIZE_WHITESPACE +SKIP {'age': 21, 'id': 2, 'name': 'Anna'} >>> sorted(t.min_row('id').items()) #doctest: +NORMALIZE_WHITESPACE [('age', 24), ('id', 0), ('name', 'Sam')] >>> len(t) #doctest: +NORMALIZE_WHITESPACE 3 >>> sum(t.getcolumn('age')) #doctest: +NORMALIZE_WHITESPACE 95 >>> t.write() #doctest: +SKIP The table can be loaded from disk without specifying columns. This is slightly unsafe because the columns can't be checked, but it could cut down on the verbosity in some places. >>> t2 = Table("sample.tbl") #doctest: +SKIP """ #XXX: This is the number of digits that a floating point number gets # serialized with. Should it be some sort of config option? # Or is there just a good default? def __init__(self, filename, columns=None, overwrite=False): self.filename = filename self.columns = columns self.rows = [] self.columntypes = {} self.columnwidths = {} self.initialized = False self.overwrite = overwrite self.floatprecision = 6 self.eagerwrite = True
[docs] def init(self): """Checks to see if self.filename exists. If it does self.rows will be initialized from disk.""" self.initialized = True if os.path.isfile(self.filename) and not self.overwrite: self.read(self.filename) else: if self.columns is None: raise TableException("Columns are not optional for new tables") for c in self.columns: self.columnwidths[c] = len(c)
[docs] def read(self, filename): self.eagerwrite = False f = open(self.filename, "r") filecolumns = f.readline().split() if self.columns != None: if filecolumns != self.columns: raise TableException("Column name mismatch: %s" % filename) else: self.columns = filecolumns for c in self.columns: self.columnwidths[c] = len(c) # skip comment line f.readline() for line in f: fields = line.split() row = {} coli = 0 for field in fields: try: field = int(field) except ValueError: try: field = float(field) except ValueError: field = field.strip() pass row[self.columns[coli]] = field coli += 1 self.add_row(row) f.close() self.eagerwrite = True
def __repr__(self): if not self.initialized: self.init() f = StringIO() self.writefilehandle(f) return f.getvalue() def __len__(self): if not self.initialized: self.init() return len(self.rows) def __iter__(self): for row in self.rows: yield row
[docs] def write(self): if not self.initialized: self.init() f = open(self.filename, "w") #print("into table write: ",self.filename) self.writefilehandle(f) f.close()
[docs] def writefilehandle(self, filehandle): f = filehandle line = ' '.join([ "%-*s"%(self.columnwidths[c], c) for c in self.columns ]) f.write(line+"\n") line = '' for c in self.columns: line += '-'*self.columnwidths[c]+' ' f.write(line+'\n') for row in self.rows: line = "" for c in self.columns: if self.columntypes[c] == float: line += "%#-*.*G " % (self.columnwidths[c],self.floatprecision, row[c]) else: line += "%-*s " % (self.columnwidths[c],row[c]) f.write(line+"\n")
[docs] def add_row(self, row): if not self.initialized: self.init() mismatched_columns = set(self.columns).symmetric_difference(set(row.keys())) if len(mismatched_columns) != 0: raise TableException("Mismatched columns %s" % str(mismatched_columns)) if len(self.rows) == 0: for c in row: self.columntypes[c] = type(row[c]) else: for c in row: if type(row[c]) != self.columntypes[c]: raise TableException("Type mismatch for column %s" % c) for c in row: if self.columntypes[c] == float: self.columnwidths[c] = max(self.columnwidths[c], self.floatprecision+5) else: self.columnwidths[c] = max(self.columnwidths[c], len(str(row[c]))) self.rows.append(row) if self.eagerwrite: self.write()
[docs] def delete_row(self, column, value): if not self.initialized: self.init() rows_to_delete = [] for row in self.rows: if row[column] == value: rows_to_delete.append(row) # map(self.rows.remove, rows_to_delete) for row in rows_to_delete: self.rows.remove(row) if self.eagerwrite: self.write() return len(rows_to_delete)
[docs] def delete_row_func(self, column, func): if not self.initialized: self.init() rows_to_delete = [] for row in self.rows: if func(row[column]): rows_to_delete.append(row) # map(self.rows.remove, rows_to_delete) for row in rows_to_delete: self.rows.remove(row) if self.eagerwrite: self.write() return len(rows_to_delete)
[docs] def find_value(self, column, func): if not self.initialized: self.init() value = None for row in self.rows: if value is None: value = row[column] continue value = func(value, row[column]) return value
[docs] def find_row(self, column, func): if not self.initialized: self.init() value = None for row in self.rows: if value is None: value = row continue if func(row[column],value[column])==row[column]: value = row return value
[docs] def min_value(self, column): return self.find_value(column, min)
[docs] def min_row(self, column): return self.find_row(column, min)
[docs] def max_value(self, column): return self.find_value(column, max)
[docs] def max_row(self, column): return self.find_row(column, max)
[docs] def get_row(self, column, value): if not self.initialized: self.init() for row in self.rows: if row[column] == value: return row return None
[docs] def get_rows(self, column, value): if not self.initialized: self.init() result = [] for row in self.rows: if row[column] == value: result.append(row) return result
[docs] def get_column(self, column): if not self.initialized: self.init() results = [] for row in self.rows: results.append(row[column]) return results
if __name__=='__main__': import doctest doctest.testmod()