import os
from pathlib import Path
from obspy import read, UTCDateTime
import pandas as pd
import numpy as np
from mspasspy.ccore.utility import (
    Metadata,
    MsPASSError,
    ErrorSeverity,
    AtomicType,
    ProcessingHistory,
)
from mspasspy.ccore.seismic import TimeSeriesEnsemble
from mspasspy.util.converter import Trace2TimeSeries
[docs]
def obspy_mseed_file_indexer(file):
    """
    Build a per-trace index table for a single miniseed file.

    The whole file is read with obspy's miniseed reader and one table row
    is created for every Trace object the reader returns (the reader has
    no file offset concept, so an index by packet position is not
    possible here).  Obspy's verbose stats names (network, station,
    channel, location) are mapped to the css3.0 style names used by
    mspass (net, sta, chan, loc).

    :param file:  name of the file containing the miniseed data
    :return:  pandas DataFrame with the columns net, sta, chan, loc,
      starttime, endtime, samprate, delta, npts, calib, dfile, dir,
      treftype, format, and mover.  starttime and endtime hold the
      ``timestamp`` attribute of the Trace start and end times.  When the
      file cannot be found a message is printed and None is returned.
    """
    try:
        dirself, dfileself = os.path.split(Path(file).absolute())
        traces = read(file, format="mseed")
        # one stats container per Trace; every column below is derived
        # from this list so all columns have the same length
        all_stats = [tr.stats for tr in traces]
        table = {
            "net": [s["network"] for s in all_stats],
            "sta": [s["station"] for s in all_stats],
            "chan": [s["channel"] for s in all_stats],
            "loc": [s["location"] for s in all_stats],
            "starttime": [s["starttime"].timestamp for s in all_stats],
            "endtime": [s["endtime"].timestamp for s in all_stats],
            "samprate": [s["sampling_rate"] for s in all_stats],
            "delta": [s["delta"] for s in all_stats],
            "npts": [s["npts"] for s in all_stats],
            "calib": [s["calib"] for s in all_stats],
            # the remaining columns are constant for every row of one file
            "dfile": [dfileself for _ in all_stats],
            "dir": [dirself for _ in all_stats],
            "treftype": ["UTC" for _ in all_stats],
            "format": ["miniseed" for _ in all_stats],
            "mover": ["obspy_read" for _ in all_stats],
        }
        return pd.DataFrame(table)
    except FileNotFoundError as err:
        print("mseed_file_indexer:  invalid file named received")
        print(err)
[docs]
def dbsave_raw_index(db, pdframe, collection="import_miniseed_ensemble"):
    """
    Save every row of a pandas DataFrame as one document in a MongoDB
    collection.

    :param db:  database handle indexable by collection name (a MongoDB
      database or the mspasspy equivalent) to which the data are saved
    :param pdframe:  pandas DataFrame to be saved; each row becomes one
      document whose keys are the column names
    :param collection:  name of the collection to which the data in
      pdframe are saved.  Default is 'import_miniseed_ensemble'
    :return:  None.  A DataFrame with no rows is silently skipped because
      insert_many does not accept an empty list of documents.
    """
    # "records" is the orientation keyword that turns each row of the
    # DataFrame into a dict suitable for use as a MongoDB document
    docs = pdframe.to_dict("records")
    if not docs:
        return
    db[collection].insert_many(docs)
