# Source code for ctdcal.odf_io

import csv
import io
import logging
from pathlib import Path

import gsw
import numpy as np
import pandas as pd

from . import get_ctdcal_config

# Package configuration object; cfg.dirs["salt"] is used below as the default
# salt directory for _salt_exporter and process_salts.
cfg = get_ctdcal_config()
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


def _salt_loader(filename, flag_file="tools/salt_flags_handcoded.csv"):
    """
    Load raw file into salt and reference DataFrames.
    """

    csv_opts = dict(delimiter=" ", quoting=csv.QUOTE_NONE, skipinitialspace="True")
    if isinstance(filename, (str, Path)):
        with open(filename, newline="") as f:
            saltF = csv.reader(f, **csv_opts)
            saltArray = [row for row in saltF]
            ssscc = Path(filename).stem
    elif isinstance(filename, io.StringIO):
        saltF = csv.reader(filename, **csv_opts)
        saltArray = [row for row in saltF]
        ssscc = "test_odf_io"
    else:
        raise NotImplementedError(
            "Salt loader only able to read in str, Path, or StringIO classes"
        )

    del saltArray[0]  # remove file header
    saltDF = pd.DataFrame.from_records(saltArray)

    cols = dict(  # having this as a dict streamlines next steps
        [
            ("STNNBR", int),
            ("CASTNO", int),
            ("SAMPNO", int),
            ("BathTEMP", int),
            ("CRavg", float),
            ("autosalSAMPNO", int),
            ("Unknown", int),
            ("StartTime", object),
            ("EndTime", object),
            ("Attempts", int),
        ]
    )

    # add as many "Reading#"s as needed
    for ii in range(0, len(saltDF.columns) - len(cols)):
        cols["Reading{}".format(ii + 1)] = float
    saltDF.columns = list(cols.keys())  # name columns

    # TODO: check autosalSAMPNO against SAMPNO for mismatches?
    # TODO: handling for re-samples?

    # check for commented out lines
    commented = saltDF["STNNBR"].str.startswith(("#", "x"))
    if commented.any():
        log.debug(f"Found comment character (#, x) in {ssscc} salt file, ignoring line")
        saltDF = saltDF[~commented]

    # check end time for * and code questionable
    # (unconfirmed but * appears to indicate a lot of things from LabView code:
    # large spread in values, long time between samples, manual override, etc.)
    flagged = saltDF["EndTime"].str.contains("*", regex=False)
    if flagged.any():
        # remove asterisks from EndTime and flag samples
        log.debug(f"Found * in {ssscc} salt file, flagging value(s) as questionable")
        saltDF["EndTime"] = saltDF["EndTime"].str.strip("*")
        questionable = pd.DataFrame()
        questionable["SAMPNO"] = saltDF.loc[flagged, "SAMPNO"].astype(int)
        questionable.insert(0, "SSSCC", ssscc)
        questionable["diff"] = np.nan
        questionable["salinity_flag"] = 3
        questionable["comments"] = "Auto-flagged by processing function (had * in row)"
        questionable.to_csv(flag_file, mode="a+", index=False, header=None)

    # add time (in seconds) needed for autosal drift removal step
    saltDF["IndexTime"] = pd.to_datetime(saltDF["EndTime"])
    saltDF["IndexTime"] = (saltDF["IndexTime"] - saltDF["IndexTime"].iloc[0]).dt.seconds
    saltDF["IndexTime"] += (saltDF["IndexTime"] < 0) * (3600 * 24)  # fix overnight runs

    refDF = saltDF.loc[
        saltDF["autosalSAMPNO"] == "worm", ["IndexTime", "CRavg"]
    ].astype(float)
    saltDF = saltDF[saltDF["autosalSAMPNO"] != "worm"].astype(cols)  # force dtypes

    return saltDF, refDF


