Con(figuration) i/o library
import configparser
from io import StringIO
import logging
logger = logging.getLogger('io')
import numpy
import os

import pickle as pickle

from eon import atoms
from eon.config import config

def save_prng_state():
    state = numpy.random.get_state()
    fh = open('prng.pkl', 'wb')
    pickle.dump(state, fh, pickle.HIGHEST_PROTOCOL)
def get_prng_state():
    fh = open('prng.pkl', 'rb')
    state = pickle.load(fh)
    numpy.random.set_state(state)
def length_angle_to_box(boxlengths, angles):
    box = numpy.zeros( (3,3) )
    angles *= numpy.pi/180.0
    box[0][0] = 1.0
    box[1][0] = numpy.cos(angles[0])
    box[1][1] = numpy.sin(angles[0])
    box[2][0] = numpy.cos(angles[1])
    box[2][1] = (numpy.cos(angles[2])-box[1][0]*box[2][0])/box[1][1]
    box[2][2] = numpy.sqrt(1.0-box[2][0]**2-box[2][1]**2)
    box[0,:]*=boxlengths[0]
    box[1,:]*=boxlengths[1]
    box[2,:]*=boxlengths[2]
    return box
def box_to_length_angle(box):
    lengths = numpy.zeros(3)
    lengths[0] = numpy.linalg.norm(box[0,:])
    lengths[1] = numpy.linalg.norm(box[1,:])
    lengths[2] = numpy.linalg.norm(box[2,:])
    angles = numpy.zeros(3)
    angles[0] = numpy.arccos([0,:]/lengths[0],box[1,:]/lengths[1]))
    angles[1] = numpy.arccos([0,:]/lengths[0],box[2,:]/lengths[2]))
    angles[2] = numpy.arccos([1,:]/lengths[1],box[2,:]/lengths[2]))
    angles *= 180.0/numpy.pi
    return lengths, angles
def loadcons(filename):
    filein = open(filename, 'r')
    p = []
    while True:
        try:
            p.append(loadcon(filein, reset=False))
        except:
            return p
def loadposcars(filename):
    filein = open(filename, 'r')
    p = []
    while True:
        try:
            p.append(loadposcar(filein))
        except:
            return p
def loadcon(filein, reset = True):
    '''
    Load a con file
    filein: may be either a filename or a file-like object
    '''
    if hasattr(filein, 'readline'):
        con = filein
    else:
        con = open(filein, 'r')

    con.readline()  # line 1: comment
    con.readline()  # line 2: comment

    # determine how many dimensions
    tmp = numpy.array(con.readline().split())  # line 3: Box lengths
    for i in range(len(tmp)):
        dim=i+1
        try:
            float(tmp[i])
        except:
            dim=i
            break

    # handle the box
    boxlengths=numpy.zeros(dim)
    for i in range(dim):
        boxlengths[i]=float(tmp[i])
    boxangles=numpy.array([ float(f) for f in con.readline().split()[0:dim] ])  # line 4: Box angles
    boxtemp=numpy.zeros((dim,dim),'d')
    boxtemp = length_angle_to_box(boxlengths,boxangles)

    con.readline()  # line 5: comment
    con.readline()  # line 6: comment

    num_types = int(con.readline().split()[0])  # line 7: number of atom types
    num_each_type = con.readline().split()  # line 8: number of each type of atom
    mass_of_type = con.readline().split()  # line 9: mass of each type of atom

    num_atoms = 0
    for i in range(num_types):
        num_each_type[i] = int(num_each_type[i])
        mass_of_type[i] = float(mass_of_type[i])
        num_atoms += num_each_type[i]

    a = atoms.Atoms(num_atoms)
 = boxtemp

    index = 0
    for i in range(num_types):
        name = con.readline().strip()
        if abs(1.0-mass_of_type[i]) < 1e-6 and name != "H":
            logger.warning("WARNING: Mass of %s set to 1.0", name)
        con.readline()  # skip meaningless line
        for j in range(num_each_type[i]):
            vals = con.readline().split()
            for k in range(dim):
                a.r[index][k] = float(vals[k])
            a.mass[index] = mass_of_type[i]
            a.names[index] = name
            if not int(vals[dim])==0:
      [index]=0
            index += 1

    if reset:
    return a