[docs]
def dbsave_seed_ensemble_file(db, file, gather_type="event", keys=None):
    """
    Index a SEED file whose contents are already assembled as a "gather",
    meaning the data are related through one or more keys.

    This function ONLY builds an index for the file it is given; no
    waveform data are stored.  The index is loosely equivalent to the
    css3.0 wfdisc table but organized differently:  one file generates
    exactly one document in the import_miniseed_ensemble collection.
    That document holds the ensemble level attributes (dir, dfile, format,
    member_type, mover, starttime, endtime) and a "members" list with one
    subdocument per Trace returned by obspy's reader.  Because SEED data
    map directly to TimeSeries, the index can only be used to build
    TimeSeriesEnsemble objects.  If the data for an ensemble are spread
    through several files a different algorithm is needed.

    The overhead is large because obspy's reader always loads the entire
    file just to obtain the metadata.  Scanning the seed blockettes would
    use less memory but is left as a future development.

    Each member subdocument receives a unique uuid string created with the
    ProcessingHistory newid method and stored with the key "seed_file_id".
    That string is NOT a MongoDB ObjectId and cannot be converted to one.
    Finding a particular waveform by seed_file_id is awkward because the
    attribute lives in the member subdocuments.

    The ensemble starttime and endtime are the medians of the member start
    and end times.

    :param db:  MongoDB database pointer - may also be a mspass Database
      class
    :param file:  seed file containing the data to be indexed.
    :param gather_type: character string intended to name the ensemble
      type (e.g. "event", "common_receiver", "station", "image_point",
      "time_window").  NOTE: this version accepts the argument but does
      not use or store it.
    :param keys:  intended to predefine association keys.  NOTE: this
      version accepts the argument but does not use it.
    :return:  ObjectId of the document inserted that is the index for
      the file processed, or None if anything failed (the cause is
      printed).
    """
    try:
        his = ProcessingHistory()  # used only to create uuids
        dbh = db["import_miniseed_ensemble"]
        dirself, dfileself = os.path.split(Path(file).absolute())
        dseis = read(file, format="mseed")
        # one dict per Trace; obspy's verbose names are mapped to the
        # css3.0 style names used by mspass
        members = []
        for d in dseis:
            stats = d.stats
            members.append(
                {
                    "net": stats["network"],
                    "sta": stats["station"],
                    "chan": stats["channel"],
                    "loc": stats["location"],
                    "starttime": stats["starttime"].timestamp,
                    "endtime": stats["endtime"].timestamp,
                    "sampling_rate": stats["sampling_rate"],
                    "delta": stats["delta"],
                    "npts": stats["npts"],
                    "calib": stats["calib"],
                    # this key name could change
                    "seed_file_id": his.newid(),
                }
            )
        # member_type is a placeholder; seed data by definition yield
        # TimeSeries type data (although not necessarily seismic data)
        ensemblemd = {
            "dir": dirself,
            "dfile": dfileself,
            "format": "mseed",
            "member_type": "TimeSeries",
            "mover": "obspy_seed_ensemble_reader",
            "members": members,
            # The median (not min/max) is what link_source_collection
            # matches against origin times.  Cast to a plain float so the
            # stored value does not depend on numpy scalar handling.
            "starttime": float(np.median([m["starttime"] for m in members])),
            "endtime": float(np.median([m["endtime"] for m in members])),
        }
        result = dbh.insert_one(ensemblemd)
        return result.inserted_id
    except Exception as err:
        # Best-effort behavior is retained (report and return None), but
        # the cause is now reported and KeyboardInterrupt/SystemExit are
        # no longer swallowed.
        print("dbsave_seed_ensemble_file:  failed to index file ", file)
        print(err)
        return None
def _load_md(rec, keys):
    """
    Helper for loading an ensemble.  Copies the entries of rec named in
    the keys list into a new Metadata container and returns it.  There is
    no error handling:  a key missing from rec raises whatever rec's
    lookup raises.
    """
    container = Metadata()
    for name in keys:
        container.put(name, rec[name])
    return container