def remove_autosal_drift(saltDF, refDF):
    """
    Calculate linear CR drift between reference values

    The drift rate is the change in reference CRavg divided by the change in
    reference IndexTime; each sample's CRavg is offset by rate * IndexTime and
    rounded to 5 decimal places. If refDF does not hold exactly two reference
    readings, or both readings share the same IndexTime (so no rate can be
    computed), a warning is logged and CRavg is left untouched.

    Parameters
    ----------
    saltDF : pd.DataFrame
        Sample data with "CRavg" and "IndexTime" columns ("STNNBR" and "CASTNO"
        are also read when a warning is logged)
    refDF : pd.DataFrame
        Reference readings with "IndexTime" and "CRavg" columns; expected
        shape is (2, 2)

    Returns
    -------
    pd.DataFrame
        New DataFrame without the "IndexTime" column; the input is not modified
    """

    def _ssscc():
        # station/cast label, only built when a warning is actually logged
        return f"{saltDF['STNNBR'].unique()[0]:03d}{saltDF['CASTNO'].unique()[0]:02d}"

    if refDF.shape != (2, 2):
        ssscc = _ssscc()
        log.warning(
            f"Failed to find start/end reference readings for {ssscc}, check salt file"
        )
        return saltDF.drop(labels="IndexTime", axis="columns")

    # find rate of drift
    diff = refDF.diff(axis="index").dropna()
    elapsed = diff["IndexTime"].iloc[0]
    if elapsed == 0:
        # identical timestamps would make the rate inf/NaN and silently
        # corrupt every CRavg value, so skip the correction instead
        ssscc = _ssscc()
        log.warning(
            f"Start/end reference readings for {ssscc} have no elapsed time, "
            "skipping drift removal"
        )
        return saltDF.drop(labels="IndexTime", axis="columns")
    time_coef = diff["CRavg"].iloc[0] / elapsed

    # apply offset as a linear function of time
    saltDF = saltDF.copy(deep=True)  # avoid modifying input dataframe
    saltDF["CRavg"] += saltDF["IndexTime"] * time_coef
    saltDF["CRavg"] = saltDF["CRavg"].round(5)  # match initial precision

    return saltDF.drop(labels="IndexTime", axis="columns")


def _salt_exporter(
    saltDF, outdir=cfg.dirs["salt"], stn_col="STNNBR", cast_col="CASTNO"
):
    """
    Write salt data to .csv, one file per station/cast pair.

    A single raw salt file may hold several stations and/or casts, so the
    frame is split on `stn_col` and then `cast_col`. Files that already exist
    are left untouched.

    Parameters
    ----------
    saltDF : pd.DataFrame
        Salt data containing the `stn_col` and `cast_col` columns
    outdir : str or Path, optional
        Folder the .csv files are written to
    stn_col : str, optional
        Name of the station number column
    cast_col : str, optional
        Name of the cast number column
    """
    for station in saltDF[stn_col].unique():
        station_rows = saltDF[saltDF[stn_col] == station]
        for cast in station_rows[cast_col].unique():
            outfile = Path(outdir) / f"{station:03.0f}{cast:02.0f}_salts.csv"  # SSSCC_*
            if outfile.exists():
                log.info(f"{outfile} already exists...skipping")
                continue

            cast_rows = station_rows[station_rows[cast_col] == cast]
            # drop empty columns (e.g. unused Reading# slots) before writing
            cast_rows.dropna(axis=1, how="all").to_csv(outfile, index=False)


def process_salts(ssscc_list, salt_dir=cfg.dirs["salt"]):
    """
    Master salt processing function. For each station/cast in the list, load
    the raw salt file, remove autosal drift, calculate salinity, and export the
    result to a .csv file.

    Casts whose {ssscc}_salts.csv already exists in `salt_dir`, and casts with
    no raw salt file, are logged and skipped.

    Parameters
    ----------
    ssscc_list : list of str
        List of stations to process
    salt_dir : str, optional
        Path to folder containing raw salt files (defaults to cfg.dirs["salt"])

    """
    for ssscc in ssscc_list:
        processed_file = Path(salt_dir) / f"{ssscc}_salts.csv"
        if processed_file.exists():
            log.info(f"{ssscc}_salts.csv already exists in {salt_dir}... skipping")
            continue

        raw_file = Path(salt_dir) / ssscc
        try:
            saltDF, refDF = _salt_loader(raw_file)
        except FileNotFoundError:
            log.warning(f"Salt file for cast {ssscc} does not exist... skipping")
            continue

        saltDF = remove_autosal_drift(saltDF, refDF)
        # CRavg is halved before being passed on as the conductivity ratio
        # NOTE(review): presumably the autosal reports twice the ratio; confirm
        half_cr = saltDF["CRavg"] / 2.0
        # salinity is stored unrounded
        saltDF["SALNTY"] = gsw.SP_salinometer(half_cr, saltDF["BathTEMP"])
        _salt_exporter(saltDF, salt_dir)