Source code for tkp.quality.rms
import numpy
from tkp.utility import nice_format
from scipy.stats import norm
from sqlalchemy.sql.expression import desc
from tkp.db.model import Image
from tkp.db.quality import reject_reasons
[docs]def rms_invalid(rms, noise, low_bound=1, high_bound=50):
"""
Is the RMS value of an image outside the plausible range?
:param rms: RMS value of an image, can be computed with
tkp.quality.statistics.rms
:param noise: Theoretical noise level of instrument, can be calculated with
tkp.lofar.noise.noise_level
:param low_bound: multiplied with noise to define lower threshold
:param high_bound: multiplied with noise to define upper threshold
:returns: True/False
"""
if (rms < noise * low_bound) or (rms > noise * high_bound):
ratio = rms / noise
return "rms value (%s) is %s times theoretical noise (%s)" % \
(nice_format(rms), nice_format(ratio), nice_format(noise))
else:
return False
[docs]def rms(data):
"""Returns the RMS of the data about the median.
Args:
data: a numpy array
"""
data -= numpy.median(data)
return numpy.sqrt(numpy.power(data, 2).sum()/len(data))
[docs]def clip(data, sigma=3):
"""Remove all values above a threshold from the array.
Uses iterative clipping at sigma value until nothing more is getting clipped.
Args:
data: a numpy array
"""
raveled = data.ravel()
median = numpy.median(raveled)
std = numpy.std(raveled)
newdata = raveled[numpy.abs(raveled-median) <= sigma*std]
if len(newdata) and len(newdata) != len(raveled):
return clip(newdata, sigma)
else:
return newdata
[docs]def subregion(data, f=4):
"""Returns the inner region of a image, according to f.
Resulting area is 4/(f*f) of the original.
Args:
data: a numpy array
"""
x, y = data.shape
return data[(x/2 - x/f):(x/2 + x/f), (y/2 - y/f):(y/2 + y/f)]
[docs]def rms_with_clipped_subregion(data, rms_est_sigma=3, rms_est_fraction=4):
"""
RMS for quality-control.
Root mean square value calculated from central region of an image.
We sigma-clip the input-data in an attempt to exclude source-pixels
and keep only background-pixels.
Args:
data: A numpy array
rms_est_sigma: sigma value used for clipping
rms_est_fraction: determines size of subsection, result will be
1/fth of the image size where f=rms_est_fraction
returns the rms value of a iterative sigma clipped subsection of an image
"""
return rms(clip(subregion(data, rms_est_fraction), rms_est_sigma))
[docs]def reject_basic_rms(image_id, session, est_sigma=4, rms_max=100., rms_min=0.0):
"""Check if the RMS value of an image lies within a range predetermined
at the start of a pipeline run.
args:
image_id (int): database ID of the image we want to check
session (sqlalchemy.orm.session.Session): the database session
est_sigma (float): sigma multiplication factor
rms_max (float): global maximum rms for image quality check
rms_min (float): global minimum rms for image quality check
returns:
bool: None if not rejected, (rejectreason, comment) if rejected
"""
image = session.query(Image).filter(Image.id == image_id).one()
if not rms_min < image.rms_qc < rms_max:
return reject_reasons['rms'],\
"RMS value not within {} and {}".format(rms_min, rms_max)
[docs]def reject_historical_rms(image_id, session, history=100, est_sigma=4, rms_max=100., rms_min=0.0, rej_sigma=3.0):
"""
Check if the RMS value of an image lies within a range defined
by a gaussian fit on the histogram calculated from the last x RMS
values in this subband. Upper and lower bound are then controlled
by est_sigma multiplied with the sigma of the gaussian.
args:
image_id (int): database ID of the image we want to check
session (sqlalchemy.orm.session.Session): the database session
history (int): the number of timestamps we want to use for histogram
est_sigma (float): sigma multiplication factor
rms_max (float): global maximum rms for image quality check
rms_min (float): global minimum rms for image quality check
returns:
bool: None if not rejected, (rejectreason, comment) if rejected
"""
image = session.query(Image).filter(Image.id == image_id).one()
rmss = session.query(Image.rms_qc).filter(
(Image.band == image.band)).order_by(desc(Image.taustart_ts)).limit(
history).all()
if len(rmss) < history:
return False
mu, sigma = norm.fit(rmss)
t_low = mu - sigma * rej_sigma
t_high = mu + sigma * rej_sigma
if not rms_min < image.rms_qc < rms_max:
return reject_reasons['rms'],\
"RMS value not within {} and {}".format(rms_min, rms_max)
if not t_low < image.rms_qc < t_high or not rms_min < image.rms_qc < rms_max:
return reject_reasons['rms'],\
"RMS value not within {} and {}".format(t_low, t_high)