[docs]
def load_one_ensemble(
    doc,
    create_history=False,
    jobname="Default job",
    jobid="99999",
    algid="99999",
    ensemble_mdkeys=None,  # None or empty: copy everything except members
    apply_calib=False,
    verbose=False,
):
    """
    Load a full ensemble indexed in the collection
    import_miniseed_ensemble.  This uses a large memory model that reads
    the entire file with obspy's miniseed reader.  It contains some relics
    of early ideas of having the function utilize the history mechanism.
    Those may not work, but were retained.

    :param doc: is one record in the import_miniseed_ensemble collection
    :param create_history:  if true each member of the ensemble will be
      defined in the history chain as an origin and jobname and jobid will
      be used to construct the ProcessingHistory object.  The origin id of
      each member is the seed_file_id stored in the matching entry of
      doc["members"].
    :param jobname: as used in ProcessingHistory (default "Default job")
    :param jobid: as used in ProcessingHistory
    :param algid: as used in ProcessingHistory
    :param ensemble_mdkeys:  list of keys to copy from doc to the ensemble
      Metadata (no type checking is done).  When None or empty (the
      default) every attribute of doc other than "members" is copied.
    :param apply_calib:  passed to obspy's reader as apply_calib.
      Default is false.
    :param verbose:  currently unused
    :return:  TimeSeriesEnsemble with one member per Trace read from the
      file, or None if anything failed (the cause is printed).
    """
    try:
        ensemblemd = Metadata()
        if create_history:
            his = ProcessingHistory(jobname, jobid)
        form = doc["format"]
        mover = doc["mover"]
        if form != "mseed":
            raise MsPASSError(
                "Cannot handle this ensemble - ensemble format="
                + str(form)
                + "\nCan only be mseed for this reader",
                ErrorSeverity.Invalid,
            )
        if mover != "obspy_seed_ensemble_reader":
            raise MsPASSError(
                "Cannot handle this ensemble - ensemble mover parameter="
                + str(mover)
                + " which is not supported",
                ErrorSeverity.Invalid,
            )
        fname = os.path.join(doc["dir"], doc["dfile"])
        # Note this algorithm actually should work with any format
        # supported by obspy's read function - should generalize it for release
        dseis = read(fname, format="mseed", apply_calib=apply_calib)
        if ensemble_mdkeys is not None and len(ensemble_mdkeys) > 0:
            ensemblemd = _load_md(doc, ensemble_mdkeys)
        else:
            # default is to load everything != members
            for k in doc:
                if k != "members":
                    ensemblemd[k] = doc[k]
        # There is a Stream2TimeSeriesEnsemble function
        # but we don't use it here because we need some functionality
        # not found in that simple function
        result = TimeSeriesEnsemble(ensemblemd, len(dseis))
        # The seed_file_id values live in the index document, not in the
        # obspy Trace (indexing a Trace accesses its sample data).
        # NOTE(review): this assumes the reader returns traces in the same
        # order as when the file was indexed - confirm.
        members = doc["members"] if create_history else None
        for i, d in enumerate(dseis):
            dts = Trace2TimeSeries(d)
            if create_history:
                # This should just define jobname and jobid
                dts.load_history(his)
                seedid = members[i]["seed_file_id"]
                dts.set_as_origin(
                    "load_ensemble", algid, seedid, AtomicType.TIMESERIES, True
                )
            result.member.append(dts)
        return result
    except Exception as err:
        # Best-effort behavior is retained (report and return None), but
        # the cause - including the validation errors raised above - is
        # now reported instead of being hidden behind a generic message.
        print("load_one_ensemble:  failed to load ensemble")
        print(err)
        return None
[docs]
def link_source_collection(db, dt=10.0, prefer_evid=False, verbose=False):
    """
    Prototype linking documents in import_miniseed_ensemble to documents
    in the source collection by time.  The method is not at all generic:
    it is appropriate ONLY if the data were downloaded with a time window
    whose start time equals the origin time of the event.

    For every document in import_miniseed_ensemble the ensemble starttime
    (the median member start time) is compared with the "time" attribute
    of source documents.  Sources within +-dt of the start time are
    matches.  With no match nothing is changed.  With one match the
    ensemble document is updated with that source_id.  With several
    matches the source_id of the last match is used, unless prefer_evid is
    True and one of them has an evid, in which case the first match with
    an evid is used.  Whenever prefer_evid is True and the chosen source
    has an evid, evid is written to the ensemble document as well (in the
    multiple match case only when evid is positive).  The prefer_evid
    option exists to handle data with arrivals exported from a css3.0
    database.

    :param db:  MongoDB top level handle or a mspasspy.Database object to
      be accessed - this function is a pure database function talking to
      this db
    :param dt:  time range of match (search is + to - this value)
    :param prefer_evid:  As noted above if True select the source doc with
      evid set when there are multiple matches.
    :param verbose:  when true output will be more verbose.
    """
    dbwf = db["import_miniseed_ensemble"]
    dbsource = db["source"]
    try:
        for ens in dbwf.find({}):
            t = ens["starttime"]
            query = {"time": {"$gte": t - dt, "$lte": t + dt}}
            ens_match_arg = {"_id": ens["_id"]}
            n = dbsource.count_documents(query)
            if n == 0:
                # nothing to link - leave the ensemble document untouched
                if verbose:
                    print(
                        "link_source_collection:  no match in source for time=",
                        UTCDateTime(t),
                    )
                    print("This enemble cannot be processed")
                continue
            if n == 1:
                srcrec = dbsource.find_one(query)
                source_id = srcrec["source_id"]
                new_values = {"source_id": source_id}
                if prefer_evid:
                    if "evid" in srcrec:
                        evid = srcrec["evid"]
                        new_values["evid"] = evid
                        if verbose:
                            print(
                                "Found evid=",
                                evid,
                                " for ensembled with start time=",
                                UTCDateTime(t),
                            )
                    else:
                        print(
                            "link_source_collection(WARNING): unique match for source at time=",
                            UTCDateTime(t),
                            " does not have evid set but function was called with prefer_evid true",
                        )
            else:
                # Multiple candidates.  source_id ends up as the last
                # record scanned unless an evid match breaks the loop
                # early.  A negative evid flags that the search failed.
                evid = -1
                for srcrec in dbsource.find(query):
                    source_id = srcrec["source_id"]
                    if prefer_evid and ("evid" in srcrec):
                        evid = srcrec["evid"]
                        break
                new_values = {"source_id": source_id}
                if evid > 0:
                    new_values["evid"] = evid
                    if verbose:
                        print(
                            "Found evid=",
                            evid,
                            " for ensembled with start time=",
                            UTCDateTime(t),
                        )
                elif verbose:
                    print(
                        "Found ",
                        n,
                        " matches in source collection for ensemble start time=",
                        UTCDateTime(t),
                    )
                    print("Linking to document with source_id=", source_id)
            dbwf.update_one(ens_match_arg, {"$set": new_values})
    except Exception as err:
        raise MsPASSError(
            "Something threw an unexpected exception", ErrorSeverity.Invalid
        ) from err
[docs]
def load_source_data_by_id(db, mspass_object):
    """
    Prototype function to load source data to any MsPASS data object
    based on the normalization key.  That key is frozen in this version
    as "source_id".

    Ensembles and atomic objects are handled identically:  the input
    object is queried for the key "source_id", the document in the source
    collection whose _id equals that value is fetched, and a frozen set of
    attributes is posted to mspass_object with the keys source_lat,
    source_lon, source_depth, and source_time (copied from lat, lon,
    depth, and time of the source document).

    :param db:  database handle with a source collection
    :param mspass_object:  data object to be updated; it must support
      the "in" test and key based get/set
    :return:  the (modified) mspass_object on success.  None is returned
      and a message printed if source_id is not defined, no source
      document matches, or any other error occurs.
    """
    dbsource = db.source
    try:
        if "source_id" not in mspass_object:
            raise MsPASSError(
                "load_source_data_by_id:  "
                + "required attribute source_id not in ensemble metadata",
                ErrorSeverity.Invalid,
            )
        source_id = mspass_object["source_id"]
        # source_id either matches one document in source or none, so a
        # find_one query is sufficient.  find_one returns None when there
        # is no match.
        srcrec = dbsource.find_one({"_id": source_id})
        if srcrec is None:
            # str() is required because the id need not be a string
            raise MsPASSError(
                "load_source_data_by_id:  "
                + "no match found in source collection for source_id="
                + str(source_id),
                ErrorSeverity.Invalid,
            )
        mspass_object["source_lat"] = srcrec["lat"]
        mspass_object["source_lon"] = srcrec["lon"]
        mspass_object["source_depth"] = srcrec["depth"]
        mspass_object["source_time"] = srcrec["time"]
        return mspass_object
    except Exception as err:
        # Best-effort behavior is retained (report and return None), but
        # the cause is now reported and KeyboardInterrupt/SystemExit are
        # no longer swallowed.
        print("load_source_data_by_id:  source data not loaded")
        print(err)
        return None
[docs]
def load_hypocenter_data_by_time(
    db=None,
    ens=None,
    dbtime_key="time",
    mdtime_key="time_P",
    event_id_key="evid",
    phase="P",
    model="iasp91",
    dt=10.0,
    t0_definition="origin_time",
    t0_offset=0.0,
    kill_null=True,
):
    """
    Load hypocenter data (space time coordinates) into an ensemble using
    an arrival time matching algorithm.

    IMPLEMENTATION STATUS:  this function is INCOMPLETE.  It currently
    only (1) queries the arrival collection for documents whose
    dbtime_key value lies within the ensemble's starttime to endtime
    range, (2) handles the case of no arrivals, and (3) scans the
    matching arrivals to collect the hypocenter stored with each distinct
    event id.  When more than one event is found the event ids are merely
    printed.  Nothing is posted to the ensemble and the arguments
    mdtime_key, phase, model, dt, t0_definition, and t0_offset are not
    yet used.

    Design intent (not yet implemented):  processing is frequently guided
    by the time of one or more seismic phases, either measured or
    computed from an earth model.  The algorithm is not a general
    associator.  It assumes an arrival collection that was previously
    associated (for css3.0 the type example is the join
    event->origin->assoc->arrival grouped by evid or orid) and that
    carries the hypocenter coordinates.  A staged match is planned to
    reduce compute time:  select arrivals inside the ensemble time span,
    compute a theoretical origin time from each using obspy's taup
    calculator with the specified model, then load the source with the
    closest origin time.  When arrivals from multiple events fall inside
    the ensemble time span the ambiguity is to be resolved with
    t0_definition:  'origin_time' means member start times are near the
    origin time of the event; 'phase_time' means they are relative to the
    reference phase.  In both cases t0_offset shifts the comparison:  the
    data start time is compared with the computed time MINUS the offset
    (e.g. 100.0 for data windowed to start 100 s before the theoretical P
    time).  With multiple matching sources the one with the smallest rms
    misfit is to be chosen and a warning posted.

    :param db:  MongoDB Database handle (required)
    :param ens:  mspass Ensemble data (required)
    :param dbtime_key:  key of the time attribute in arrival documents
    :param mdtime_key:  currently unused
    :param event_id_key:  key of the event id in arrival documents
    :param phase:  currently unused
    :param model:  currently unused
    :param dt:  currently unused
    :param t0_definition:  currently unused
    :param t0_offset:  currently unused
    :param kill_null:  when True (default) every member is marked dead
      with the kill method if no arrivals are found.  kill only marks the
      data dead; it does not clear the contents.
    :return:  0 when no arrivals are found in the ensemble time range.
      Otherwise None, because the matching stage is not implemented.
    :raises MsPASSError:  if db or ens is not supplied
    """
    base_error_message = "load_hypocenter_data_by_time:  "
    if db is None:
        raise MsPASSError(
            base_error_message + "Missing required argument db=MongoDB Database handle",
            ErrorSeverity.Fatal,
        )
    if ens is None:
        raise MsPASSError(
            base_error_message + "Missing required argument ens=mspass Ensemble data",
            ErrorSeverity.Fatal,
        )
    # First we need to query the arrival table to find all arrivals within
    # the time range of this ensemble
    try:
        dbarrival = db.arrival
        stime = ens["starttime"]
        etime = ens["endtime"]
        # must be a dict keyed by the time attribute name (a set of dicts
        # is not a valid query and cannot even be constructed)
        query = {dbtime_key: {"$gte": stime, "$lte": etime}}
        narr = dbarrival.count_documents(query)
        if narr == 0:
            print(base_error_message, "No arrivals found in data time range:")
            print(UTCDateTime(stime), " to ", UTCDateTime(etime))
            if kill_null:
                for d in ens.member:
                    d.kill()
            return 0
        # Scan the arrivals for unique events.  For now use an evid
        # test.  This could be generalized to coordinates.  Later
        # arrivals of the same event overwrite earlier entries.
        evids = dict()
        for doc in dbarrival.find(query):
            evids[doc[event_id_key]] = [
                doc["source.lat"],
                doc["source.lon"],
                doc["source.depth"],
                doc["source.time"],
            ]
        if len(evids) > 1:
            # land here if picks from multiple events are inside the data
            # window.  INCOMPLETE - will probably drop this function
            for k in evids:
                print(k)
        # INCOMPLETE: nothing is loaded into the ensemble yet
        return None
    except MsPASSError as err:
        print(err)
        raise
