Created
July 1, 2015 17:49
-
-
Save CestDiego/c063549e33740572ad45 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
from horton import Molecule, periodic | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from progressive.bar import Bar | |
import os | |
import math | |
import datetime | |
class WatershedGrid(object): | |
def __init__(self, *args, **kwargs): | |
"""Use a molecule file to shrink it and get a denser grid | |
""" | |
for key, value in kwargs.iteritems(): | |
setattr(self, key, value) | |
try: | |
mol_from_cube = Molecule.from_file("cube_files/" + | |
self.molecule_file + | |
".cube") | |
except: | |
raise | |
self.molecule = mol_from_cube | |
self.neighbor_data_path = "neighbor_data" | |
self.cube_data = self.molecule.cube_data | |
self.shape = self.cube_data.shape | |
self.size = self.cube_data.size | |
self.make_dirs_available() | |
# We shrink the steps and the origin | |
self.steps = self.molecule.grid.grid_rvecs / 2 | |
self.origin = self.molecule.grid.origin / 2 | |
self.density = self.create_density() | |
self.atoms = self.get_atoms_from(self.molecule.numbers) | |
self.indices_array = self.populate_indices_array() | |
self.density_curves = [ | |
6, | |
1, | |
0.1, | |
0.09, | |
0.08, | |
0.07, | |
0.06, | |
0.05, | |
0.04, | |
0.01, | |
0.005, | |
0.001, | |
0.1, | |
] | |
self.curve_index = 0 | |
self.basin_array = np.zeros(self.size, dtype=np.int) | |
def make_dirs_available(self): | |
self.mol_name = self.molecule.title.split()[0] | |
self.mol_basis = self.molecule.title.split()[1].split('/')[1].split('+')[0] | |
timestamp = str(datetime.datetime.now()).split()[1] | |
stages = ["thick", "thin", "elimination", "watersheds", "basins"] | |
self.root_dir_path = os.path.join("MoleculeData", | |
self.mol_name, | |
self.mol_basis, | |
self.dim_to_filename(self.shape), | |
timestamp) | |
self.dir_path = {} | |
for stage in stages: | |
self.dir_path[stage] = os.path.join(self.root_dir_path, | |
stage) | |
self.make_sure_dir_path_exists(self.dir_path[stage]) | |
def create_density(self): | |
print "Creating Density" | |
denser_grid_points = np.zeros((self.size, 3), | |
dtype=float) | |
for point_index in xrange(0, self.size): | |
point_cart = self.index_to_cartesian(point_index) | |
denser_grid_points[point_index] = point_cart | |
"Done iterating over grid points" | |
self.molecule_fchk = Molecule.from_file('fchk_files/' + | |
self.molecule_file + | |
'.fchk') | |
density = self.molecule_fchk.obasis.compute_grid_density_dm(self.molecule_fchk.get_dm_full(), | |
denser_grid_points) | |
# Plot_density | |
with PreserveShape(density, self.shape): | |
index, axis = self.image_axis | |
plane_data = np.log10(density.take(index, axis=axis)) | |
plt.axis('equal') | |
plt.contour(plane_data, 150) | |
file_path = os.path.join(self.root_dir_path, "contour.png") | |
plt.savefig(file_path, format='eps', dpi=900) | |
plt.clf() | |
return density | |
def get_atoms_from(self, molecule_numbers): | |
atoms = [] | |
for i, atomic_number in enumerate(molecule_numbers): | |
molecule_coordinates = self.molecule.coordinates[i] | |
covalent_radius = periodic[atomic_number].cov_radius | |
atoms.append({'number': atomic_number, | |
'coord': molecule_coordinates, | |
'cov_radius': covalent_radius, | |
'basin': i + 1}) # FIXME: This is not always true | |
return atoms | |
def save_basins(self, stage): | |
file_name = stage + ".npy" | |
file_path = os.path.join(self.dir_path["basins"], | |
file_name) | |
basins = self.basin_array | |
with PreserveShape(basins, self.shape): | |
# for point_index in xrange(0, self.size): | |
# point_coord = self.index_to_coord(point_index) | |
# if self.basin_array[point_index] <= 0: | |
# basins[point_coord] = 0 | |
np.save(file_path, basins) | |
@staticmethod | |
def make_sure_dir_path_exists(dir_path): | |
try: | |
os.stat(dir_path) | |
except: | |
print "Creating dir: ", dir_path | |
try: | |
os.makedirs(dir_path) | |
except: | |
print "Not possible to create dir" | |
raise | |
else: | |
return True | |
@staticmethod | |
def flatten_cube_data(cube_data): | |
return cube_data.ravel() | |
@property | |
def basin_array(self): | |
return self._basin_array | |
@basin_array.setter | |
def basin_array(self, array): | |
assert isinstance(array, np.ndarray), "Need a numpy array" | |
assert array.size == self.size, "Array size error" | |
assert array.shape == (self.size, ), "Array shape error" | |
self._basin_array = array | |
@property | |
def steps(self): | |
return self._steps | |
@steps.setter | |
def steps(self, value): | |
if isinstance(value, np.ndarray): | |
assert value.shape == (3, 3) | |
dx = value[0][0] | |
dy = value[1][1] | |
dz = value[2][2] | |
elif isinstance(value, tuple): | |
assert len(value) == 3 | |
dx, dy, dz = value | |
else: | |
raise ValueError("Need a tuple or an 3x3 np.ndarray") | |
self._steps = (dx, dy, dz) | |
@property | |
def neighbor_data_path(self): | |
return self._neighbor_data_path | |
@neighbor_data_path.setter | |
def neighbor_data_path(self, path): | |
self.make_sure_dir_path_exists(path) | |
self._neighbor_data_path = path | |
@property | |
def origin(self): | |
return self._origin | |
@origin.setter | |
def origin(self, origin): | |
assert len(origin) == 3, "You are not giving 3D origin" | |
self._origin = origin | |
@property | |
def root_dir_path(self): | |
return self._root_dir_path | |
@root_dir_path.setter | |
def root_dir_path(self, path): | |
self._root_dir_path = path | |
def index_to_cartesian(self, point_index): | |
if self.is_index(point_index): | |
point_coord = self.index_to_coord(point_index) | |
point_cart = self.coord_to_cartesian(point_coord) | |
return point_cart | |
elif self.is_coord(point_coord): | |
raise ValueError("You are trying to put a coord here") | |
else: | |
"Please input an index element to "\ | |
"this index_to_cartesian function" | |
def coord_to_cartesian(self, point_coord): | |
if self.is_coord(point_coord): | |
x, y, z = point_coord | |
x0, y0, z0 = self.origin | |
dx, dy, dz = self.steps | |
return (x0 + (x * dx), | |
y0 + (y * dy), | |
z0 + (z * dz)) | |
elif self.is_index(point_coord): | |
raise ValueError("You are trying to put an index here") | |
else: | |
"Please input a coordinate element to "\ | |
"this coord_to_cartesian function" | |
def cartesian_to_coord(self, point_cart): | |
x, y, z = point_cart | |
x0, y0, z0 = self.origin | |
dx, dy, dz = self.steps | |
return ((x - x0) // dx, | |
(y - y0) // dy, | |
(z - z0) // dz) | |
def cartesian_to_index(self, point_cart): | |
point_coord = self.cartesian_to_coord(point_cart) | |
print point_coord | |
return self.coord_to_index(point_coord) | |
def distance_from_origin(self, point): | |
if self.is_coord(point): | |
assert self.has_point(point) | |
point_coord = point | |
elif self.is_index(point): | |
point_coord = self.index_to_coord(point, self.shape) | |
assert self.has_point(point_coord) | |
else: | |
raise ValueError("Use an index or use a (x,y,z)") | |
return self.distanceP2P(point, self.origin) | |
def distanceP2P(self, point1, point2): | |
if self.is_coord(point1) and self.is_coord(point2): | |
x, y, z = self.coord_to_cartesian(point1) | |
m, n, p = self.coord_to_cartesian(point2) | |
return math.sqrt((x - m) ** 2 + (y - n) ** 2 + (z - p) ** 2) | |
else: | |
print "Please input a coordinate element to this function" | |
@staticmethod | |
def dim_to_filename(dimensions): | |
X, Y, Z = dimensions | |
filename = str(X) + "x" + str(Y) + "x" + str(Z) | |
return filename | |
def populate_indices_array(self): | |
"""This populates the indices_array with the coordinates and references | |
to the neighbor index in the same array""" | |
print "Populating array:" | |
neighbor_array_size = len(self.all_neighbors) | |
indices_array_shape = (self.size, neighbor_array_size) | |
indices_array = np.zeros(shape=indices_array_shape, | |
dtype=int) | |
neighbor_indices_filename = self.dim_to_filename(self.shape) + ".npy" | |
file_path = os.path.join(self.neighbor_data_path, | |
neighbor_indices_filename) | |
if(os.path.exists(file_path)): | |
indices_obtained = np.load(file_path) | |
indices_array[:] = indices_obtained[:] | |
print "Done, cheated by using the file: ", file_path | |
return indices_array | |
bar = Bar(max_value=100, fallback=True) | |
bar.cursor.clear_lines(2) | |
bar.cursor.save() | |
for point_index in xrange(0, self.size): | |
available_close_neighbors = [] | |
available_close_diagonal_neighbors = [] | |
available_diagonal_neighbors = [] | |
point_coord = self.index_to_coord(point_index) | |
x, y, z = point_coord | |
for (dx, dy, dz) in self.close_neighbors: | |
neighbor_coord = (x + dx, y + dy, z + dz) | |
if self.has_point(neighbor_coord): | |
available_close_neighbors.append(self.coord_to_index(neighbor_coord)) | |
for (dx, dy, dz) in self.close_diagonal_neighbors: | |
neighbor_coord = (x + dx, y + dy, z + dz) | |
if self.has_point(neighbor_coord): | |
available_close_diagonal_neighbors.append(self.coord_to_index(neighbor_coord)) | |
for (dx, dy, dz) in self.diagonal_neighbors: | |
neighbor_coord = (x + dx, y + dy, z + dz) | |
if self.has_point(neighbor_coord): | |
available_diagonal_neighbors.append(self.coord_to_index(neighbor_coord)) | |
inflated_close_neighbors = self.inflate_array(available_close_neighbors, | |
len(self.close_neighbors)) | |
inflated_close_diagonal_neighbors = self.inflate_array(available_close_diagonal_neighbors, | |
len(self.close_diagonal_neighbors)) | |
inflated_diagonal_neighbors = self.inflate_array(available_diagonal_neighbors, | |
len(self.diagonal_neighbors)) | |
# At this point we have the close and the diagonal available | |
# neighbors | |
neighbors = inflated_close_neighbors + inflated_close_diagonal_neighbors + inflated_diagonal_neighbors | |
indices_array[point_index, :] = neighbors | |
if (not (point_index % 50000)) or point_index == self.size: | |
# print (point_index * 100)/self.size, "%" | |
percentage = (point_index * 100)/self.size | |
# We restore the cursor to saved position before writing | |
bar.cursor.restore() | |
# Now we draw the bar | |
bar.draw(value=percentage) | |
np.save(file_path, indices_array) | |
print "Array Populated and stored in: ", file_path | |
return indices_array | |
# BEWARE OF ALIASING >> ;_; | |
def inflate_array(self, array, length): | |
"inflate array into an array on length `lenght` and put -1 in the ones that are needed" | |
assert isinstance(array, list) and len(array) <= length | |
assert isinstance(length, int) | |
while len(array) < length: | |
array.append(-1) | |
return array | |
def get_maxima_points(self): | |
"""Check each point in the grid, check the close neighbors and return the list of the maxima points encountered""" | |
for point_index in xrange(0, self.size): | |
self.is_maxima(point_index, | |
search_depth=self.get_close_neighbors_indices(point_index), | |
success=self.success_maxima) | |
print "In total: ", len(self.maxima_points), " maxima points considering only close neighbors" | |
return self.maxima_points | |
def success_maxima(self, point_index, *args, **kwargs): | |
available_neighbors_indices = kwargs.get('search_depth', None) | |
assert isinstance(available_neighbors_indices, list) or \ | |
isinstance(available_neighbors_indices, np.ndarray),\ | |
"You are not giving available neighbors to the success_maxima function" | |
self.maxima_points.append(point_index) | |
def is_maxima(self, point_index, *args, **kwargs): | |
success_function = kwargs.get('success', None) | |
failure_function = kwargs.get('failure', None) | |
available_neighbors_indices = kwargs.get('search_depth', None) | |
assert isinstance(available_neighbors_indices, list) or \ | |
isinstance(available_neighbors_indices, np.ndarray),\ | |
"You are not giving available neighbors to the is_maxima function" | |
point_density = self.get_density(point_index) | |
for neighbor_index in available_neighbors_indices: | |
neighbor_density = self.get_density(neighbor_index) | |
if neighbor_density >= point_density: | |
# Current point is watershed | |
if failure_function: | |
failure_function(point_index, *args, **kwargs) | |
return False | |
if success_function: | |
success_function(point_index, *args, **kwargs) | |
return True | |
def get_density(self, point): | |
"""Returns the density of the element in the point_index in the indices_array | |
""" | |
assert self.density.shape == (self.size,) | |
if self.is_coord(point): | |
if not self.has_point(point): | |
print point, "<<<Point" | |
import pdb; pdb.set_trace() | |
raise PointError(self, point, "none", "I have no life") | |
point_index = self.coord_to_index(point) | |
return self.density[point_index] | |
elif self.is_index(point): | |
if not self.has_point(point): | |
print point, "<<<Point" | |
import pdb; pdb.set_trace() | |
raise PointError(self, point, "none", "I have no life") | |
point_index = point | |
return self.density[point_index] | |
else: | |
raise ValueError("Use an index or use a (x,y,z)") | |
def get_basin(self, point_index): | |
assert self.has_point(point_index) | |
return self.basin_array[point_index] | |
def set_basin(self, point_index, basin): | |
assert isinstance(basin, int), "You may want integer basins" | |
assert self.has_point(point_index) | |
self.basin_array[point_index] = basin | |
@property | |
def size(self): | |
return self._size | |
@size.setter | |
def size(self, value): | |
assert isinstance(value, int), "Grid Size must be an integer" | |
self._size = value | |
@property | |
def density(self): | |
return self._density | |
@density.setter | |
def density(self, density_array): | |
assert isinstance(density_array, np.ndarray),\ | |
"Need a numpy array for density" | |
assert density_array.size == self.size,\ | |
"The density array doesn't have the size we need" | |
assert density_array.shape == (self.size,),\ | |
"We need a 1D array for density" | |
self._density = density_array | |
@property | |
def indices_array(self): | |
return self._indices_array | |
@indices_array.setter | |
def indices_array(self, array): | |
assert isinstance(array, np.ndarray) | |
self._indices_array = array | |
@property | |
def shape(self): | |
return self._shape | |
@shape.setter | |
def shape(self, shape): | |
assert isinstance(shape, tuple), "Need a tuple for shape" | |
assert len(shape) == 3, "only 3D shape supported" | |
self._shape = shape | |
@property | |
def mol_name(self): | |
return self._mol_name | |
@mol_name.setter | |
def mol_name(self, name): | |
self._mol_name = name | |
@property | |
def mol_basis(self): | |
return self._mol_basis | |
@mol_basis.setter | |
def mol_basis(self, basis_name): | |
self._mol_basis = basis_name | |
@property | |
def close_neighbors(self): | |
return [ | |
(-1, 0, 0), | |
(0, -1, 0), | |
(1, 0, 0), | |
(0, 1, 0), | |
(0, 0, 1), | |
(0, 0, -1), | |
] | |
@property | |
def close_diagonal_neighbors(self): | |
return [ | |
(-1, -1, 0), | |
(-1, 0, -1), | |
(-1, 0, 1), | |
(-1, 1, 0), | |
(0, -1, -1), | |
(0, -1, 1), | |
(0, 1, -1), | |
(0, 1, 1), | |
(1, -1, 0), | |
(1, 0, -1), | |
(1, 0, 1), | |
(1, 1, 0), | |
] | |
@property | |
def diagonal_neighbors(self): | |
return [ | |
(-1, -1, -1), | |
(-1, -1, 1), | |
(-1, 1, -1), | |
(-1, 1, 1), | |
(1, -1, -1), | |
(1, -1, 1), | |
(1, 1, -1), | |
(1, 1, 1) | |
] | |
@property | |
def all_neighbors(self): | |
return self.close_neighbors + \ | |
self.close_diagonal_neighbors +\ | |
self.diagonal_neighbors | |
def get_close_neighbors_indices(self, point_index): | |
length = len(self.close_neighbors) | |
close_neighbors = self.indices_array[point_index][:length] | |
mask = np.where(close_neighbors != -1) | |
return close_neighbors[mask] | |
# close_available_neighbors = close_neighbors[mask] | |
# sorted_indices = sorted(xrange(len(close_available_neighbors)), | |
# key=lambda k: | |
# self.get_density(close_available_neighbors[k]), | |
# reverse=True) | |
# # LESS AWFUL CODE | |
# sorted_density_indices = [close_available_neighbors[index] | |
# for index in sorted_indices] | |
# return sorted_density_indices | |
def get_close_diagonal_neighbors_indices(self, point_index): | |
length = len(self.close_neighbors) | |
length2 = len(self.close_neighbors + self.close_diagonal_neighbors) | |
close_diagonal_neighbors = self.indices_array[point_index][length:length2] | |
mask = np.where(close_diagonal_neighbors != -1) | |
return close_diagonal_neighbors[mask] | |
def get_diagonal_neighbors_indices(self, point_index): | |
length = len(self.close_neighbors + self.close_diagonal_neighbors) | |
diagonal_neighbors = self.indices_array[point_index][length:] | |
mask = np.where(diagonal_neighbors != -1) | |
return diagonal_neighbors[mask] | |
def get_all_neighbors_indices(self, point_index): | |
assert isinstance(point_index, int), "You'd better give an integer "\ | |
"as a point_index" | |
return np.concatenate((self.get_close_neighbors_indices(point_index), | |
self.get_close_diagonal_neighbors_indices(point_index), | |
self.get_diagonal_neighbors_indices(point_index))) | |
def get_close_neighbors_cart(self, point_index): | |
neighbor_indices = self.get_close_neighbors_indices(point_index) | |
neighbor_cart = [self.index_to_cart(neighbor_index) | |
for neighbor_index in neighbor_indices] | |
return neighbor_cart | |
def get_close_diagonal_neighbors_cart(self, point_index): | |
neighbor_indices = self.get_close_diagonal_neighbors_indices(point_index) | |
neighbor_cart = [self.index_to_cart(neighbor_index) | |
for neighbor_index in neighbor_indices] | |
return neighbor_cart | |
def get_diagonal_neighbors_cart(self, point_index): | |
neighbor_indices = self.get_diagonal_neighbors_indices(point_index) | |
neighbor_cart = [self.index_to_cart(neighbor_index) | |
for neighbor_index in neighbor_indices] | |
return neighbor_cart | |
def get_all_neighbors_cart(self, point_index): | |
neighbor_indices = self.get_all_neighbors_indices(point_index) | |
neighbor_cart = [self.index_to_cart(neighbor_index) | |
for neighbor_index in neighbor_indices] | |
return neighbor_cart | |
def index_to_coord(self, index): | |
I, J, K = self.shape | |
N_1D = K | |
N_2D = K * J | |
i = index / N_2D | |
j = (index - N_2D * i) / N_1D | |
k = index - N_2D * i - N_1D * j | |
return (i, j, k) | |
def coord_to_index(self, coord): | |
I, J, K = self.shape | |
i, j, k = coord | |
N_1D = K | |
N_2D = K * J | |
return k + N_1D * j + N_2D * i | |
@staticmethod | |
def is_coord(point): | |
return (isinstance(point, tuple) or | |
isinstance(point, list)) and len(point) == 3 | |
@staticmethod | |
def is_index(point): | |
if isinstance(point, float): | |
raise ValueError("You have float point indices , %s" % point) | |
return isinstance(point, int) | |
def has_point(self, point): | |
"""Returns True if point is in density_grid | |
""" | |
if self.is_index(point): | |
point_coord = self.index_to_coord(point) | |
elif self.is_coord(point): | |
point_coord = point | |
else: | |
raise ValueError("Use an index or use a (x,y,z)\n" + | |
"Point given: " + repr(point) + "\n" + | |
str(type(point))) | |
x, y, z = point_coord | |
X, Y, Z = self.shape | |
if (x < X and x >= 0) and \ | |
(y < Y and y >= 0) and \ | |
(z < Z and z >= 0): | |
return True | |
else: | |
return False | |
class WatershedGrid2D(WatershedGrid): | |
@property | |
def close_neighbors(self): | |
return [ | |
(-1, 0), | |
(0, -1), | |
(1, 0), | |
(0, 1) | |
] | |
@property | |
def diagonal_neighbors(self): | |
return [ | |
(-1, -1), | |
(1, -1), | |
(-1, 1), | |
(1, 1) | |
] | |
class Point: | |
def __init__(self, grid, point): | |
"""the current point and information about it's neighbors | |
grid: a WatershedGrid instance | |
point: a point which can be a coordinate, index, or cartesian | |
message: the message you want to print to the output | |
depth: the amount of neighbors you want to print""" | |
self.grid = grid | |
if self.grid.is_index(point): | |
self.point_index = point | |
self.point_coord = self.grid.index_to_coord(point) | |
self.point_cart = self.grid.coord_to_cartesian(self.point_coord) | |
elif grid.is_coord(point): | |
self.point_index = self.grid.coord_to_index(point) | |
self.point_coord = point | |
self.point_cart = self.grid.coord_to_cartesian(point) | |
else: | |
raise ValueError("Need a point to be either an index or a coord") | |
@property | |
def grid(self): | |
return self._grid | |
@grid.setter | |
def grid(self, value): | |
assert isinstance(value, WatershedGrid) | |
self._grid = value | |
def __str__(self): | |
point_density = self.grid.get_density(self.point_index) | |
point_basin = self.grid.get_basin(self.point_index) | |
return "--------------------------\n" +\ | |
"index: " + repr(self.point_index) + "\n" +\ | |
"coord: " + repr(self.point_coord) + "\n" +\ | |
"cartesian: " + repr(self.point_cart) + "\n" +\ | |
"density: " + repr(point_density) + "\n" +\ | |
"basin: " + repr(point_basin) + "\n" | |
class PointError(ValueError): | |
def __init__(self, grid, point, depth, message): | |
"""Print the current point and information about it's neighbors | |
grid is a WatershedGrid instance | |
point is a point which can be a coordinate, index, or cartesian | |
message is the message you want to print to the output | |
depth is the amount of neighbors you want to print""" | |
self.grid = grid | |
self.point = Point(grid, point) | |
self.message = message | |
if depth == "close": | |
self.get_neighbors_indices = self.grid.get_close_neighbors_indices | |
elif depth == "diagonal": | |
self.get_neighbors_indices = self.grid.get_diagonal_neighbors_indices | |
elif depth == "all": | |
self.get_neighbors_indices = self.grid.get_all_neighbors_indices | |
elif depth == "none": | |
self.get_neighbors_indices = None | |
else: | |
raise ValueError("Depth given is not accepted") | |
self.grid.save_basins("lel") | |
@property | |
def grid(self): | |
return self._grid | |
@grid.setter | |
def grid(self, value): | |
assert isinstance(value, WatershedGrid) | |
self._grid = value | |
@property | |
def message(self): | |
return self._message | |
@message.setter | |
def message(self, value): | |
self._message = value + "\n" | |
def __str__(self): | |
point_info = "Current Point: \n" + str(self.point) | |
point_index = self.point.point_index | |
if self.get_neighbors_indices: | |
neighbor_data = [str(Point(self.grid, neighbor)) | |
for neighbor | |
in self.get_neighbors_indices(point_index)] | |
neighbor_info = "Neighboring Points: \n" + \ | |
'\n'.join(neighbor_data) | |
return self.message + point_info + neighbor_info | |
else: | |
return self.message + point_info | |
class PreserveShape: | |
@property | |
def data(self): | |
return self._data | |
@data.setter | |
def data(self, value): | |
assert isinstance(value, np.ndarray) | |
self._data = value | |
def __init__(self, data, new_shape): | |
self.data = data | |
self.old_shape = data.shape | |
self.new_shape = new_shape | |
def __enter__(self): | |
self.data.shape = self.new_shape | |
return self.data | |
def __exit__(self, type, value, traceback): | |
self.data.shape = self.old_shape |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment