Last active
October 24, 2016 23:42
-
-
Save JoaoRodrigues/2a6773d962d6fba4530a940a07c8f51f to your computer and use it in GitHub Desktop.
Smooths a trajectory using mdtraj and an weighted adaptive window algorithm.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Smooths a trajectory using mdtraj and an weighted adaptive window algorithm. | |
""" | |
from __future__ import print_function, division | |
import argparse | |
import os | |
import sys | |
import numpy as np | |
import mdtraj as md | |
# Functions | |
def adaptive_smoothing(trajectory, window=5): | |
""" | |
Smooths a trajectory using an adaptive window that narrows at the edges | |
of the trajectory and weights the frames depending on their distance to | |
the central frame. | |
""" | |
n_frames = trajectory.n_frames | |
msg = 'Running adaptive smoothing on {} frames (window={})' | |
print(msg.format(n_frames, window)) | |
for frame_no, frame_xyz in enumerate(trajectory.xyz): | |
# Smooth an equal number of frames in each direction | |
if frame_no < cmd.window: | |
# Adapt window size | |
# Keep symmetry | |
# [0], [0,1,0], [0,0,1,0,0], ... | |
beg_frame = 0 | |
end_frame = (frame_no*2) + 1 # 0: [0,1[, 1: [0, 1, 2, 3[, ... | |
half_window_size = frame_no + 1 | |
elif n_frames - frame_no <= window: | |
# Not enough remaining frames to average normally | |
# Reduce averaging window symmetrically | |
end_frame = n_frames | |
beg_frame = frame_no - (n_frames - frame_no) + 1 | |
half_window_size = n_frames - frame_no | |
else: | |
beg_frame = frame_no - window # 15: 10 | |
end_frame = frame_no + window + 1 # 15: 21 | |
half_window_size = window + 1 | |
window_frames = trajectory.xyz[beg_frame: end_frame] | |
# Define weights based on distance to central frame | |
_w = [x+1 for x in range(half_window_size)] # avoid zero weights | |
_raw_w = _w + _w[::-1][1:] | |
try: | |
weights = map(lambda x: x/max(_raw_w), _raw_w) | |
except ZeroDivisionError: # First/Last frame | |
weights = [1] | |
# Average coordinates | |
average_xyz = np.average(window_frames, weights=weights, axis=0) | |
# Update trajectory | |
trajectory.xyz[frame_no] = average_xyz | |
return trajectory | |
def halt(message): | |
"""Writes a message to STDERR and exists the program""" | |
print(message, file=sys.stderr) | |
sys.exit(1) | |
# Main | |
if __name__ == '__main__': | |
_filters = {'adaptive': adaptive_smoothing} | |
ap = argparse.ArgumentParser(description=__doc__) | |
ap_io = ap.add_argument_group('I/O Options') | |
ap_io.add_argument('trajectory', type=str, help='Input Trajectory') | |
ap_io.add_argument('topology', type=str, help='Input Topology') | |
ap_io.add_argument('--selection', type=str, | |
help='Filter atoms in input trajectory (e.g. backbone)') | |
ap_io.add_argument('--reference', type=str, | |
help='Reference PDB structure to fit trajectory to') | |
ap_io.add_argument('--read_from', type=int, | |
help='Frame number to start reading trajectory data') | |
ap_io.add_argument('--read_until', type=int, | |
help='Frame number to stop reading trajectory data') | |
ap_io.add_argument('-o', '--output', type=str, help='Output Trajectory') | |
ap_io.add_argument('--downsample', type=int, | |
help='Downsample output trajectory (1 every N frames)') | |
ap_smooth = ap.add_argument_group('Smoothing Options') | |
ap_smooth.add_argument('-w', '--window', type=int, default=5, | |
help='Number of frames to average') | |
ap_smooth.add_argument('-s', '--smooth_function', | |
help='Type of smoothing function', | |
choices=_filters.keys(), default='adaptive') | |
ap_smooth.add_argument('-p', '--pad', action='store_true', | |
help='Pad ends of trajectory with start/end frames') | |
cmd = ap.parse_args() | |
# Check input files exist | |
if not os.path.isfile(cmd.trajectory): | |
msg = 'Trajectory input file not found: {}'.format(cmd.trajectory) | |
halt(msg) | |
if not os.path.isfile(cmd.topology): | |
msg = 'Topology input file not found: {}'.format(cmd.topology) | |
halt(msg) | |
# Read trajectory | |
print('== Started ==') | |
trj = md.load(cmd.trajectory, top=cmd.topology) | |
if cmd.read_from: | |
trj = trj[cmd.read_from-1:] | |
if cmd.read_until: | |
trj = trj[:cmd.read_until] | |
print('Loaded {0.n_atoms} atoms / {0.n_frames} frames'.format(trj)) | |
# Strip by selection | |
if cmd.selection: | |
atomsel = trj.top.select(cmd.selection) | |
trj.atom_slice(atomsel, inplace=True) | |
if cmd.pad: | |
print('Padding trajectory with {} frames'.format(cmd.window)) | |
for _ in range(cmd.window): | |
trj = trj[0] + trj + trj[-1] | |
# Align trajectory to reference or first frame | |
if cmd.reference: | |
print('Aligning trajectory to {}'.format(cmd.reference)) | |
refe = md.load_pdb(cmd.reference) | |
else: | |
print('Aligning trajectory to first frame') | |
refe = trj | |
atomsel = trj.top.select('protein and name CA and not (resname ACE or resname NME)') | |
trj.superpose(refe, 0, atom_indices=atomsel, ref_atom_indices=atomsel) | |
# Smooth frames | |
smooth_function = _filters[cmd.smooth_function] | |
trj = smooth_function(trj, window=cmd.window) | |
# Downsample | |
if cmd.downsample: | |
print('Downsampling (1 every {} frames)'.format(cmd.downsample)) | |
trj = trj[::cmd.downsample] | |
# Output smoothed trajectory | |
if not cmd.output: | |
cmd.output = 'smooth.pdb' | |
print('Saving smoothed trajectory to: {}'.format(cmd.output)) | |
trj.save(cmd.output) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment