import os
import sys
import glob
from eon import fileio as io
[docs]
def make_movie(movie_type, path_root, states, separate_files=False):
movie_path = "movie.poscar"
if movie_type == 'dynamics':
atoms_list = dynamics(path_root, states)
movie_path = "dynamics.poscar"
elif movie_type == 'states':
atoms_list = dynamics(path_root, states, True)
movie_path = "states.poscar"
elif movie_type == 'fastestpath':
atoms_list = fastest_path(path_root, states)
movie_path = "fastestpath.poscar"
elif movie_type == 'fastestfullpath':
atoms_list = fastest_path(path_root, states, full=True)
movie_path = "fastestfullpath.poscar"
elif movie_type.split(',')[0] == 'processes':
try:
statenr = int(movie_type.split(',')[1])
except ValueError:
print("state number must be an integer")
sys.exit(1)
except IndexError:
print("must give a state number")
sys.exit(1)
print("making process movie for state %i" % statenr)
if len(movie_type.split(',')) > 2:
limit = int(movie_type.split(',')[2])
else:
limit = 0
atoms_list = processes(states, statenr, limit)
movie_path = "processes_%i.poscar" % statenr
elif movie_type == 'graph':
s = dot(path_root, states)
if os.path.isfile("graph.dot"):
print("File %s already exists" % "graph.dot")
sys.exit(1)
f = open("graph.dot",'w')
f.write(s)
f.close()
print("If you have graphviz installed:")
print("dot graph.dot -Tpng -o graph.png")
sys.exit(0)
else:
print("Unknown MOVIE_TYPE")
sys.exit(1)
if separate_files:
movie_dir = "movies"
if not os.path.exists(movie_dir):
os.mkdir(movie_dir)
movie_path = os.path.join(movie_dir, movie_path)
# Delete existing files with the same root.
for old_poscar in glob.glob(movie_path + ".*"):
os.unlink(old_poscar)
# Write new movie.
for i, atoms in enumerate(atoms_list):
path_i = movie_path + (".%010d" % i)
io.saveposcar(path_i, atoms, 'w')
print("Saved %i frames to %s.*" % (len(atoms_list), movie_path))
else:
# Delete existing file.
if os.path.isfile(movie_path):
os.unlink(movie_path)
# Write movie.
for atoms in atoms_list:
io.saveposcar(movie_path, atoms, 'a')
print("Saved %i frames to %s" % (len(atoms_list), movie_path))
[docs]
def get_trajectory(trajectory_path):
f = open(trajectory_path)
trajectory = [0]
f.readline()
f.readline()
for line in f:
fields = line.split()
statenr = int(fields[3])
trajectory.append(statenr)
return trajectory
[docs]
def processes(states, statenr, limit):
try:
state = states.get_state(statenr)
except IOError:
print("error: Cannot make movie for non-existant state")
sys.exit(1)
process_table = state.get_process_table()
for k,v in list(process_table.items()):
process_table[k]['id'] = k
process_table[k]['reactant'] = state.get_process_reactant(k)
process_table[k]['saddle'] = state.get_process_saddle(k)
process_table[k]['product'] = state.get_process_product(k)
processes = list(process_table.values())
sorted_processes = sorted(processes, key=lambda a: a['rate'])
sorted_processes.reverse()
atoms_list = []
print("%4s %16s %16s %16s" % ("ID", "Rate", "Barrier", "Prefactor"))
print("-------------------------------------------------------")
for p in sorted_processes:
atoms_list.append(p['reactant'])
atoms_list.append(p['saddle'])
atoms_list.append(p['product'])
print("%4i %16.5e %16.5f %16.5e" % (p['id'], p['rate'] ,p['barrier'], p['prefactor']))
limit -= 1
if limit == 0:
break
return atoms_list
[docs]
def dot(path_root, states):
G = make_graph(states)
return G.dot()
[docs]
def dynamics(path_root, states, unique=False):
if not unique:
trajectory_path = os.path.join(path_root, "dynamics.txt")
trajectory = get_trajectory(trajectory_path)
else:
trajectory = list(range(states.get_num_states()))
atoms_list = []
if len(trajectory) == 0:
print("error: There have been no dynamics steps")
sys.exit(1)
for n in trajectory:
state = states.get_state(n)
reactant = state.get_reactant()
atoms_list.append(reactant)
return atoms_list
[docs]
def make_graph(states):
# Build the graph.
G = Graph()
for statenr in range(states.get_num_states()):
state = states.get_state(statenr)
G.add_node(state)
ptable = state.get_process_table()
for i,p in list(ptable.items()):
if p['product'] != -1:
neighbor_state = states.get_state(p['product'])
G.add_node(neighbor_state)
G.add_edge(state, neighbor_state, weight=1.0/p['rate'])
return G
[docs]
def fastest_path(path_root, states, full=False):
G = make_graph(states)
state_list = [states.get_state(0)]
if full:
nodes = sorted(G.nodes(), lambda a,b: a.number-b.number)
state_pairs = [ nodes[i:i+2] for i in range(0, len(nodes)-1) ]
for s1, s2 in state_pairs:
path = G.shortest_path(s1,s2)[1:]
for s in path:
state_list.append(s)
else:
end = states.get_state(states.get_num_states()-1)
state_list = G.shortest_path(state_list[0], end)
atoms_list = []
for i in range(len(state_list)):
atoms_list.append(state_list[i].get_reactant())
if len(state_list)-1 == i:
continue
atoms_list.append(state_list[i].get_process_saddle(
get_fastest_process_id(state_list[i],
state_list[i+1])))
time = 0.0
for i in range(len(state_list)-1):
ratesum = 0.0
for j in list(state_list[i].get_process_table().values()):
ratesum += j['rate']
print(time, state_list[i].number)
time += 1/ratesum
return atoms_list
[docs]
def get_fastest_process_id(state1, state2):
ptable = state1.get_process_table()
fastest = None
for i,p in list(ptable.items()):
if p['product'] == state2.number:
if not fastest or fastest[1] < p['rate']:
fastest = (i, p['rate'])
return fastest[0]
[docs]
def get_fastest_process_rate(state1, state2):
ptable = state1.get_process_table()
fastest = None
for i,p in list(ptable.items()):
if p['product'] == state2.number:
if not fastest or fastest[1] < p['rate']:
fastest = (i, p['rate'])
return fastest[1]
[docs]
class Graph:
def __init__(self, name=""):
self.name = name
self.graph = {}
def __repr__(self):
return str(self.graph)
[docs]
def dot(self):
unique_edges = set()
for node,edgedict in list(self.graph.items()):
edgelist = list(edgedict.keys())
for edge in edgelist:
if node.number < edge.number:
unique_edges.add( (node.number, edge.number) )
else:
unique_edges.add( (edge.number, node.number) )
s = "graph akmc {\n"
for edge in unique_edges:
s += "%s -- %s;\n" % edge
s += "}\n"
return s
[docs]
def add_node(self, node):
if node not in self.graph:
self.graph[node] = {}
[docs]
def add_edge(self, node1, node2, weight=1.0):
if node1 not in self.graph:
self.add_node(node1)
if node2 not in self.graph:
self.add_node(node2)
self.graph[node1][node2] = weight
[docs]
def neighbors(self,node):
if node in self.graph:
return list(self.graph[node].keys())
else:
return None
[docs]
def nodes(self):
return list(self.graph.keys())
[docs]
def dijkstra(self, start, end=None):
G = self.graph
D = {} # dictionary of final distances
P = {} # dictionary of predecessors
Q = priorityDictionary() # est.dist. of non-final vert.
Q[start] = 0
for v in Q:
D[v] = Q[v]
if v == end: break
for w in G[v]:
vwLength = D[v] + G[v][w]
if w in D:
if vwLength < D[w]:
raise ValueError("Dijkstra: found better path to already-final vertex")
elif w not in Q or vwLength < Q[w]:
Q[w] = vwLength
P[w] = v
return (D,P)
[docs]
def shortest_path(self, start, end):
D,P = self.dijkstra(start,end)
path = []
while 1:
path.append(end)
if end == start: break
end = P[end]
path.reverse()
return path
[docs]
class priorityDictionary(dict):
def __init__(self):
'''Initialize priorityDictionary by creating binary heap
of pairs (value,key). Note that changing or removing a dict entry will
not remove the old pair from the heap until it is found by smallest() or
until the heap is rebuilt.
'''
self.__heap = []
dict.__init__(self)
[docs]
def smallest(self):
'''Find smallest item after removing deleted items from heap.'''
if len(self) == 0:
raise IndexError("smallest of empty priorityDictionary")
heap = self.__heap
while heap[0][1] not in self or self[heap[0][1]] != heap[0][0]:
lastItem = heap.pop()
insertionPoint = 0
while 1:
smallChild = 2*insertionPoint+1
if smallChild+1 < len(heap) and \
heap[smallChild] > heap[smallChild+1]:
smallChild += 1
if smallChild >= len(heap) or lastItem <= heap[smallChild]:
heap[insertionPoint] = lastItem
break
heap[insertionPoint] = heap[smallChild]
insertionPoint = smallChild
return heap[0][1]
def __iter__(self):
'''Create destructive sorted iterator of priorityDictionary.'''
def iterfn():
while len(self) > 0:
x = self.smallest()
yield x
del self[x]
return iterfn()
def __setitem__(self,key,val):
'''Change value stored in dictionary and add corresponding
pair to heap. Rebuilds the heap if the number of deleted items grows
too large, to avoid memory leakage.
'''
dict.__setitem__(self,key,val)
heap = self.__heap
if len(heap) > 2 * len(self):
self.__heap = [(v,k) for k,v in list(self.items())]
self.__heap.sort() # builtin sort likely faster than O(n) heapify
else:
newPair = (val,key)
insertionPoint = len(heap)
heap.append(None)
while insertionPoint > 0 and \
newPair < heap[(insertionPoint-1)//2]:
heap[insertionPoint] = heap[(insertionPoint-1)//2]
insertionPoint = (insertionPoint-1)//2
heap[insertionPoint] = newPair
[docs]
def setdefault(self,key,val):
'''Reimplement setdefault to call our customized __setitem__.'''
if key not in self:
self[key] = val
return self[key]