"""Useful fitting functions."""
from scipy.optimize import curve_fit
from scipy.signal import medfilt
import numpy as np
import traceback
import warnings
from collections.abc import Iterable
from .utils import mad, HAS_MPL
__all__ = [
"contiguous_regions",
"ref_std",
"ref_mad",
"linear_fun",
"linear_fit",
"offset",
"offset_fit",
"baseline_rough",
"purge_outliers",
"baseline_als",
"fit_baseline_plus_bell",
"total_variance",
"align",
]
def contiguous_regions(condition):
"""Find contiguous True regions of the boolean array "condition".
Return a 2D array where the first column is the start index of the region
and the second column is the end index.
Parameters
----------
condition : boolean array
Returns
-------
idx : [[i0_0, i0_1], [i1_0, i1_1], ...]
A list of integer couples, with the start and end of each True block
in the original array
Notes
-----
From http://stackoverflow.com/questions/4494404/
find-large-number-of-consecutive-values-fulfilling-
condition-in-a-numpy-array
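Examples
--------
A toy boolean array (values chosen for illustration):
>>> condition = np.array([False, True, True, False, True])
>>> contiguous_regions(condition)
array([[1, 3],
       [4, 5]])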
""" # NOQA
# Find the indices of changes in "condition"
diff = np.logical_xor(condition[1:], condition[:-1])
(idx,) = diff.nonzero()
# We need to start things after the change in "condition". Therefore,
# we'll shift the index by 1 to the right.
idx += 1
if condition[0]:
# If the start of condition is True prepend a 0
idx = np.r_[0, idx]
if condition[-1]:
# If the end of condition is True, append the length of the array
idx = np.r_[idx, condition.size]
# Reshape the result into two columns
idx.shape = (-1, 2)
return idx
def _rolling_window(a, window):
"""A smart rolling window.
Found at http://www.rigtorp.se/2011/01/01/rolling-statistics-numpy.html
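Examples
--------
A strided view on a small array:
>>> _rolling_window(np.arange(5), 3)
array([[0, 1, 2],
       [1, 2, 3],
       [2, 3, 4]])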
"""
try:
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.strides + (a.strides[-1],)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
except Exception:
warnings.warn(traceback.format_exc())
raise
def ref_std(array, window=1):
"""Reference standard deviation of an array.
If a data series is noisy, the standard deviation of the full series
overestimates the underlying noise. Here, the noise level is estimated
from the standard deviation of the first-order differences of the
series, divided by sqrt(2): differencing removes slow trends, and the
difference of two independent noisy samples has twice the variance of
a single one.
Parameters
----------
array : ``numpy.array`` object or list
Input data
window : int or float
Number of bins of the window (currently unused)
Returns
-------
ref_std : float
The reference Standard Deviation
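Examples
--------
For white noise superposed on a slow trend, this recovers the noise
standard deviation (a sketch with a seeded random series):
>>> rng = np.random.default_rng(42)
>>> y = rng.normal(0., 1., 10000) + np.linspace(0., 10., 10000)
>>> np.allclose(ref_std(y), 1., atol=0.05)
True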
"""
return np.std(np.diff(array)) / np.sqrt(2)
def ref_mad(array, window=1):
"""Reference Median Absolute Deviation of an array.
If a data series is noisy, the MAD of the full series overestimates
the underlying statistics. Here, the MAD is calculated on the
first-order differences of the series and divided by sqrt(2), for the
same reason explained in ``ref_std``.
Parameters
----------
array : ``numpy.array`` object or list
Input data
window : int or float
Number of bins of the window (currently unused)
Returns
-------
ref_mad : float
The reference MAD
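Examples
--------
A noiseless trend has zero reference MAD:
>>> np.allclose(ref_mad(np.linspace(0., 10., 100)), 0.)
True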
"""
return mad(np.diff(array)) / np.sqrt(2)
def linear_fun(x, q, m):
"""A linear function.
Parameters
----------
x : float or array
The independent variable
m : float
The slope
q : float
The intercept
Returns
-------
y : float or array
The dependent variable
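Examples
--------
Note the parameter order: intercept first, then slope.
>>> linear_fun([0., 1., 2.], 1., 2.)
array([1., 3., 5.])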
"""
return m * np.asarray(x, dtype=float) + q
def linear_fit(x, y, start_pars, return_err=False):
"""Fit a linear function to the data.
Parameters
----------
x : array-like
y : array-like
start_pars : [q0, m0], floats
Initial values of the intercept and slope
Other Parameters
----------------
return_err : bool
If True, also return the error on the parameters (not implemented
yet; None is returned in its place)
Returns
-------
par : [q, m], floats
Fitted intercept and slope of the linear function
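Examples
--------
A round trip on noiseless data:
>>> x = np.arange(10.)
>>> q, m = linear_fit(x, linear_fun(x, 3., 2.), [0., 0.])
>>> np.allclose([q, m], [3., 2.])
True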
"""
par, _ = curve_fit(linear_fun, x, y, start_pars, maxfev=6000)
if return_err:
warnings.warn("return_err not implemented yet in linear_fit")
return par, None
else:
return par
def offset(x, off):
"""An offset."""
return off
def offset_fit(x, y, offset_start=0, return_err=False):
"""Fit a constant offset to the data.
Parameters
----------
x : array-like
y : array-like
offset_start : float
Constant offset, initial value
Other Parameters
----------------
return_err : bool
If True, also return the error on the offset (not implemented yet;
None is returned in its place)
Returns
-------
offset : float
Fitted offset
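Examples
--------
>>> off = offset_fit(np.arange(10.), np.full(10, 3.))
>>> np.allclose(off, 3.)
True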
"""
par, _ = curve_fit(offset, x, y, [offset_start], maxfev=6000)
if return_err:
warnings.warn("return_err not implemented yet in offset_fit")
return par[0], None
else:
return par[0]
def baseline_rough(x, y, start_pars=None, return_baseline=False, mask=None):
"""Rough function to subtract the baseline.
Parameters
----------
x : array-like
the sample time/number/position
y : array-like
the data series corresponding to x
start_pars : [q0, m0], floats
Intercept and slope of linear function
Other Parameters
----------------
return_baseline : bool
return the baseline?
mask : array of bools
Mask indicating the good x and y data. True for good, False for bad
Returns
-------
y_subtracted : array-like, same size as y
The input time series, with the trend subtracted
baseline : array-like, same size as y
Fitted baseline. Only returned if return_baseline is True
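Examples
--------
A noiseless linear trend is removed almost exactly:
>>> x = np.arange(100.)
>>> np.allclose(baseline_rough(x, 0.5 * x + 10.), 0., atol=1e-6)
True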
"""
N = len(y)
if start_pars is None:
if N > 40:
m0 = (np.median(y[-20:]) - np.median(y[:20])) / (
np.mean(x[-20:]) - np.mean(x[:20])
)
else:
m0 = (y[-1] - y[0]) / (x[-1] - x[0])
q0 = min(y)
start_pars = [q0, m0]
lc = y.copy()
time = x.copy()
if mask is None:
mask = np.ones(len(time), dtype=bool)
total_trend = 0
if N < 20:
par = linear_fit(time, lc, start_pars)
lc = lc - linear_fun(time, *par)
total_trend = total_trend + linear_fun(time, *par)
else:
local_std = ref_std(lc, np.max([N // 20, 20]))
for percentage in [0.8, 0.15]:
time_to_fit = time[mask][1:-1]
lc_to_fit = lc[mask][1:-1]
if len(time_to_fit) < len(start_pars):
break
sorted_els = np.argsort(lc_to_fit)
# Select the lowest fraction of elements given by ``percentage``
good = sorted_els[: int(N * percentage)]
if np.std(lc_to_fit[good]) < 2 * local_std:
good = np.ones(len(lc_to_fit), dtype=bool)
time_filt = time_to_fit[good]
lc_filt = lc_to_fit[good]
if len(time_filt) < len(start_pars):
break
back_in_order = np.argsort(time_filt)
lc_filt = lc_filt[back_in_order]
time_filt = time_filt[back_in_order]
par = linear_fit(time_filt, lc_filt, start_pars)
lc = lc - linear_fun(time, *par)
total_trend = total_trend + linear_fun(time, *par)
if return_baseline:
return lc, total_trend
else:
return lc
def outlier_from_median_filt(y, window_size, down=True, up=True):
"""Flag data deviating more than 10 MADs from the median-filtered data."""
y_medfilt = medfilt(y, window_size)
diffs = y - y_medfilt
min_diff = mad(diffs)
outliers = np.zeros(len(y), dtype=bool)
if down:
outliers = np.logical_or(outliers, -diffs > 10 * min_diff)
if up:
outliers = np.logical_or(outliers, diffs > 10 * min_diff)
return outliers
def purge_outliers(
y, window_size=5, up=True, down=True, mask=None, plot=False
):
"""Remove obvious outliers.
Attention: this is known to throw false positives on bona fide, very
strong Gaussian peaks.
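Examples
--------
An isolated spike gets flagged (with a warning) and interpolated away:
>>> y = np.zeros(50)
>>> y[25] = 100.
>>> np.allclose(purge_outliers(y), 0.)
True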
"""
# medfilt requires an odd window size
window_size = window_size // 2 * 2 + 1
if mask is None:
mask = np.ones(len(y), dtype=bool)
bad_mask = np.logical_not(mask)
if not (up or down):
return y
ysave = y
y = y.copy()
win1 = outlier_from_median_filt(y, window_size)
win2 = outlier_from_median_filt(y, window_size * 2 + 1)
local_outliers = win1 & win2
Noutliers = np.count_nonzero(local_outliers)
if Noutliers > 0:
warnings.warn("Found {} outliers".format(Noutliers), UserWarning)
outliers = np.logical_or(local_outliers, bad_mask)
if not np.any(outliers):
return y
bad = contiguous_regions(outliers)
# Replace each contiguous outlier region, interpolating linearly between
# the closest good bins (or padding with them at the edges)
for b in bad:
if b[0] == 0:
y[b[0]] = y[b[1]]
elif b[1] >= len(y):
y[b[0] :] = y[b[0] - 1]
else:
previous = y[b[0] - 1]
next_bin = y[b[1]]
dx = b[1] - b[0]
y[b[0] : b[1]] = (next_bin - previous) / (dx + 1) * np.arange(
1, b[1] - b[0] + 1
) + previous
if plot and HAS_MPL:
import matplotlib.pyplot as plt
fig = plt.figure()
plt.plot(ysave, label="Input data")
plt.plot(y, zorder=3, label="Filtered data")
plt.plot(medfilt(ysave, window_size), zorder=6, lw=1, label="Medfilt")
plt.legend()
plt.savefig("Bubu_" + str(np.random.randint(0, 10000000)) + ".png")
plt.close(fig)
return y
def _als(y, lam, p, niter=30):
"""Baseline Correction with Asymmetric Least Squares Smoothing.
Modifications to the routine from Eilers & Boelens 2005 [eilers-2005]_.
The Python translation is partly from [so-als]_.
Thanks to Shriharsh Tendulkar for an optimized version
originally implemented in [Stingray]_.
Parameters
----------
y : array-like
the data series corresponding to ``x``
lam : float
the lambda parameter of the ALS method. This controls how much the
baseline can adapt to local changes. A higher value corresponds to a
stiffer baseline
p : float
the asymmetry parameter of the ALS method. This controls the overall
slope tolerated for the baseline. A higher value corresponds to a
higher possible slope
Other parameters
----------------
niter : int
The number of iterations to perform
Returns
-------
z : array-like, same size as ``y``
Fitted baseline.
References
----------
.. [eilers-2005] https://www.researchgate.net/publication/228961729_Technical_Report_Baseline_Correction_with_Asymmetric_Least_Squares_Smoothing
.. [so-als] http://stackoverflow.com/questions/29156532/python-baseline-correction-library
.. [Stingray] https://github.com/StingraySoftware/stingray/pull/725
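Examples
--------
A minimal sketch on pure noise with symmetric weighting (p=0.5), where
the fitted baseline should stay close to zero:
>>> rng = np.random.default_rng(12)
>>> y = rng.normal(0., 0.1, 100)
>>> z = _als(y, 1e7, 0.5, niter=10)
>>> np.allclose(z, 0., atol=0.1)
True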
"""
from scipy import sparse
L = len(y)
# Build the second-difference operator D (shape (L, L - 2)) in sparse CSC
# form: each column holds the stencil [1, -2, 1]
indptr = np.arange(0, L - 1, dtype=np.int32) * 3
indices = np.vstack(
(np.arange(0, L - 2).T, np.arange(0, L - 2).T + 1, np.arange(0, L - 2).T + 2)
).T.flatten()
data = np.tile([1, -2, 1], L - 2)
D = sparse.csc_matrix((data, indices, indptr), shape=(L, L - 2))
w = np.ones(L)
for _ in range(niter):
W = sparse.spdiags(w, 0, L, L)
# Solve the penalized weighted least-squares problem
# (W + lam * D D^T) z = W y
Z = W + lam * D.dot(D.transpose())
z = sparse.linalg.spsolve(Z, w * y)
# Asymmetric weights: p above the baseline, 1 - p below
w = p * (y > z) + (1 - p) * (y < z)
return z
def baseline_als(x, y, **kwargs):
"""Baseline Correction with Asymmetric Least Squares Smoothing.
If the input arrays have 300 elements or more, outlier_purging is
ignored and the baseline is calculated on a small subset of the
(median-filtered) y array, then interpolated back onto the full x grid.
Parameters
----------
x : array-like
the sample time/number/position
y : array-like
the data series corresponding to x
lam : float
the lambda parameter of the ALS method. This controls how much the
baseline can adapt to local changes. A higher value corresponds to a
stiffer baseline
p : float
the asymmetry parameter of the ALS method. This controls the overall
slope tolerated for the baseline. A higher value corresponds to a
higher possible slope
Other Parameters
----------------
niter : int
The number of iterations to perform
return_baseline : bool
return the baseline?
offset_correction : bool
also correct for an offset to align with the running mean of the scan
outlier_purging : bool
Purge outliers before the fit?
mask : array of bools
Mask indicating the good x and y data. True for good, False for bad
Returns
-------
y_subtracted : array-like, same size as y
The input time series, with the trend subtracted
baseline : array-like, same size as y
Fitted baseline. Only returned if return_baseline is True
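Examples
--------
A sketch on a noisy linear trend (fewer than 300 bins, so the direct
calculation is used):
>>> x = np.linspace(0., 1., 100)
>>> rng = np.random.default_rng(1)
>>> y = 2. * x + 1. + rng.normal(0., 0.01, x.size)
>>> np.allclose(baseline_als(x, y), 0., atol=0.1)
True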
"""
from scipy.interpolate import interp1d
if y.size < 300:
return _baseline_als(x, y, **kwargs)
# outlier_purging is ignored on the median-filtered subset (see docstring)
_ = kwargs.pop("outlier_purging", False)
return_baseline = kwargs.pop("return_baseline", False)
y_medf = medfilt(y, 31)
# Evaluate the baseline on 31 evenly spaced samples of the filtered data
els = np.array(np.rint(np.linspace(0, y.size - 1, 31)), dtype=int)
y_sub, base = _baseline_als(
x[els],
y_medf[els],
outlier_purging=False,
return_baseline=True,
**kwargs
)
func = interp1d(x[els], base)
baseline = func(x)
if return_baseline:
return y - baseline, baseline
else:
return y - baseline
def _baseline_als(
x,
y,
lam=None,
p=None,
niter=40,
return_baseline=False,
offset_correction=True,
mask=None,
outlier_purging=True,
):
if not isinstance(outlier_purging, Iterable):
outlier_purging = (outlier_purging, outlier_purging)
if lam is None:
lam = 1e11
if p is None:
p = 0.001
N = len(y)
# First remove a rough linear trend, estimated from the medians of the
# first and last 20 bins (or from the end points, for short arrays)
if N > 40:
med_start = np.median(y[:20])
med_stop = np.median(y[-20:])
approx_m = (med_stop - med_start) / (N - 20)
else:
approx_m = (y[-1] - y[0]) / (N - 1)
approx_q = y[0]
approx_baseline = approx_m * np.arange(N) + approx_q
y = y - approx_baseline
y_mod = purge_outliers(
y, up=outlier_purging[0], down=outlier_purging[1], mask=mask
)
z = _als(y_mod, lam, p, niter=niter)
offset = 0
ysub = y_mod - z
if offset_correction:
# Re-center the subtracted data on zero, using the median of the bins
# that lie close to the baseline
std = ref_std(ysub, np.max([len(y) // 20, 20]))
good = np.abs(ysub) < 10 * std
if len(ysub[good]) < 20:
good = np.ones(len(ysub), dtype=bool)
offset = np.median(ysub[good])
if np.isnan(offset):
offset = 0
if return_baseline:
return y - z - offset, z + offset + approx_baseline
else:
return y - z - offset
def detrend_spectroscopic_data(
x, spectrum, kind="als", mask=None, outlier_purging=True
):
"""Take the baseline off the spectroscopic data.
Examples
--------
>>> spectrum = np.vstack([np.arange(0 + i, 2 + i, 1/3)
... for i in np.arange(0., 4, 1/16)])
>>> x = np.arange(spectrum.shape[0])
>>> detr, _ = detrend_spectroscopic_data(x, spectrum, kind='rough')
>>> np.allclose(detr, 0, atol=1e-3)
True
"""
y = np.sum(spectrum, axis=1)
if kind == "als":
y_sub, baseline = baseline_als(
x,
y,
return_baseline=True,
outlier_purging=outlier_purging,
mask=mask,
)
elif kind == "rough":
y_sub, baseline = baseline_rough(x, y, return_baseline=True, mask=mask)
else:
warnings.warn("Baseline kind unknown")
return spectrum, np.ones_like(spectrum)
if len(spectrum.shape) == 1:
return y_sub, baseline
shape = spectrum.shape
# Distribute the 1-D baseline over the channels, proportionally to each
# channel's contribution to the total flux
tiled_baseline = np.tile(baseline, (shape[1], 1)).transpose()
tiled_norm = np.tile(y, (shape[1], 1)).transpose()
tiled_baseline = tiled_baseline / tiled_norm * spectrum
return spectrum - tiled_baseline, tiled_baseline
def fit_baseline_plus_bell(x, y, ye=None, kind="gauss"):
"""Fit a function composed of a linear baseline plus a bell function.
Parameters
----------
x : array-like
the sample time/number/position
y : array-like
the data series corresponding to x
Other parameters
----------------
ye : array-like
the errors on the data series (currently unused by the fit)
kind : str
Can be 'gauss' or 'lorentz'
Returns
-------
mod_out : ``Astropy.modeling.model`` object
The fitted model
fit_info : dict
Fit info from the Astropy fitting routine.
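Examples
--------
A sketch with a synthetic Gaussian bell on a linear baseline:
>>> x = np.linspace(0., 100., 201)
>>> y = 0.1 * x + 5. + 10. * np.exp(-(x - 50.)**2 / 32.)
>>> model, _ = fit_baseline_plus_bell(x, y, kind='gauss')
>>> np.allclose(model['Bell'].mean.value, 50., atol=1.)
True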
"""
if kind not in ["gauss", "lorentz"]:
raise ValueError("kind has to be one of: gauss, lorentz")
from astropy.modeling import models, fitting
approx_m = (np.median(y[-20:]) - np.median(y[:20])) / (
np.mean(x[-20:]) - np.mean(x[:20])
)
base = models.Linear1D(
slope=approx_m, intercept=np.median(y[:20]), name="Baseline"
)
xrange = np.max(x) - np.min(x)
yrange = np.max(y) - np.min(y)
if kind == "gauss":
bell = models.Gaussian1D(
mean=np.mean(x), stddev=xrange / 20, amplitude=yrange, name="Bell"
)
bell.amplitude.bounds = (0, None)
bell.mean.bounds = (None, None)
bell.stddev.bounds = (0, None)
# max_name = 'mean'
elif kind == "lorentz":
bell = models.Lorentz1D(
x_0=np.mean(x), fwhm=xrange / 20, amplitude=yrange, name="Bell"
)
bell.amplitude.bounds = (0, None)
bell.x_0.bounds = (None, None)
bell.fwhm.bounds = (0, None)
# max_name = 'x_0'
mod_init = base + bell
fit = fitting.LevMarLSQFitter()
mod_out = fit(mod_init, x, y)
return mod_out, fit.fit_info
def total_variance(xs, ys, params):
"""Calculate the total variance of a series of scans.
This function subtracts a linear function from each of the scans
(excluding the first one) and calculates the total variance.
Parameters
----------
xs : list of array-like [array1, array2, ...]
list of arrays containing the x values of each scan
ys : list of array-like [array1, array2, ...]
list of arrays containing the y values of each scan
params : list of array-like [[q1, q2, ...], [m1, m2, ...]]
list of intercepts and slopes, one per scan after the first one. The
list is flattened, so all the intercepts come first, followed by all
the slopes.
Returns
-------
total_variance : float
The total variance of the baseline-subtracted scans.
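Examples
--------
The variance is lower when the correct offset is subtracted (a sketch
with two identical scans, one shifted up by 1):
>>> x = np.linspace(0., 10., 100)
>>> xs, ys = [x, x], [np.sin(x), np.sin(x) + 1.]
>>> aligned = total_variance(xs, ys, [[1.], [0.]])
>>> misaligned = total_variance(xs, ys, [[0.], [0.]])
>>> bool(aligned < misaligned)
True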
"""
params = np.array(params).flatten()
qs = params[: len(xs) - 1]
ms = params[len(xs) - 1 :]
x = xs[0].copy()
y = ys[0].copy()
for i in range(1, len(xs)):
x = np.append(x, xs[i])
scaled_y = ys[i] - (xs[i] * ms[i - 1] + qs[i - 1])
y = np.append(y, scaled_y)
order = np.argsort(x)
x = x[order]
y = y[order]
x_range = [np.min(x), np.max(x)]
xints = np.linspace(x_range[0], x_range[1], int(len(x) / 20))
values = np.array(
[
np.var(y[(x >= xints[k]) & (x < xints[k + 1])])
for k in range(len(xints[:-1]))
]
)
good = ~np.isnan(values)  # discard empty intervals, whose variance is NaN
value = np.mean(values[good])
return value
def _objective_function(params, args):
"""Put the parameters in the right order to use with scipy's minimize."""
return total_variance(args[0], args[1], params)
def align(xs, ys):
"""Align all the scans to the first one.
Parameters
----------
xs : list of array-like [array1, array2, ...]
list of arrays containing the x values of each scan
ys : list of array-like [array1, array2, ...]
list of arrays containing the y values of each scan
Returns
-------
qs : array-like
The intercepts minimising the total variance, one for each scan after
the first
ms : array-like
The slopes minimising the total variance, one for each scan after the
first
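Examples
--------
A sketch of the intended usage (skipped as a doctest, since disp=True
makes the optimizer print convergence messages):
>>> qs, ms = align(xs, ys)  # doctest: +SKIP
>>> ys_aligned = [ys[0]] + [y - (q + m * x) for x, y, q, m
...                         in zip(xs[1:], ys[1:], qs, ms)]  # doctest: +SKIP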
"""
from scipy.optimize import minimize
qs = np.zeros(len(xs) - 1)
ms = np.zeros(len(xs) - 1)
# First pass: fit both intercepts and slopes, starting from zero
result = minimize(
_objective_function, [qs, ms], args=[xs, ys], options={"disp": True}
)
qs = result.x[: len(xs) - 1]
ms = np.zeros(len(xs) - 1)
# Second pass: restart from the fitted intercepts, with the slopes reset
# to zero
result = minimize(
_objective_function, [qs, ms], args=[xs, ys], options={"disp": True}
)
qs = result.x[: len(xs) - 1]
ms = result.x[len(xs) - 1 :]
return qs, ms