Created
September 28, 2022 23:29
-
-
Save Mike3285/dbfcec14b2af1297c001a7e9c794be16 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Pool | |
import pickle | |
from attilio.auto_picking import * | |
from tqdm.contrib.concurrent import process_map | |
# caricamento dati | |
def _traceback(D): | |
i, j = array(D.shape) - 2 | |
p, q = [i], [j] | |
while (i > 0) or (j > 0): | |
tb = argmin((D[i, j], D[i, j + 1], D[i + 1, j])) | |
if tb == 0: | |
i -= 1 | |
j -= 1 | |
elif tb == 1: | |
i -= 1 | |
else: # (tb == 2): | |
j -= 1 | |
p.insert(0, i) | |
q.insert(0, j) | |
return array(p), array(q) | |
def manhattan_distance(target_seq, input_seq): | |
return np.abs(input_seq - target_seq) | |
def manhattan_dtw(x, y, warp=1, w=inf, s=1.0): | |
""" | |
Computes Dynamic Time Warping (DTW) of two sequences. | |
:param array x: N1*M array | |
:param array y: N2*M array | |
:param int warp: how many shifts are computed. | |
:param int w: window size limiting the maximal distance between indices of matched entries |i,j|. | |
:param float s: weight applied on off-diagonal moves of the path. As s gets larger, the warping path is increasingly biased towards the diagonal | |
Returns the minimum distance, the cost matrix, the accumulated cost matrix, and the wrap path. | |
""" | |
assert len(x) | |
assert len(y) | |
assert isinf(w) or (w >= np.abs(len(x) - len(y))) | |
assert s > 0 | |
r, c = len(x), len(y) | |
if not isinf(w): | |
D0 = full((r + 1, c + 1), inf) | |
for i in range(1, r + 1): | |
D0[i, np.max(1, i - w):np.min(c + 1, i + w + 1)] = 0 | |
D0[0, 0] = 0 | |
else: | |
D0 = zeros((r + 1, c + 1)) | |
D0[0, 1:] = inf | |
D0[1:, 0] = inf | |
D1 = D0[1:, 1:] # view | |
for i in range(r): | |
for j in range(c): | |
if (isinf(w) or (np.max(0, i - w) <= j <= np.min(c, i + w))): | |
D1[i, j] = manhattan_distance(x[i], y[j]) | |
C = D1.copy() | |
jrange = range(c) | |
for i in range(r): | |
if not isinf(w): | |
jrange = range(np.max(0, i - w), np.min(c, i + w + 1)) | |
for j in jrange: | |
min_list = [D0[i, j]] | |
for k in range(1, warp + 1): | |
i_k = min(i + k, r) | |
j_k = min(j + k, c) | |
min_list += [D0[i_k, j] * s, D0[i, j_k] * s] | |
D1[i, j] += np.min(min_list) | |
if len(x) == 1: | |
path = zeros(len(y)), range(len(y)) | |
elif len(y) == 1: | |
path = range(len(x)), zeros(len(x)) | |
else: | |
path = _traceback(D0) | |
return D1[-1, -1], C, D1, path | |
class Attilio: | |
def __init__(self): | |
path = 'attilio/data/' | |
name = 'CORAL_07_DIR_DYNAMIC.las' | |
self.data, self.a = read_las(path, name) | |
print("Dati caricati") | |
self.data_smooth = smoothing(self.data, self.a) | |
self.data_resh = reshape(self.data_smooth, 20) | |
self.DF = pd.DataFrame(self.data_smooth.T).T | |
def get_target_fetta(self, point): | |
row = point[0] | |
col = point[1] | |
try: | |
fetta_target = self.DF[col][row - 5 : row + 6] | |
new_col = col + 1 | |
for i in range(-5, 6): | |
fetta_input = self.DF[new_col][row - 5 - i: row + 6 - i] | |
distance = manhattan_dtw(fetta_target.to_numpy(), fetta_input.to_numpy())[0] | |
try: | |
if distance <= min_distance: | |
min_fetta = fetta_input | |
min_distance = min(distance, min_distance) | |
except: | |
min_distance = distance | |
min_fetta = fetta_input | |
new_row = min_fetta.iloc[5:6].index[0] | |
return np.array(min_distance), np.array((new_row, new_col)) | |
except: | |
return 0, (0, 0) | |
def calculate_distances_from_ipoint(self, point): | |
result_distances = [] | |
row = point[0] | |
col = point[1] | |
try: | |
for i in range(col, 16): | |
distance, new_coords = self.get_target_fetta((row, i)) | |
result_distances.append((distance, new_coords)) | |
row = new_coords[0] | |
result_distances.append(new_coords) | |
except: | |
for i in range(0, col): | |
distance, new_coords = self.get_target_fetta((row, i)) | |
result_distances.append((distance, new_coords)) | |
row = new_coords[0] | |
result_distances.append(new_coords) | |
return result_distances | |
# for res in pool.imap_ordered(calculate_distances_from_ipoint, int_data_figo): | |
# risultatissimo.append(res) | |
# pbar.update() | |
def do_points_monocore(self, ipoints): | |
r = np.zeros(len(ipoints)) | |
for point in tqdm(range(len(ipoints))): | |
result_dist = self.calculate_distances_from_ipoint(ipoints[point]) | |
r[point] = result_dist | |
return r | |
def do_points_multicore(self, ipoints): | |
r = [] | |
with tqdm(len(ipoints)) as pbar, Pool(processes=10) as pool: | |
for res in pool.imap(self.calculate_distances_from_ipoint, ipoints): | |
r.append(res) | |
pbar.update() | |
return r | |
if __name__ == "__main__": | |
calcolo = Attilio() | |
data_sobel, threshold, ds_fl, maximum, max_pt = Sobel_calc(calcolo.data_resh, 20, k_size=5, a=calcolo.a) | |
print("Sobel eseguito") | |
int_points = get_interesting_points(data_sobel, threshold)[2] | |
print("Interesting points trovati") | |
try: | |
with open('interesting_data.pickle', 'rb') as handle: | |
interesting_data = pickle.load(handle) | |
except: | |
interesting_data = dtw_calc_faster(data_sobel, int_points)[2] | |
with open('interesting_data.pickle', 'wb') as handle: | |
pickle.dump(interesting_data, handle, protocol=pickle.HIGHEST_PROTOCOL) | |
print("Interesting data trovati") | |
print("Dataframe creato") | |
int_data_figo = np.array([np.array((i[0] * 20 + i[2], i[1])) for i in interesting_data]) | |
print("Punti interessanti convertiti") | |
print(len(int_data_figo)) | |
print(int_data_figo[-10:]) | |
try: | |
with open('risultato.pickle', 'rb') as handle: | |
risultato = pickle.load(handle) | |
except: | |
risultato = calcolo.do_points_multicore(int_data_figo) | |
with open('risultato.pickle', 'wb') as handle: | |
pickle.dump(risultato, handle, protocol=pickle.HIGHEST_PROTOCOL) | |
print(risultato) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment