Last active
December 25, 2015 16:25
-
-
Save pkgw/fdf4c48573a0d2631c04 to your computer and use it in GitHub Desktop.
Simple pwkit CLEAN implementation, never really used so who knows if it works
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2015 Peter Williams | |
# Licensed under the MIT License. | |
import numpy as np | |
from scipy.signal import lombscargle | |
from pwkit import lsqmdl | |
twopi = 2 * np.pi | |
ncoarse = 512 | |
def sin_model (pars, fx): | |
a, ph = pars | |
return a * np.sin (fx + ph) | |
class LSClean (object): | |
nfine = 256 | |
gain = 0.2 | |
def __init__ (self, x, y, u): | |
self.x = np.asarray (x) | |
self.orig_y = np.asarray (y) | |
self.orig_y -= self.orig_y.mean () # would disappear on first iteration | |
self.cur_y = self.orig_y.copy () | |
self.invu = 1. / np.asarray (u) # this should evolve as we fit, I think? | |
self.components = [] | |
self.fmax = twopi / (0.5 * (self.x.max () - self.x.min ())) | |
self.fmin = twopi / (2 * np.abs (self.x[1:] - self.x[:-1]).min ()) | |
self.coarse_freqs = np.logspace (np.log10 (self.fmin), | |
np.log10 (self.fmax), | |
ncoarse) | |
self.model = lsqmdl.Model (None, self.cur_y, self.invu) | |
def iterate (self, n): | |
for _ in xrange (n): | |
ls = lombscargle (self.x, self.cur_y, self.coarse_freqs) | |
fcoarse = self.coarse_freqs[ls.argmax ()] | |
fine_freqs = np.linspace (0.8 * fcoarse, 1.2 * fcoarse, self.nfine) | |
ls = lombscargle (self.x, self.cur_y, fine_freqs) | |
imax = ls.argmax () | |
ffine = fine_freqs[imax] | |
afine = ls[imax] | |
fx = ffine * self.x | |
self.model.set_data (self.cur_y, self.invu) | |
self.model.set_func (sin_model, ['amp', 'ph'], args=(fx, )) | |
self.model.solve ([np.abs (self.cur_y).max (), 0.5]) | |
# Apply CLEAN downweighting and normalize parameters | |
amp = self.gain * self.model.params[0] | |
pha = self.model.params[1] | |
if amp < 0: | |
amp = -amp | |
pha += np.pi | |
pha = ((pha + np.pi) % twopi) - np.pi | |
self.cur_y -= amp * np.sin (fx + pha) | |
self.components.append ([twopi / ffine, amp, pha]) | |
def sample (self, x, ntrunc=None): | |
x = np.asarray (x) | |
y = np.zeros (x.shape) | |
if ntrunc is None: | |
complist = self.components | |
else: | |
complist = sorted (self.components, key=lambda c: c[1], reverse=True)[:ntrunc] | |
for prd, amp, pha in complist: | |
y += amp * np.sin (twopi * x / prd + pha) | |
return y | |
def engridden (df, xcol, ycol, padding=0): | |
# we assume that we're sorted in X. | |
import pandas as pd | |
x = df[xcol] | |
y = df[ycol] | |
xmax, xmin = x.max (), x.min () | |
span = xmax - xmin | |
dx = x.diff () | |
assert (dx > 0)[1:].all () # [1:] because dx[0]=NaN | |
delta = dx.min () | |
n = int (np.ceil (span / delta)) + 1 | |
if n % 2 == 0: | |
n += 1 # there are advantages to using an odd nr of elements | |
print ('gridding to', n, 'elements') | |
# Laziness! We use an optimizer to figure out the "best" coefficients | |
# to map the measured X values to a grid, where "best" means the smallest | |
# differences between the gridded and actual locations. We could do | |
# better by increasing n, of course. | |
def deltas (m, b): | |
idx = np.round ((x - b) / m) | |
xmdl = m * idx + b | |
return xmdl | |
mdl = lsqmdl.Model (deltas, x) | |
mdl.solve ([span / (n - 1), xmin]) | |
m, b = mdl.params | |
# Allow for padding so that the FFTClean can convolve the points | |
# on the grid. | |
b -= m * padding | |
n += 2 * padding | |
# Put everything on the grid | |
idx = np.round ((x - b) / m).astype (np.int) | |
assert (idx >= 0).all () | |
assert (idx < n).all () | |
x = m * np.arange (n) + b | |
res = pd.DataFrame ({'x': x}) | |
res['y'] = np.nan | |
res.y[idx] = y | |
res.idx2x_m, res.idx2x_b = m, b | |
return res | |
class FFTClean (object): | |
gain = 0.2 | |
def __init__ (self, df, xcol, ycol): | |
self.components = [] | |
self.gridded = engridden (df, xcol, ycol, padding=8) | |
self.freqs = np.fft.rfftfreq (self.gridded.shape[0], self.gridded.idx2x_m) | |
self.periods = 1. / np.maximum (self.freqs, self.freqs[1]) # avoid div-by-0 | |
self.n2 = self.gridded.shape[0] // 2 | |
# Convolve onto the grid to smooth out ringing, etc. | |
conv = np.blackman (5) | |
self.gridded['sampling'] = 1. * np.isfinite (self.gridded.y) | |
self.gridded.y.fillna (value=0, inplace=True) | |
self.gridded['y'] = np.convolve (self.gridded.y, conv, mode='same') | |
self.gridded['sampling'] = np.convolve (self.gridded.sampling, conv, mode='same') | |
win = np.abs (np.fft.rfft (self.gridded.sampling)) | |
win = np.concatenate ((win[:0:-1], win)) | |
win /= win.max () | |
self.window = win | |
self.orig_dirty = np.abs (np.fft.rfft (self.gridded.y)) | |
self.cur = self.orig_dirty.copy () | |
def iterate (self, n): | |
for _ in xrange (n): | |
idx = np.abs (self.cur).argmax () # find negative components too! | |
ampl = self.gain * self.cur[idx] | |
# xxx document magic: | |
ofs = self.n2 - idx | |
shifted = self.window[ofs:ofs + self.n2 + 1] | |
self.components.append ((self.freqs[idx], self.periods[idx], idx, ampl)) | |
self.cur -= ampl * shifted | |
print (self.components[-1]) | |
def get_components (self): | |
import pandas as pd | |
a = np.asarray (self.components).T | |
return pd.DataFrame (dict (zip ('freq period idx ampl'.split (), a))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment