Source code for ctdcal.convert

import logging
from pathlib import Path

import gsw
import numpy as np
import pandas as pd

from . import equations_sbe as sbe_eq
from . import get_ctdcal_config
from . import process_bottle as btl
from . import process_ctd as process_ctd
from . import sbe_reader as sbe_rd

cfg = get_ctdcal_config()
log = logging.getLogger(__name__)

# TODO: move this to a separate file?
# lookup table for sensor data
# DOUBLE CHECK TYPE IS CORRECT #
short_lookup = {
    "55": {
        "short_name": "CTDTMP",
        "long_name": "SBE 3+ Temperature",
        "units": "ITS-90",
        "type": "float64",
    },
    "45": {
        "short_name": "CTDPRS",
        "long_name": "SBE 9+ Pressure",
        "units": "DBAR",
        "type": "float64",
    },
    "3": {
        "short_name": "CTDCOND",
        "long_name": "SBE 4 Conductivity",
        "units": "MSPCM",
        "type": "float64",
    },
    "38": {
        "short_name": "CTDOXY",
        "long_name": "SBE 43 Oxygen",
        "units": "MLPL",
        "type": "float64",
    },
    # '38':{'short_name': 'CTDOXYVOLTS', 'long_name':'SBE 43 Oxygen Volts', 'units': '0-5VDC', 'type':'float64'},
    "11": {
        "short_name": "FLUOR",
        "long_name": "Seapoint Fluorometer",
        "units": "0-5VDC",
        "type": "float64",
    },
    "27": {"short_name": "FREE", "long_name": "empty", "units": "NA", "type": "NA"},
    "0": {
        "short_name": "ALT",
        "long_name": "Altitude",
        "units": "M",
        "type": "float64",
    },
    "71": {
        "short_name": "CTDXMISS",
        "long_name": "CStar",
        "units": "0-5VDC",
        "type": "float64",
    },
    "61": {
        "short_name": "U_DEF_poly",
        "long_name": "user defined, polynomial",
        "units": "0-5VDC",
        "type": "float64",
    },
    "80": {
        "short_name": "U_DEF_e",
        "long_name": "user defined, exponential",
        "units": "0-5VDC",
        "type": "float64",
    },
    "1000": {
        "short_name": "CTDSAL",
        "long_name": "Salinity (C1 T1)",
        "units": "PSU",
        "type": "float64",
    },
    "20": {
        "short_name": "CTDFLUOR",
        "long_name": "WetlabECO_AFL_FL_Sensor",
        "units": "0-5VDC",
        "type": "float64",
    },
    "42": {
        "short_name": "PAR",
        "long_name": "PAR/Irradiance, Biospherical/Licor",
        "units": "0-5VDC",
        "type": "float64",
    },
    "51": {
        "short_name": "REF_PAR",
        "long_name": "Surface PAR/Irradiance, Biospherical/Licor",
        "units": "0-5VDC",
        "type": "float64",
    },
    "70": {
        "short_name": "CTDBACKSCATTER",
        "long_name": "WetlabECO_BB_Sensor",
        "units": "0-5VDC",
        "type": "float64",
    },
    "13": {
        "short_name": "CTDFLUOR",
        "long_name": "FluoroSeatechWetlabsFLF_Sensor",
        "units": "0-5VDC",
        "type": "float64",
    },
}
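
# Illustrative note (added for clarity; the example line below is not part of the
# original module): convertFromSBEReader() builds converted column names from
# "short_name" plus the sensor's channel position, e.g. a second SBE 3+ channel
# (SensorID "55") becomes "CTDTMP2":
#
#     col = f"{short_lookup['55']['short_name']}{2}"  # -> "CTDTMP2"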


def hex_to_ctd(ssscc_list):
    # TODO: add (some) error handling from odf_convert_sbe.py
    """
    Convert raw CTD data and export to .pkl files.

    Parameters
    ----------
    ssscc_list : list of str
        List of stations to convert

    Returns
    -------
    bool
        True when .hex file conversion has finished successfully

    """
    log.info("Converting .hex files")
    for ssscc in ssscc_list:
        if not Path(cfg.dirs["converted"] + ssscc + ".pkl").exists():
            hexFile = cfg.dirs["raw"] + ssscc + ".hex"
            xmlconFile = cfg.dirs["raw"] + ssscc + ".XMLCON"
            sbeReader = sbe_rd.SBEReader.from_paths(hexFile, xmlconFile)
            converted_df = convertFromSBEReader(sbeReader, ssscc)
            converted_df.to_pickle(cfg.dirs["converted"] + ssscc + ".pkl")

    return True

def make_time_files(ssscc_list):
    log.info("Generating time.pkl files")
    for ssscc in ssscc_list:
        if not Path(cfg.dirs["time"] + ssscc + "_time.pkl").exists():
            converted_df = pd.read_pickle(cfg.dirs["converted"] + ssscc + ".pkl")

            # Remove any pressure spikes
            bad_rows = converted_df["CTDPRS"].abs() > 6500
            if bad_rows.any():
                log.debug(f"{ssscc}: {bad_rows.sum()} bad pressure points removed.")
            converted_df.loc[bad_rows, :] = np.nan
            converted_df.interpolate(limit=24, limit_area="inside", inplace=True)

            # Trim to times when rosette is in water
            trimmed_df = process_ctd.remove_on_deck(
                converted_df,
                ssscc,
                log_file=cfg.dirs["logs"] + "ondeck_pressure.csv",
            )

            # # TODO: switch to loop instead, e.g.:
            # align_cols = [cfg.column[x] for x in ["c1", "c2"]]  # "dopl" -> "CTDOXY1"
            #
            # if not c1_col in raw_data.dtype.names:
            #     print('c1_col data not found, skipping')
            # else:
            #     raw_data = process_ctd.ctd_align(raw_data, c1_col, float(tc1_align))
            # if not c2_col in raw_data.dtype.names:
            #     print('c2_col data not found, skipping')
            # else:
            #     raw_data = process_ctd.ctd_align(raw_data, c2_col, float(tc2_align))
            # if not dopl_col in raw_data.dtype.names:
            #     print('do_col data not found, skipping')
            # else:
            #     raw_data = process_ctd.ctd_align(raw_data, dopl_col, float(do_align))

            # TODO: add despike/wild edit filter (optional?)

            # Filter data
            filter_data = process_ctd.raw_ctd_filter(
                trimmed_df,
                window="triangle",
                parameters=cfg.filter_cols,
            )

            # Trim to downcast
            cast_data = process_ctd.cast_details(
                filter_data,
                ssscc,
                log_file=cfg.dirs["logs"] + "cast_details.csv",
            )

            cast_data.to_pickle(cfg.dirs["time"] + ssscc + "_time.pkl")

def make_btl_mean(ssscc_list):
    # TODO: add (some) error handling from odf_process_bottle.py
    """
    Create "bottle mean" files from continuous CTD data averaged at the bottle stops.

    Parameters
    ----------
    ssscc_list : list of str
        List of stations to convert

    Returns
    -------
    bool
        True when bottle averaging has finished successfully

    """
    log.info("Generating btl_mean.pkl files")
    for ssscc in ssscc_list:
        if not Path(cfg.dirs["bottle"] + ssscc + "_btl_mean.pkl").exists():
            imported_df = pd.read_pickle(cfg.dirs["converted"] + ssscc + ".pkl")
            bottle_df = btl.retrieveBottleData(imported_df)
            mean_df = btl.bottle_mean(bottle_df)

            # export bottom bottle time/lat/lon info
            fname = cfg.dirs["logs"] + "bottom_bottle_details.csv"
            datetime_col = "nmea_datetime"
            if datetime_col not in mean_df.columns:
                log.debug(
                    f"'{datetime_col}' not found in DataFrame - using 'scan_datetime'"
                )
                datetime_col = "scan_datetime"

            bot_df = mean_df[[datetime_col, "GPSLAT", "GPSLON"]].head(1)
            bot_df.columns = ["bottom_time", "latitude", "longitude"]
            bot_df.insert(0, "SSSCC", ssscc)
            add_header = not Path(fname).exists()  # add header iff file doesn't exist
            with open(fname, "a") as f:
                bot_df.to_csv(f, mode="a", header=add_header, index=False)

            mean_df.to_pickle(cfg.dirs["bottle"] + ssscc + "_btl_mean.pkl")

    return True
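
# Hypothetical usage sketch (added for illustration; the station IDs and the call
# sequence below are assumptions, not part of the original module):
#
#     ssscc_list = ["00101", "00201"]  # example station/cast IDs
#     hex_to_ctd(ssscc_list)           # raw .hex/.XMLCON -> converted .pkl
#     make_time_files(ssscc_list)      # trim, filter, extract downcast -> _time.pkl
#     make_btl_mean(ssscc_list)        # average scans at bottle stops -> _btl_mean.pkl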

def convertFromSBEReader(sbeReader, ssscc):
    """Handler to convert engineering data to sci units automatically.
    Takes SBEReader object that is already connected to the .hex and .XMLCON files.
    """

    # Retrieve parsed scans and convert to dataframe
    rawData = sbeReader.parsed_scans
    raw_df = pd.DataFrame(rawData).apply(pd.to_numeric, errors="ignore")
    raw_df.index.name = "index"

    # Metadata needs to be processed separately and then joined with the converted data
    log.info(f"Building metadata dataframe for {ssscc}")
    metaArray = [line.split(",") for line in sbeReader._parse_scans_meta().tolist()]
    meta_cols, meta_dtypes = sbeReader._breakdown_header()
    meta_df = pd.DataFrame(metaArray)
    meta_df.columns = meta_cols
    meta_df.index.name = "index"
    for col, dtype in zip(meta_cols, meta_dtypes):
        if dtype != "bool_":
            meta_df[col] = meta_df[col].astype(dtype)
        else:
            # map from string "pseudo-boolean" to actual boolean values
            meta_df[col] = meta_df[col].map({"True": True, "False": False})
    log.info("Success!")

    t_probe = meta_df["pressure_temp_int"].tolist()  # raw int from Digitquartz T probe

    # Temporary arrays to hold scientific values needed to compute cond/oxy
    t_array, p_array, c_array = [], [], []

    # needs to search sensor dictionary, and compute in order:
    # temp, pressure, cond, salinity, oxygen, all aux.
    # run one loop that builds a queue to determine order of processing, must track which column to pull
    # process queue, store results in separate arrays for reuse later
    # once queue is empty, attach results together according to format order or xmlcon order - structure to keep track

    queue_metadata = []
    temp_counter = 0
    cond_counter = 0
    oxygen_counter = 0
    u_def_p_counter = 0
    u_def_e_counter = 0
    empty_counter = 0

    # The following are definitions for every key in the dict below:
    #
    # sensor_id = number assigned by SBE for identification in XML
    # list_id = place in XML array by SBE for determining which sensor is which, alternatively channel number (freq+volt)
    # channel_pos = is it the first, second, third, etc sensor of its type in the data file, aux sensors default to 0
    # ranking = data processing ranking - temp first, then pressure, then conductivity, then oxygen, then aux
    # column = column in the raw_df containing the engineering units to be converted to sci units
    # sensor_info = xml sensor info to convert from eng units to sci units

    rawConfig = sbeReader.parsed_config()  # Retrieve Config data

    for list_id, sensor_info in rawConfig["Sensors"].items():
        sensor_id = sensor_info["SensorID"]

        if sensor_id == "55":  # temperature block
            temp_counter += 1
            channel_pos = temp_counter
            ranking = 1
        elif sensor_id == "45":  # pressure block
            channel_pos = ""
            ranking = 2
        elif sensor_id == "3":  # conductivity block
            cond_counter += 1
            channel_pos = cond_counter
            ranking = 3
        elif sensor_id == "38":  # oxygen block
            oxygen_counter += 1
            channel_pos = oxygen_counter
            ranking = 5
        elif sensor_id == "27":  # empty block
            empty_counter += 1
            channel_pos = empty_counter
            ranking = 6
        elif sensor_id == "61":  # user defined (polynomial) block
            u_def_p_counter += 1
            channel_pos = u_def_p_counter
            ranking = 6
        elif sensor_id == "80":  # user defined (exponential) block
            u_def_e_counter += 1
            channel_pos = u_def_e_counter
            ranking = 6
        else:  # auxiliary block
            channel_pos = ""
            ranking = 7

        queue_metadata.append(
            {
                "sensor_id": sensor_id,
                "list_id": list_id,
                "channel_pos": channel_pos,
                "ranking": ranking,
                "column": list_id,
                "sensor_info": sensor_info,
            }
        )

    # Temporary block in order to append basic salinity (calculated from T1/C1) to file
    # If additional salinity is needed (e.g. T2/C2), it'll need a full reworking
    queue_metadata.append(
        {
            "sensor_id": "1000",
            "list_id": 1000,
            "channel_pos": "",
            "ranking": 4,
            "column": "",
            "sensor_info": "",
        }
    )

    # Queue sorting forces it to be in order, so we don't worry about order here
    # Assumes first channel for each sensor is primary for computing following data
    # TODO: rework to use config.py file to determine which is primary
    queue_metadata = sorted(queue_metadata, key=lambda sensor: sensor["ranking"])

    # Initialize converted dataframe
    converted_df = pd.DataFrame()

    for meta in queue_metadata:

        col = f"{short_lookup[meta['sensor_id']]['short_name']}{meta['channel_pos']}"
        sensor_name = short_lookup[meta["sensor_id"]]["long_name"]
        sensor_units = short_lookup[meta["sensor_id"]]["units"]
        coefs = meta["sensor_info"]

        ### Temperature block
        if meta["sensor_id"] == "55":
            log.info(f"Processing Sensor ID: {meta['sensor_id']}, {sensor_name}")
            converted_df[col] = sbe_eq.sbe3(raw_df[meta["column"]], coefs)
            if meta["list_id"] == 0:
                t_array = converted_df[col].astype(float)
                log.info(
                    f"\tPrimary temperature first reading: {t_array[0]} {sensor_units}"
                )

        ### Pressure block
        elif meta["sensor_id"] == "45":
            log.info(f"Processing Sensor ID: {meta['sensor_id']}, {sensor_name}")
            converted_df[col] = sbe_eq.sbe9(raw_df[meta["column"]], t_probe, coefs)
            if meta["list_id"] == 2:
                p_array = converted_df[col].astype(float)
                log.info(f"\tPressure first reading: {p_array[0]} {sensor_units}")

        ### Conductivity block
        elif meta["sensor_id"] == "3":
            log.info(f"Processing Sensor ID: {meta['sensor_id']}, {sensor_name}")
            converted_df[col] = sbe_eq.sbe4(
                raw_df[meta["column"]], t_array, p_array, coefs
            )
            if meta["list_id"] == 1:
                c_array = converted_df[col].astype(float)
                log.info(f"\tPrimary cond first reading: {c_array[0]} {sensor_units}")

        ### Oxygen block
        elif meta["sensor_id"] == "38":
            log.info(f"Processing Sensor ID: {meta['sensor_id']}, {sensor_name}")
            # TODO: put some kind of user-enabled flag in config.py, e.g.
            # if cfg["correct_oxy_hysteresis"]:
            V_corrected = sbe_eq.sbe43_hysteresis_voltage(
                raw_df[meta["column"]], p_array, coefs
            )
            converted_df[col] = sbe_eq.sbe43(
                V_corrected,
                p_array,
                t_array,
                c_array,
                coefs,
                lat=meta_df["GPSLAT"],
                lon=meta_df["GPSLON"],
            )
            converted_df["CTDOXYVOLTS"] = raw_df[meta["column"]]

        ### Fluorometer Seapoint block
        elif meta["sensor_id"] == "11":
            log.info(f"Processing Sensor ID: {meta['sensor_id']}, {sensor_name}")
            converted_df[col] = sbe_eq.seapoint_fluoro(raw_df[meta["column"]], coefs)

        ### Salinity block
        elif meta["sensor_id"] == "1000":
            log.info(f"Processing Sensor ID: {meta['sensor_id']}, {sensor_name}")
            converted_df[col] = gsw.SP_from_C(c_array, t_array, p_array)

        ### Altimeter block
        elif meta["sensor_id"] == "0":
            log.info(f"Processing Sensor ID: {meta['sensor_id']}, {sensor_name}")
            converted_df[col] = sbe_eq.sbe_altimeter(raw_df[meta["column"]], coefs)

        ### Rinko block
        elif meta["sensor_id"] == "61":
            if meta["sensor_info"]["SensorName"] in ("RinkoO2V", "RINKO"):
                log.info("Processing Rinko O2")
                # hysteresis correct then pass through voltage (see Uchida, 2010)
                coefs = {"H1": 0.0065, "H2": 5000, "H3": 2000, "offset": 0}
                converted_df[col] = sbe_eq.sbe43_hysteresis_voltage(
                    raw_df[meta["column"]],
                    p_array,
                    coefs,
                )

        ### Aux block
        else:
            log.info(f"Passing along Sensor ID: {meta['sensor_id']}, {sensor_name}")
            converted_df[col] = raw_df[meta["column"]]

    # Set the column name for the index
    converted_df.index.name = "index"

    log.info("Joining metadata dataframe with converted data...")
    converted_df = converted_df.join(meta_df)
    log.info("Success!")

    # return the converted data as a dataframe
    return converted_df

def to_temperature(raw, manufacturer, sensor, coefs):
    """
    Wrapper to convert raw temperature output to scientific units using appropriate
    conversion function.

    Parameters
    ----------
    raw : array-like
        Raw temperature output
    manufacturer : str
        Name of sensor manufacturer (e.g. "seabird", "RBR")
    sensor : str
        Name of sensor (e.g. for SBE "3", "4", "9")
    coefs : dict
        Dictionary of calibration coefficients from cal sheet

    Returns
    -------
    temperature : array-like
        Converted temperature (ITS-90)

    """
    # potential example of flexible/user-friendly conversion function
    if manufacturer.lower() in ["sbe", "seabird", "sea-bird"]:
        if sensor == "3":
            # sbe_eq.sbe3()
            pass
        elif sensor == "35":
            # ?
            pass
    elif manufacturer.lower() == "rbr":
        # will need to create equations_rbr module
        if sensor.lower() == "duo":
            pass
        elif sensor.lower() == "concerto":
            pass
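
# Hypothetical sketch of how the SBE 3 branch above could dispatch once implemented
# (the call mirrors sbe_eq.sbe3(raw_df[...], coefs) as used in convertFromSBEReader;
# the return statement itself is an assumption, not existing behavior):
#
#     if manufacturer.lower() in ["sbe", "seabird", "sea-bird"] and sensor == "3":
#         return sbe_eq.sbe3(raw, coefs)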

def CR_to_cond(cr, bath_t, ref_t, btl_p):
    """
    Convert AutoSal double conductivity ratio (CR) to conductivity using
    GSW conversion routines.

    Parameters
    ----------
    cr : array-like
        Double conductivity ratio from AutoSal, unitless
    bath_t : array-like
        AutoSal water bath temperature, degrees C
    ref_t : array-like
        CTD temperature at bottle stop, degrees C
    btl_p : array-like
        CTD pressure at bottle stop, dbar

    Returns
    -------
    cond : array-like
        Converted reference conductivity, mS/cm

    """
    salinity = gsw.SP_salinometer((cr / 2.0), bath_t)
    cond = gsw.C_from_SP(salinity, ref_t, btl_p)

    # ignore RuntimeWarning from (np.nan <= 1)
    with np.errstate(invalid="ignore"):
        cond[cond <= 1] = np.nan

    return cond
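
# Hypothetical usage sketch for CR_to_cond() (the values below are made up purely
# for illustration and are not part of the original module):
#
#     cr = np.array([1.95, 1.97])        # doubled conductivity ratio from AutoSal
#     bath_t = np.array([24.0, 24.0])    # salinometer bath temperature, degrees C
#     ref_t = np.array([2.1, 3.4])       # CTD temperature at bottle stop, degrees C
#     btl_p = np.array([3500.0, 200.0])  # CTD pressure at bottle stop, dbar
#     ref_cond = CR_to_cond(cr, bath_t, ref_t, btl_p)  # reference conductivity, mS/cm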