[docs]
def load_site_data(db, ens):
    """
    Load site data into the members of ens.

    Each live member is matched against the site collection using its
    net and sta values and the requirement that the member starttime lies
    strictly between the site document's starttime and endtime.  On a
    match site_lat, site_lon, site_elev, and site_id are posted to the
    member from the lat, lon, elev, and site_id attributes of the
    document returned by find_one.  If several documents match, a
    complaint is also posted to the member's elog.  If nothing matches
    the member is killed and an error is posted to its elog.  Members
    already marked dead are skipped.

    :param db:  database handle with a site collection
    :param ens:  ensemble whose members are to be updated
    :return:  ens (modified in place)
    :raises MsPASSError:  wrapping any exception thrown during processing
    """
    dbsite = db.site
    # (member key, site document key) pairs copied on a successful match
    copied_attributes = (
        ("site_lat", "lat"),
        ("site_lon", "lon"),
        ("site_elev", "elev"),
        ("site_id", "site_id"),
    )
    try:
        for member in ens.member:
            if member.dead():
                continue
            t0 = member["starttime"]
            net = member["net"]
            sta = member["sta"]
            query = {
                "net": {"$eq": net},
                "sta": {"$eq": sta},
                "starttime": {"$lt": t0},
                "endtime": {"$gt": t0},
            }
            nmatch = dbsite.count_documents(query)
            if nmatch == 0:
                member.kill()
                member.elog.log_error(
                    "load_site_data",
                    "no match found in site collection for net="
                    + net
                    + " sta="
                    + sta
                    + " for this event",
                    ErrorSeverity.Invalid,
                )
                continue
            siterec = dbsite.find_one(query)
            for mdkey, dbkey in copied_attributes:
                member[mdkey] = siterec[dbkey]
            if nmatch > 1:
                # ambiguous match: the find_one result is used anyway
                message = "Multiple ({n}) matches found for net={net} and sta={sta} with reference time {t0}".format(
                    n=nmatch, net=net, sta=sta, t0=t0
                )
                member.elog.log_error(
                    "load_site_data", message, ErrorSeverity.Complaint
                )
        return ens
    except Exception as err:
        raise MsPASSError(
            "Something threw an unexpected exception", ErrorSeverity.Invalid
        ) from err