def savecon(fileout, p, w = 'w'):
    '''
    Save a con file
    fileout: can be either a file name or a file-like object
    p: information (in the form of an atoms object) to save
    w: write/append flag
    '''
    if hasattr(fileout, 'write'):
        con = fileout
    else:
        con = open(fileout, w)

    con.write("Generated by eOn\n\n")
    dim = len(p.r[0])
    lengths, angles = box_to_length_angle(
    con.write("  ".join(['%.6f' % s for s in lengths])+"\n")
    con.write("  ".join(['%.6f' % s for s in angles])+"\n")
    con.write("\n\n")

    atom_count = {}
    name_order = []
    for i in range(len(p)):
        name = p.names[i]
        if name not in name_order:
            name_order.append(name)
        if name in atom_count:
            atom_count[name] += 1
        else:
            atom_count[name] = 1

    con.write(str(len(name_order))+"\n")
    con.write("  ".join([str(atom_count[i]) for i in name_order])+"\n")

    printmasses = []
    index = 0
    for i in range(len(name_order)):
        printmasses.append(p.mass[index])
        index += atom_count[name_order[i]]
    con.write("  ".join(["%.6f"% i for i in printmasses])+"\n")

    index = 0
    for i in range(len(name_order)):
        con.write(name_order[i]+"\n")
        con.write("Coordinates of component %3i" %(i+1)+"\n")
        for j in range(atom_count[name_order[i]]):
            con.write("%.6f %.6f %.6f %d %d\n" %(
                p.r[index][0], p.r[index][1], p.r[index][2],
                int(not[index]), index+1))
            index += 1
def load_mode(modefilein):
    '''
    Reads a mode.dat file into an N by 3 numpy array
    modefilein: may be either a file-like object of a filename
    '''
    if hasattr(modefilein, 'readline'):
        f = modefilein
    else:
        f = open(modefilein, 'r')
    if len(f.readline().split()) == 3:
    lines = f.readlines()
    mode = []
    for line in lines:
        l = line.strip().split()
        for j in range(3):
            mode.append(float(l[j]))
    return numpy.array(mode).reshape(len(mode)//3, 3)
def save_mode(modefileout, displace_vector):
    '''
    Saves an Nx3 numpy array into a mode.dat file.
    modefileout: may be either a filename or file-like object
    displace_vector: the mode (Nx3 numpy array)
    '''
    if hasattr(modefileout, 'write'):
        f = modefileout
    else:
        f = open(modefileout, 'w')
    for i in range(len(displace_vector)):
        f.write("%.3f %.3f %.3f\n" % (displace_vector[i][0],
                                       displace_vector[i][1],
                                       displace_vector[i][2]))
def save_results_dat(fileout, results):
    '''
    Saves a results.dat file from a dictionary
    '''
    if hasattr(fileout, 'write'):
        f = fileout
    else:
        f = open(fileout, 'w')
    for key in results:
        #print(results[key], key, con)
        #GH: this made no sense to me - replaced with the following
        f.write(results[key], key)
def modify_config(config_path, changes):
    parser = configparser.ConfigParser()
    for change in changes:
        parser.set(*change)
    config_str_io = StringIO()
    parser.write(config_str_io)
    return config_str_io
def parse_results(filein):
    '''
    Reads a results.dat file and gives a dictionary of the values contained therein
    '''
    if hasattr(filein, 'readline'):
        f = filein
    else:
        f = open(filein)
    results = {}
    for line in f:
        line = line.split()
        if len(line) < 2:
            continue
        if '.' in line[0]:
            try:
                results[line[1]] = float(line[0])
            except ValueError:
                logger.warning("Couldn't parse float in results.dat: %s", line)
        else:
            try:
                results[line[1]] = int(line[0])
            except ValueError:
                try:
                    results[line[1]] = line[0]
                except ValueError:
                    logger.warning("Couldn't parse string in results.dat: %s", line)
    return results
def loadposcar(filein):
    '''
    Load the POSCAR file named filename and returns an atoms object
    '''
    if hasattr(filein, 'readline'):
        f = filein
    else:
        f = open(filein, 'r')

    # Line 1: Atom types
    AtomTypes = f.readline().split()

    # Line 2: scaling of coordinates
    scale = float(f.readline())

    # Lines 3-5: the box
    box = numpy.zeros((3, 3))
    for i in range(3):
        line = f.readline().split()
        box[i] = numpy.array([float(line[0]), float(line[1]), float(line[2])]) * scale

    # Line 6: number of atoms of each type.
    line = f.readline().split()
    NumAtomsPerType = []
    for l in line:
        NumAtomsPerType.append(int(l))

    # Now have enough info to make the atoms object.
    num_atoms = sum(NumAtomsPerType)
    p = atoms.Atoms(num_atoms)

    # Fill in the box.
 = box

    # Line 7: selective or cartesian
    sel = f.readline()[0]
    selective_flag = (sel == 's' or sel == 'S')
    if not selective_flag:
        car = sel
    else:
        car = f.readline()[0]
    direct_flag = not (car == 'c' or car == 'C' or car == 'k' or car == 'K')

    atom_index = 0
    for i in range(len(NumAtomsPerType)):
        for j in range(NumAtomsPerType[i]):
            p.names[atom_index] = AtomTypes[i]
            line = f.readline().split()
            if(selective_flag):
                assert len(line) >= 6
            else:
                assert len(line) >= 3
            pos = line[0:3]
            if selective_flag:
                sel = line[3:7]
                if sel[0] == 'T' or sel[0] == 't':
          [atom_index] = 1
                elif sel[0] == 'F' or sel[0] == 'f':
          [atom_index] = 0
            p.r[atom_index] = numpy.array([float(q) for q in pos])
            if direct_flag:
                p.r[atom_index] =[atom_index],
            else:
                p.r[atom_index] *= scale
            atom_index += 1
    return p
def saveposcar(fileout, p, w='w', direct = False):
    '''
    Save a POSCAR
    fileout: name to save it under
    point: atoms object to save
    w: write/append flag
    '''
    if hasattr(fileout, 'write'):
        poscar = fileout
    else:
        poscar = open(fileout, w)

    atom_types = []
    num_each_type = {}
    for name in p.names:
        if not name in atom_types:
            atom_types.append(name)
            num_each_type[name] = 1
        else:
            num_each_type[name] += 1

    poscar.write("  ".join(atom_types)+'\n')  #Line 1: Atom type
    poscar.write("1.0\n")  #Line 2: scaling
    for i in range(3):
        poscar.write("  ".join(['%20.14f' % s for s in[i]])+'\n')  #lines 3-5: box
    poscar.write("  ".join(atom_types)+
from configparser import ConfigParser as SCP
[docs] class ini(SCP): def __init__(self, filenames): self.loaded = False self.filenames = filenames SCP.__init__(self)
[docs] def read(self): self.loaded = True, self.filenames)
[docs] def get(self, section, option, default="ini_no_default", **kwargs): # def get(self, section, option, default="ini_no_default"): if not self.loaded: try:, self.filenames) #value = SCP.get(self, section, option, raw=True, **kwargs) value = SCP.get(self, section, option, **kwargs) #value = SCP.get(self, section, option, raw=True) #value = SCP.get(self, section, option) except: if default == "ini_no_default": raise NameError("Section or option missing, no default specified") return default try: return int(value) except ValueError: pass try: return float(value) except ValueError: pass if value.lower() == 'true': return True if value.lower() == 'false': return False return value
[docs] def getint(self, *args): raise NotImplementedError("Use the get function with this ConfigParser wrapper")
[docs] def getfloat(self, *args): raise NotImplementedError("Use the get function with this ConfigParser wrapper")
[docs] def getboolean(self, *args): raise NotImplementedError("Use the get function with this ConfigParser wrapper")
[docs] def set(self, section, option, value): if section not in self.sections(): self.add_section(section) SCP.set(self, section, option, str(value)) if type(self.filenames) == str: name = self.filenames else: name = self.filenames[-1] # configfile = open(name, 'wb') configfile = open(name, 'w') self.write(configfile) configfile.close()
[docs] class Dynamics: """ The Dynamics class handles I/O for the dynamics.txt file of an aKMC simulation. """ def __init__(self, filename): self.filename = filename if not os.path.exists(filename): f = open(self.filename, 'w') header = "%12s %12s %12s %12s %12s %12s %12s %12s %12s\n" % ('step-number', 'reactant-id', 'process-id', 'product-id', 'step-time', 'total-time', 'barrier', 'rate', 'energy') f.write(header) f.write("-" * len(header)) f.write("\n") f.close() self.next_step = 0 # read last lines of the file to determine iteration nr else: f = open(self.filename,'r'),2) #seek to EOF fsize = f.tell() # seek 1024 bytes back (or to beginning of file if fsize < 1024 ) # last line must be contained in this block max( fsize - 1024 , 0 ) , 0) lines = f.readlines() self.next_step = int ( lines[-1].split()[0] ) + 1 # determine iteration nr of next step
[docs] def append(self, reactant_id, process_id, product_id, step_time, total_time, barrier, rate, energy): f = open(self.filename, 'a') f.write("%12d %12d %12d %12d %12e %12e %12f %12e %12f\n" % (self.next_step, reactant_id, process_id, product_id, step_time, total_time, barrier, rate, energy)) f.close() self.next_step += 1
[docs] def append_sb(self, reactant_id, process_id, product_id, step_time, total_time, basin_id, rate, energy): f = open(self.filename, 'a') f.write("%12d %12d %12d %12d %12e %12e %12d %12e %12f\n" % (self.next_step, reactant_id, process_id, product_id, step_time, total_time, basin_id, rate, energy)) f.close() self.next_step += 1
[docs] def get(self): f = open(self.filename, 'r') lines = f.readlines()[2:] f.close() data = [] for line in lines: split = line.split() data.append({"reactant": int(split[1]), "process": int(split[2]), "product": int(split[3]), "steptime": float(split[4]), "totaltime": float(split[5]), "barrier": float(split[6]), "prefactor": float(split[7])}) return data
[docs] def load_potfiles(pot_dir): ret = {} if os.path.isdir(pot_dir): for name in os.listdir(pot_dir): if os.path.isdir(name): continue a = open(os.path.join(pot_dir, name), 'r') b = StringIO("".join(a.readlines())) c = os.stat(os.path.join(pot_dir, name)).st_mode ret[name] = (b,c) return ret
[docs] class TableException(Exception): pass
[docs] class Table: """ A class that provides a nice io abstraction for table like data. The data is saved in a pretty printed format. Also provides nice data retrival methods. >>> t = Table("sample.tbl", ['id', 'name', 'age' ]) >>> t.eagerwrite = False >>> t.add_row({'id':0,'name':"Sam","age":24}) >>> t.add_row({'id':1,'name':"David","age":50}) >>> t.add_row({'id':2,'name':"Anna","age":21}) >>> t #doctest: +NORMALIZE_WHITESPACE id name age -- ----- --- 0 Sam 24 1 David 50 2 Anna 21 Rows can be accessed directly: >>> t.rows[1] #doctest: +SKIP {'age': 50, 'id': 1, 'name': 'David'} >>> t.max_value('age') #doctest: +NORMALIZE_WHITESPACE 50 >>> t.min_row('age') #doctest: +NORMALIZE_WHITESPACE +SKIP {'age': 21, 'id': 2, 'name': 'Anna'} >>> sorted(t.min_row('id').items()) #doctest: +NORMALIZE_WHITESPACE [('age', 24), ('id', 0), ('name', 'Sam')] >>> len(t) #doctest: +NORMALIZE_WHITESPACE 3 >>> sum(t.getcolumn('age')) #doctest: +NORMALIZE_WHITESPACE 95 >>> t.write() #doctest: +SKIP The table can be loaded from disk without specifying columns. This is slightly unsafe because the columns can't be checked, but it could cut down on the verbosity in some places. >>> t2 = Table("sample.tbl") #doctest: +SKIP """ #XXX: This is the number of digits that a floating point number gets # serialized with. Should it be some sort of config option? # Or is there just a good default? def __init__(self, filename, columns=None, overwrite=False): self.filename = filename self.columns = columns self.rows = [] self.columntypes = {} self.columnwidths = {} self.initialized = False self.overwrite = overwrite self.floatprecision = 6 self.eagerwrite = True
[docs] def init(self): """Checks to see if self.filename exists. If it does self.rows will be initialized from disk.""" self.initialized = True if os.path.isfile(self.filename) and not self.overwrite: else: if self.columns is None: raise TableException("Columns are not optional for new tables") for c in self.columns: self.columnwidths[c] = len(c)
[docs] def read(self, filename): self.eagerwrite = False f = open(self.filename, "r") filecolumns = f.readline().split() if self.columns != None: if filecolumns != self.columns: raise TableException("Column name mismatch: %s" % filename) else: self.columns = filecolumns for c in self.columns: self.columnwidths[c] = len(c) # skip comment line f.readline() for line in f: fields = line.split() row = {} coli = 0 for field in fields: try: field = int(field) except ValueError: try: field = float(field) except ValueError: field = field.strip() pass row[self.columns[coli]] = field coli += 1 self.add_row(row) f.close() self.eagerwrite = True
def __repr__(self): if not self.initialized: self.init() f = StringIO() self.writefilehandle(f) return f.getvalue() def __len__(self): if not self.initialized: self.init() return len(self.rows) def __iter__(self): for row in self.rows: yield row
[docs] def write(self): if not self.initialized: self.init() f = open(self.filename, "w") #print("into table write: ",self.filename) self.writefilehandle(f) f.close()
[docs] def writefilehandle(self, filehandle): f = filehandle line = ' '.join([ "%-*s"%(self.columnwidths[c], c) for c in self.columns ]) f.write(line+"\n") line = '' for c in self.columns: line += '-'*self.columnwidths[c]+' ' f.write(line+'\n') for row in self.rows: line = "" for c in self.columns: if self.columntypes[c] == float: line += "%#-*.*G " % (self.columnwidths[c],self.floatprecision, row[c]) else: line += "%-*s " % (self.columnwidths[c],row[c]) f.write(line+"\n")
[docs] def add_row(self, row): if not self.initialized: self.init() mismatched_columns = set(self.columns).symmetric_difference(set(row.keys())) if len(mismatched_columns) != 0: raise TableException("Mismatched columns %s" % str(mismatched_columns)) if len(self.rows) == 0: for c in row: self.columntypes[c] = type(row[c]) else: for c in row: if type(row[c]) != self.columntypes[c]: raise TableException("Type mismatch for column %s" % c) for c in row: if self.columntypes[c] == float: self.columnwidths[c] = max(self.columnwidths[c], self.floatprecision+5) else: self.columnwidths[c] = max(self.columnwidths[c], len(str(row[c]))) self.rows.append(row) if self.eagerwrite: self.write()
[docs] def delete_row(self, column, value): if not self.initialized: self.init() rows_to_delete = [] for row in self.rows: if row[column] == value: rows_to_delete.append(row) # map(self.rows.remove, rows_to_delete) for row in rows_to_delete: self.rows.remove(row) if self.eagerwrite: self.write() return len(rows_to_delete)
[docs] def delete_row_func(self, column, func): if not self.initialized: self.init() rows_to_delete = [] for row in self.rows: if func(row[column]): rows_to_delete.append(row) # map(self.rows.remove, rows_to_delete) for row in rows_to_delete: self.rows.remove(row) if self.eagerwrite: self.write() return len(rows_to_delete)
[docs] def find_value(self, column, func): if not self.initialized: self.init() value = None for row in self.rows: if value is None: value = row[column] continue value = func(value, row[column]) return value
[docs] def find_row(self, column, func): if not self.initialized: self.init() value = None for row in self.rows: if value is None: value = row continue if func(row[column],value[column])==row[column]: value = row return value
[docs] def min_value(self, column): return self.find_value(column, min)
[docs] def min_row(self, column): return self.find_row(column, min)
[docs] def max_value(self, column): return self.find_value(column, max)
[docs] def max_row(self, column): return self.find_row(column, max)
[docs] def get_row(self, column, value): if not self.initialized: self.init() for row in self.rows: if row[column] == value: return row return None
[docs] def get_rows(self, column, value): if not self.initialized: self.init() result = [] for row in self.rows: if row[column] == value: result.append(row) return result
[docs] def get_column(self, column): if not self.initialized: self.init() results = [] for row in self.rows: results.append(row[column]) return results
