"""
Classes and functions for flagging data by all modules.
"""
import logging
import numpy as np
import pandas as pd
from ctdcal import get_ctdcal_config
from ctdcal.plotting.plot_fit import _intermediate_residual_plot
cfg = get_ctdcal_config()
log = logging.getLogger(__name__)
def _flag_btl_data(
df,
param=None,
ref=None,
cast_id='SSSCC',
thresh=[0.002, 0.005, 0.010, 0.020],
f_out=None,
):
"""
Flag CTD "btl" data against reference measurement (e.g. SBE35, bottle salts).
Parameters
----------
df : DataFrame,
DataFrame containing btl data
param : str
Name of parameter to calibrate (e.g. "CTDCOND1", "CTDTMP2")
ref : str
Name of reference parameter to calibrate against (e.g. "BTLCOND", "T90")
thresh : list of float, optional
Maximum acceptable residual for each pressure range
f_out : str, optional
Path and filename to save residual vs. pressure plots
Returns
-------
df_ques : DataFrame
Data flagged as questionable (flag 3s)
df_bad : DataFrame
Data flagged as bad (flag 4s)
"""
# prs = cfg.column["p"]
# Remove extreme outliers and code bad
df = df.reset_index(drop=True)
df["Diff"] = df[ref] - df[param]
df["Flag"] = outliers(df["Diff"])
# Find values that are above the threshold and code questionable
df["Flag"] = by_residual(df[param], df[ref], df['CTDPRS'], old_flags=df["Flag"])
df_good = df[df["Flag"] == 2].copy()
df_ques = df[df["Flag"] == 3].copy()
df_bad = df[df["Flag"] == 4].copy()
if f_out is not None:
if param == cfg.column["t1"]:
xlabel = "T1 Residual (T90 C)"
elif param == cfg.column["t2"]:
xlabel = "T2 Residual (T90 C)"
elif param == cfg.column["c1"]:
xlabel = "C1 Residual (mS/cm)"
elif param == cfg.column["c2"]:
xlabel = "C2 Residual (mS/cm)"
f_out = str(f_out).split(".pdf")[0] + "_postfit.pdf"
_intermediate_residual_plot(
df["Diff"],
df['CTDPRS'],
df[cast_id],
show_thresh=True,
xlabel=xlabel,
f_out=f_out,
)
f_out = str(f_out).split(".pdf")[0] + "_flag2.pdf"
_intermediate_residual_plot(
df_good["Diff"],
df_good['CTDPRS'],
df_good[cast_id],
show_thresh=True,
xlabel=xlabel,
f_out=f_out,
)
return df_ques, df_bad
def _merge_flags(new_flags, old_flags, keep_higher=True):
"""
Merge old and new flags into one flag vector.
Returns new_flags if old_flags is None.
Parameters
----------
new_flags : array-like
Newly calculated flags
old_flags : int, optional
User-supplied original data flags
keep_higher : bool, optional
Whether to keep higher (default) or lower value flags
Returns
-------
merged_flags : array-like
Merged old and new flags
"""
if old_flags is None:
return new_flags
new_flags = np.squeeze(new_flags)
old_flags = np.squeeze(old_flags)
merged_flags = np.copy(new_flags)
if keep_higher:
is_higher = old_flags > new_flags
merged_flags[is_higher] = old_flags[is_higher]
else:
# can't really think of a use case for this... maybe delete?
is_lower = old_flags < new_flags
merged_flags[is_lower] = old_flags[is_lower]
return merged_flags
[docs]
def nan_values(data, old_flags=None, flag_good=2, flag_nan=9):
"""
Flag values as either good or nan.
Parameters
----------
data : array-like
Variable to be flagged
old_flags : array-like, optional
Original data flags to be merged in (if provided)
flag_good : int, optional
Flag value for good data
flag_nan : int, optional
Flag value for bad (nan) data
Returns
-------
flags : array-like
Flag for each data point in input
"""
data = np.squeeze(data)
flags = np.full(np.shape(data), flag_good)
flags[np.isnan(data)] = flag_nan
return _merge_flags(flags, old_flags)
[docs]
def outliers(
data,
old_flags=None,
flag_good=2,
flag_outlier=4,
n_sigma1=2,
n_sigma2=20,
ignore_nan=True,
):
"""
Flag extreme outliers using standard deviations from the mean as a threshold.
Outliers are identified over two passes. For the first pass, mean and standard
deviation of data are calculated for all data. Values more than n_sigma1 standard
deviations from mean are (temporarily) flagged questionable. For the second pass,
mean and standard deviation are re-calculated with questionable data excluded. Data
more than n_sigma2 standard deviations from mean are flagged as outliers.
Parameters
----------
data : array-like
Variable to be flagged
old_flags : array-like, optional
Original data flags to be merged in (if provided)
flag_good : int, optional
Flag value for good data
flag_outlier : int, optional
Flag value for outliers
n_sigma1 : int, optional
Number of standard deviations away from mean needed to be excluded from statistics
n_sigma2 : int, optional
Number of standard deviations away from mean needed to be outlier
ignore_nan : bool, optional
Ignore nan values in data
Returns
-------
flags : array-like
Flag for each data point in input
Notes
-----
Functionality is similar to Sea-Bird's "Wild Edit" in Seasoft V2.
"""
data = np.squeeze(data)
# np.squeeze converts DataFrame->Series but does nothing to Series
if type(data) is pd.Series:
data = data.to_numpy() # force to np.array
flags = np.full(np.shape(data), flag_good).squeeze()
# function aliases
if ignore_nan:
mean, std = np.nanmean, np.nanstd
else:
mean, std = np.mean, np.std
# pass 1
questionable = np.abs(data - mean(data)) > (n_sigma1 * std(data))
# pass 2
data_mean = mean(data[~questionable])
data_std = std(data[~questionable])
outliers = np.abs(data - data_mean) > (n_sigma2 * data_std)
flags[outliers] = flag_outlier
return _merge_flags(flags, old_flags)
[docs]
def by_percent_diff(
data, ref_data, old_flags=None, percent_thresh=1, flag_good=2, flag_bad=3
):
"""
Flag residuals based on the percent error relative to the reference data.
Parameters
----------
data : array-like
Variable to be flagged
ref_data : array-like
Reference data to compare against
old_flags : array-like, optional
Original data flags to be merged in (if provided)
percent_thresh : int, optional
Percent different cutoff to be flagged as bad data
flag_good : int, optional
Flag value for good data
flag_bad : int, optional
Flag value for bad data
Returns
-------
flags : array-like
Flag for each data point in input
"""
data = np.squeeze(data)
ref_data = np.squeeze(ref_data)
# Avoid dividing by zero whenever possible
data = np.where(data == 0, np.nan, data)
percent_diff = (np.abs(data - ref_data) / data).squeeze() * 100
flags = np.full(np.shape(percent_diff), flag_good).squeeze()
flags[percent_diff > percent_thresh] = flag_bad
return _merge_flags(flags, old_flags)
[docs]
def by_residual(
data,
ref_data,
pressure,
old_flags=None,
flag_good=2,
flag_bad=3,
threshold=[0.002, 0.005, 0.01, 0.02],
p_cutoff=[2000, 1000, 500],
):
"""
Flag residuals using specific thresholds for each pressure range.
Parameters
----------
data : array-like
Variable to be flagged
ref_data : array-like
Reference data to compare against
flag_good : int, optional
Flag value for good data
flag_bad : int, optional
Flag value for data outside threshold
threshold : list of float, optional
Threshold between good and bad values
p_cutoff : list of int, optional
Edges of pressure bins
Returns
-------
flags : array-like
Flag for each data point in input
"""
if (len(threshold) - len(p_cutoff)) != 1:
raise IndexError("threshold must be one value longer than p_cutoff")
if any(np.diff(p_cutoff) >= 0):
raise ValueError("p_cutoff must be monotonically decreasing")
residual = np.abs(data - ref_data).squeeze()
flags = np.full(np.shape(residual), flag_good).squeeze()
# first and last pressure bin need special handling
is_deep = pressure > p_cutoff[0]
is_shallow = pressure < p_cutoff[-1]
flags[is_deep & (residual > threshold[0])] = flag_bad
flags[is_shallow & (residual > threshold[-1])] = flag_bad
p_bins = zip(p_cutoff, p_cutoff[1:]) # make pairs (e.g. (2000, 1000), (1000, 500))
threshold = threshold[1:-1] # deep and shallow threshold values
for (p_upper, p_lower), thresh in zip(p_bins, threshold):
in_bin = (pressure <= p_upper) & (pressure > p_lower)
flags[in_bin & (residual > thresh)] = flag_bad
return _merge_flags(flags, old_flags)
[docs]
def quality_by_threshold(values, threshold=1.0):
"""
Flags values that meet or exceed a threshold.
Parameters
----------
values : int or float or array
values to flag.
threshold : int or float or array
flag threshold.
Returns
-------
bool
"""
return values >= threshold