Source code for ctdcal.odf_io

"""
A module for handling Autosalinometer files, in the ODF format (Carl Mattson).
"""

import csv
import io
import logging
from pathlib import Path

import gsw
import numpy as np
import pandas as pd

from ctdcal import get_ctdcal_config

# configuration object from ctdcal; cfg.dirs["salt"] is used below as the default
# salt directory for the export/processing functions
cfg = get_ctdcal_config()
# module-level logger, named after this module
log = logging.getLogger(__name__)


def _salt_loader(filename, flag_file="tools/salt_flags_handcoded.csv"):
    """
    Load raw file into salt and reference DataFrames.

    Parameters
    ----------
    filename : str, pathlib.Path, or io.StringIO
        Raw salt file (space-delimited, first line is a header) or an in-memory
        buffer with the same layout
    flag_file : str, optional
        .csv file to which rows carrying a "*" in EndTime are appended as
        questionable (salinity_flag 3)

    Returns
    -------
    saltDF : pd.DataFrame
        Every row whose autosalSAMPNO is not "worm", cast to the column dtypes
        defined below, plus "IndexTime" (seconds elapsed since the EndTime of the
        first remaining row)
    refDF : pd.DataFrame
        "IndexTime" and "CRavg" (as floats) of the rows whose autosalSAMPNO is "worm"

    Raises
    ------
    NotImplementedError
        If filename is not a str, Path, or StringIO
    """

    csv_opts = dict(delimiter=" ", quoting=csv.QUOTE_NONE, skipinitialspace=True)
    if isinstance(filename, (str, Path)):
        with open(filename, newline="") as f:
            saltArray = list(csv.reader(f, **csv_opts))
        ssscc = Path(filename).stem
    elif isinstance(filename, io.StringIO):
        saltArray = list(csv.reader(filename, **csv_opts))
        ssscc = "test_odf_io"
    else:
        raise NotImplementedError(
            "Salt loader only able to read in str, Path, or StringIO classes"
        )

    del saltArray[0]  # remove file header

    # Blank lines would otherwise become all-None rows, which survive the "worm"
    # filter and then break the integer casts at the end of this function.
    saltArray = [row for row in saltArray if any(field.strip() for field in row)]
    saltDF = pd.DataFrame.from_records(saltArray)

    cols = {  # having this as a dict streamlines next steps
        "STNNBR": int,
        "CASTNO": int,
        "SAMPNO": int,
        "BathTEMP": int,
        "CRavg": float,
        "autosalSAMPNO": int,
        "Unknown": int,
        "StartTime": object,
        "EndTime": object,
        "Attempts": int,
    }

    # add as many "Reading#"s as needed
    for ii in range(len(saltDF.columns) - len(cols)):
        cols[f"Reading{ii + 1}"] = float
    saltDF.columns = list(cols)  # name columns

    # TODO: check autosalSAMPNO against SAMPNO for mismatches?
    # TODO: handling for re-samples?

    # check for commented out lines
    commented = saltDF["STNNBR"].str.startswith(("#", "x"))
    if commented.any():
        log.debug(f"Found comment character (#, x) in {ssscc} salt file, ignoring line")
        # explicit copy: columns are assigned on this frame further down
        saltDF = saltDF[~commented].copy()

    # check end time for * and code questionable
    # (unconfirmed but * appears to indicate a lot of things from LabView code:
    # large spread in values, long time between samples, manual override, etc.)
    flagged = saltDF["EndTime"].str.contains("*", regex=False)
    if flagged.any():
        # remove asterisks from EndTime and flag samples
        log.debug(f"Found * in {ssscc} salt file, flagging value(s) as questionable")
        saltDF["EndTime"] = saltDF["EndTime"].str.strip("*")
        questionable = pd.DataFrame()
        questionable["SAMPNO"] = saltDF.loc[flagged, "SAMPNO"].astype(int)
        questionable.insert(0, "SSSCC", ssscc)
        questionable["diff"] = np.nan
        questionable["salinity_flag"] = 3
        questionable["comments"] = "Auto-flagged by processing function (had * in row)"
        # appended without a header row so repeated runs keep extending one file
        questionable.to_csv(flag_file, mode="a+", index=False, header=None)

    # add time (in seconds) needed for autosal drift removal step
    saltDF["IndexTime"] = pd.to_datetime(saltDF["EndTime"], format="%H:%M:%S")
    saltDF["IndexTime"] = (saltDF["IndexTime"] - saltDF["IndexTime"].iloc[0]).dt.seconds
    # NOTE(review): pandas documents .dt.seconds as always >= 0 (it already wraps
    # within a day), so this adjustment looks like a no-op; kept unchanged.
    saltDF["IndexTime"] += (saltDF["IndexTime"] < 0) * (3600 * 24)  # fix overnight runs

    # reference ("worm") rows go to refDF, everything else stays in saltDF
    is_ref = saltDF["autosalSAMPNO"] == "worm"
    refDF = saltDF.loc[is_ref, ["IndexTime", "CRavg"]].astype(float)
    saltDF = saltDF[~is_ref].astype(cols)  # force dtypes

    return saltDF, refDF