[docs]
def load_channel_data(db, ens):
    """
    Load channel data into the members of ens.

    Each live member that defines net, sta, loc, and chan is matched
    against the channel collection using those four values and the
    requirement that the member starttime lies strictly between the
    channel document's starttime and endtime.  On a match site_lat,
    site_lon, site_elev, vang, and hang are posted to the member from the
    lat, lon, elev, vang, and hang attributes of the document returned by
    find_one, and site_id is set to that document's _id.  If several
    documents match, a complaint is also posted to the member's elog.  If
    nothing matches the member is killed and an error is posted to its
    elog.

    Members already marked dead are skipped.  Live members lacking any of
    net, sta, loc, or chan are also skipped:  they are left live and
    unchanged and nothing is logged.

    :param db:  database handle with a channel collection
    :param ens:  ensemble whose members are to be updated
    :return:  ens (modified in place)
    :raises MsPASSError:  wrapping any exception thrown during processing
    """
    dbchannel = db.channel
    try:
        for d in ens.member:
            if d.dead():
                continue
            t0 = d["starttime"]
            # this is a sanity check to avoid throwing exceptions
            if not all(d.is_defined(k) for k in ("net", "sta", "loc", "chan")):
                continue
            net = d["net"]
            sta = d["sta"]
            chan = d["chan"]
            loc = d["loc"]
            query = {
                "net": {"$eq": net},
                "sta": {"$eq": sta},
                "chan": {"$eq": chan},
                "loc": {"$eq": loc},
                "starttime": {"$lt": t0},
                "endtime": {"$gt": t0},
            }
            n = dbchannel.count_documents(query)
            if n == 0:
                d.kill()
                d.elog.log_error(
                    "load_channel_data",
                    "no match found in channel collection for net="
                    + net
                    + " sta="
                    + sta
                    + " chan="
                    + chan
                    + " loc="
                    + loc
                    + " for this event",
                    ErrorSeverity.Invalid,
                )
                continue
            # With multiple matches we just complain and keep the record
            # find_one returns.  The count makes the message clearer.
            chanrec = dbchannel.find_one(query)
            if n > 1:
                message = (
                    "Multiple ({n}) matches found for net={net}, sta={sta}, "
                    "chan={chan}, and loc={loc} with reference time {t0}"
                ).format(n=n, net=net, sta=sta, chan=chan, loc=loc, t0=t0)
                d.elog.log_error(
                    "load_channel_data", message, ErrorSeverity.Complaint
                )
            d["site_lat"] = chanrec["lat"]
            d["site_lon"] = chanrec["lon"]
            d["site_elev"] = chanrec["elev"]
            d["vang"] = chanrec["vang"]
            d["hang"] = chanrec["hang"]
            # NOTE(review): the channel document _id is stored under the
            # key site_id - confirm this is the intended key name
            d["site_id"] = chanrec["_id"]
        return ens
    except Exception as err:
        raise MsPASSError(
            "Something threw an unexpected exception", ErrorSeverity.Invalid
        ) from err
