Source code for tkp.utility.sigmaclip
"""
Generic kappa-sigma clipping routine.
Note: this does *not* replace the specialized sigma_clip function in
utilities.py
"""
from __future__ import division
import numpy
[docs]def clip(data, mean, sigma, siglow, sighigh, indices=None):
"""Perform kappa-sigma clipping of data around mean
Args:
data (numpy.ndarray): N-dimensional array of values
mean (float): value around which to clip (does not have to be the mean)
sigma (float): sigma-value for clipping
siglow (float): lower kappa clipping values
sighigh (float): higher kappa clipping values
Kwargs:
indices (numpy.ndarray): data selection by indices
Returns:
(numpy.ndarray) indices of non-clipped data
"""
if indices is not None:
ilow = numpy.logical_and(data >= mean - sigma * siglow, indices)
ihigh = numpy.logical_and(data <= mean + sigma * sighigh, indices)
else:
ilow = data >= mean - sigma * siglow
ihigh = data <= mean + sigma * sighigh
indices = numpy.logical_and(ilow, ihigh)
return indices
[docs]def calcmean(data, errors=None):
"""Calculate the mean and the standard deviation of the mean"""
N = len(data)
if errors is None:
mean = data.sum() / N
sigma = numpy.sqrt(((data**2).sum() - N * mean**2) / (N - 1) / N)
else:
w = 1. / errors**2
mean = (w * data).sum() / w.sum()
sigma = numpy.sqrt(1. / w.sum())
return mean, sigma
[docs]def calcsigma(data, errors=None, mean=None, axis=None, errors_as_weight=False):
"""Calculate the sample standard deviation
Args:
data (numpy.ndarray): Data to be averaged. No conversion from
eg a list to a numpy.array is done.
Kwargs:
errors (numpy.ndarray, None): Eerrors for the data. Errors
needs to be the same shape as data (this is different than
for numpy.average). If you want to use weights instead of
errors as input, set errors_as_weight=True. If not given,
all errors (and thus weights) are assumed to be equal to
1.
mean (float): Provide mean if you don't want the mean to be
calculated for you. Pay careful attention to the shape if
you provide 'axis'.
axis (int): Specify axis along which the mean and sigma are
calculated. If not provided, calculations are done over
the whole array
errors_as_weight (bool): Set to True if errors are weights.
Returns:
(2-tuple of floats) mean and sigma
"""
N = data.shape[axis] if axis else len(data)
if errors is None:
w = None
elif errors_as_weight:
w = errors
else:
w = 1.0 / (errors * errors)
if mean is None:
# numpy.average does have a weight option, but may
# not be available in all numpy versions
mean = ((w * data).sum(axis) / (w.sum(axis))
if w is not None else data.sum(axis) / N)
if w is not None:
V1 = w.sum(axis)
V2 = (w * w).sum(axis)
# weighted sample variance
if axis:
shape = list(mean.shape)
shape.insert(axis, 1)
mmean = numpy.array(mean, copy=0, ndmin=data.ndim).reshape(shape)
else:
mmean = mean
sigma = numpy.sqrt(((data - mmean) * (data - mmean) * w).sum(axis) *
(V1 / (V1 * V1 - V2)))
else:
# unweighted sample variance
sigma = numpy.sqrt(((data * data).sum(axis) - N * mean * mean) /
(N - 1))
return mean, sigma
[docs]def sigmaclip(data, errors=None, niter=0, siglow=3., sighigh=3.,
use_median=False):
"""Remove outliers from data which lie more than siglow/sighigh
sample standard deviations from mean.
Args:
data (numpy.ndarray): Numpy array containing data values.
Kwargs:
errors (numpy.ndarray, None): Errors associated with the data
values. If None, unweighted mean and standard deviation
are used in calculations.
niter (int): Number of iterations to calculate mean & standard
deviation, and reject outliers, If niter is negative,
iterations will continue until no more clipping occurs or
until abs('niter') is reached, whichever is reached first.
siglow (float): Kappa multiplier for standard deviation. Std *
siglow defines the value below which data are rejected.
sighigh (float): Kappa multiplier for standard deviation. Std *
sighigh defines the value above which data are rejected.
use_median (bool): Use median of data instead of mean.
Returns:
tuple: (2-tuple) Boolean numpy array of indices indicating which
elements are clipped (False), with the same shape as the
input; number of iterations
"""
# indices keeps track which data should be discarded
indices = numpy.ones(len(data.ravel()), dtype=numpy.bool).reshape(
data.shape)
nniter = -niter if niter < 0 else niter
i = 0
for i in range(nniter):
newdata = data[indices]
newerrors = errors[indices] if errors is not None else None
N = len(newdata)
if N < 2:
return indices, i
if use_median:
mean = numpy.median(newdata)
else:
mean = None
mean, sigma = calcsigma(newdata, newerrors, mean)
newindices = clip(data, mean, sigma, siglow, sighigh)
if niter < 0:
# break when no changes
if (newindices == indices).all():
break
indices = newindices
return indices, i + 1