def remove_autosal_drift(saltDF, refDF):
    """
    Calculate linear CR drift between reference values and apply it to the samples.

    Parameters
    ----------
    saltDF : pd.DataFrame
        Salt samples with "CRavg" and "IndexTime" columns ("STNNBR" and "CASTNO"
        are only read to build the warning message)
    refDF : pd.DataFrame
        Reference readings with "IndexTime" and "CRavg" columns; the correction is
        applied only when its shape is exactly (2, 2)

    Returns
    -------
    pd.DataFrame
        saltDF without the "IndexTime" column. When two reference readings were
        given, "CRavg" is adjusted and rounded to 5 decimals on a copy of the
        input; otherwise "CRavg" is returned as it came in and a warning is logged.
    """
    if refDF.shape == (2, 2):
        # change in reference CR per unit of IndexTime between the two readings
        ref_delta = refDF.diff(axis="index").dropna()
        drift_rate = (ref_delta["CRavg"] / ref_delta["IndexTime"]).iloc[0]

        # work on a deep copy so the caller's DataFrame is left untouched
        saltDF = saltDF.copy(deep=True)
        # NOTE(review): the drift term is added, not subtracted — confirm the
        # intended sign convention against the autosal procedure.
        adjusted = saltDF["CRavg"] + saltDF["IndexTime"] * drift_rate
        saltDF["CRavg"] = adjusted.round(5)  # match initial precision
    else:
        station = saltDF["STNNBR"].unique()[0]
        cast = saltDF["CASTNO"].unique()[0]
        ssscc = f"{station:03d}{cast:02d}"
        log.warning(
            f"Failed to find start/end reference readings for {ssscc}, check salt file"
        )

    return saltDF.drop(labels="IndexTime", axis="columns")
def _salt_exporter(
    saltDF, outdir=cfg.dirs["salt"], stn_col="STNNBR", cast_col="CASTNO"
):
    """
    Export salt DataFrame to .csv file(s), one per station/cast combination.

    A single raw salt file may hold several stations and/or casts, so the rows are
    split on both columns before writing.

    Parameters
    ----------
    saltDF : pd.DataFrame
        Salt data containing the station and cast columns named below
    outdir : str or Path, optional
        Folder receiving the files (defaults to cfg.dirs["salt"])
    stn_col : str, optional
        Name of the station column
    cast_col : str, optional
        Name of the cast column
    """
    for stn in saltDF[stn_col].unique():
        station_rows = saltDF.loc[saltDF[stn_col] == stn]

        for cst in station_rows[cast_col].unique():
            cast_rows = station_rows.loc[station_rows[cast_col] == cst]
            # columns that are empty for this cast are left out of the file
            cast_rows = cast_rows.dropna(axis=1, how="all")

            # SSSCC_* : 3-digit station followed by 2-digit cast
            outfile = Path(outdir) / f"{stn:03.0f}{cst:02.0f}_salts.csv"
            if outfile.exists():
                # never overwrite a file that was exported earlier
                log.info(f"{outfile} already exists...skipping")
            else:
                cast_rows.to_csv(outfile, index=False)
def process_salts(ssscc_list, salt_dir=cfg.dirs["salt"]):
    """
    Master salt processing function. Load in salt files for given station/cast list,
    calculate salinity, and export to .csv files.

    Casts that already have a "<ssscc>_salts.csv" file in salt_dir, or that have no
    raw salt file there, are logged and skipped.

    Parameters
    ----------
    ssscc_list : list of str
        List of stations to process
    salt_dir : str, optional
        Path to folder containing raw salt files (defaults to cfg.dirs["salt"])

    """
    for ssscc in ssscc_list:
        salt_root = Path(salt_dir)

        if (salt_root / f"{ssscc}_salts.csv").exists():
            log.info(f"{ssscc}_salts.csv already exists in {salt_dir}... skipping")
            continue

        # the raw file is named after the cast, with no extension
        try:
            saltDF, refDF = _salt_loader(salt_root / ssscc)
        except FileNotFoundError:
            log.warning(f"Salt file for cast {ssscc} does not exist... skipping")
            continue

        saltDF = remove_autosal_drift(saltDF, refDF)

        # NOTE(review): CRavg is halved before conversion — presumably the file
        # stores twice the conductivity ratio; confirm.
        half_ratio = saltDF["CRavg"] / 2.0
        saltDF["SALNTY"] = gsw.SP_salinometer(half_ratio, saltDF["BathTEMP"])  # .round(4)

        _salt_exporter(saltDF, salt_dir)