[docs]
def load_arrivals_by_id(
    db,
    tsens,
    phase="P",
    required_key_map={"phase": "phase", "time": "arrival_time"},
    optional_key_map={"iphase": "iphase", "deltim": "deltim"},
    verbose=False,
):
    """
    Special prototype function to load arrival times in the arrival collection
    to TimeSeries data in a TimeSeriesEnsemble.  The match is a fixed query
    that uses source_id, which is assumed set in the ensemble metadata AND
    previously defined for all arrival documents to be matched.  The correct
    document in arrival is found by the combination of matching source_id,
    net, sta, and phase.  This algorithm issues queries to the arrival
    collection for every live member so an index on net and sta is essential.
    It would probably be good to add source_id to the index as well.

    Members that are already dead are skipped.  Members that do not have both
    net and sta defined are also skipped silently:  nothing is loaded, nothing
    is logged, and they are left live.  If no arrival document matches, an
    Invalid error is logged and the member is killed.  If more than one
    document matches, a Complaint is logged and the first document returned
    by the query is used.

    If the ensemble metadata has no source_id, every member gets an Invalid
    error log entry and is killed.

    :param db:  MongoDB database handle or mspasspy Database object.  Only
      the arrival collection (db.arrival) is accessed.
    :param tsens:  TimeSeriesEnsemble to be processed.  Members are altered
      in place.
    :param phase:  phase name the arrival documents must match (matched
      against the phase attribute of arrival documents).  Default is "P".
    :param required_key_map:  a dict of pairs defining how keys in
      arrival should map to metadata keys of ensemble members.  The key of
      each entry in the dict is used to fetch the required attribute from
      the MongoDB doc found in a query.  The value of that attribute is
      posted to each member TimeSeries object with the value of the key
      value pair associated with that entry.  e.g. for the default
      we fetch the data from the MongoDB document with the key time and
      write the value retrieved to Metadata with the key arrival_time.
      If a required key is not found in the arrival document an error
      will be logged and that member will be marked dead.  Users should
      always test for an empty ensemble after running this function.
    :param optional_key_map:  similar to required_key_map but missing attributes
      only generate a complaint message and the data will be left live.
    :param verbose:  print some messages otherwise only posted to elog
    :return: count of number of live members in the ensemble at completion
    """
    # NOTE: the two key maps are mutable default arguments.  That is safe
    # here only because this function never modifies them.
    dbarrival = db.arrival
    algorithm = "load_arrivals_by_id"

    def report(datum, message, severity):
        # Post message to the datum's error log and echo it when verbose.
        datum.elog.log_error(algorithm, message, severity)
        if verbose:
            print(message)

    if "source_id" not in tsens:
        message = "Enemble metadata does not contain required source_id attribute - killing all data in this ensemble"
        if verbose:
            print(message)
        for d in tsens.member:
            d.elog.log_error("load_arrivals_by_id", message, ErrorSeverity.Invalid)
            d.kill()
    else:
        source_id = tsens["source_id"]
        for d in tsens.member:
            if d.dead():
                continue
            if "net" not in d or "sta" not in d:
                # Cannot build the query without both keys; leave untouched.
                continue
            net = d["net"]
            sta = d["sta"]
            query = {"source_id": source_id, "net": net, "sta": sta, "phase": phase}
            n = dbarrival.count_documents(query)
            if n == 0:
                # str.format is used instead of + because source_id may not
                # be a str (e.g. a MongoDB ObjectId), in which case string
                # concatenation would raise TypeError.
                report(
                    d,
                    "No matching arrival for source_id={} and net:sta ={}:{}".format(
                        source_id, net, sta
                    ),
                    ErrorSeverity.Invalid,
                )
                d.kill()
                continue

            cursor = dbarrival.find(query)
            if n > 1:
                d.elog.log_error(
                    algorithm,
                    "Multiple documents match source_id={} and net:sta ={}:{}"
                    "  Using first found".format(source_id, net, sta),
                    ErrorSeverity.Complaint,
                )
                if verbose:
                    print(
                        "debug:  multiple docs match - printing full documents of all matches.  Will use first"
                    )
                    for doc in cursor:
                        print(doc)
                    # Listing the matches consumed the cursor; reset it so
                    # the first document can be fetched below.
                    cursor.rewind()
            rec = cursor.next()

            for k in required_key_map:
                if k in rec:
                    d[required_key_map[k]] = rec[k]
                else:
                    report(
                        d,
                        "Required attribute with key="
                        + k
                        + " not found in matching arrival document - data killed",
                        ErrorSeverity.Invalid,
                    )
                    d.kill()
            # Optional attributes are still processed for a datum killed
            # above; a missing optional key never kills the datum.
            for k in optional_key_map:
                if k in rec:
                    d[optional_key_map[k]] = rec[k]
                else:
                    report(
                        d,
                        "Optional attribute with key="
                        + k
                        + " was not found in matching arrival document and cannot be loaded",
                        ErrorSeverity.Complaint,
                    )

    return sum(1 for d in tsens.member if d.live)