#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from mspasspy.ccore.utility import MsPASSError, ErrorSeverity
from mspasspy.ccore.seismic import (
TimeSeries,
Seismogram,
TimeSeriesEnsemble,
SeismogramEnsemble,
)
# this is copied from edit.py. easier to duplicate than load that entire
# module with this one
def _input_is_valid(d):
    """
    Report whether ``d`` is one of the MsPASS data object types.

    Centralizing this test keeps the list of accepted types in a single
    place, so supporting a new data type only requires a change here.

    :param d: object to be tested.
    :return: True if ``d`` is a TimeSeries, Seismogram, TimeSeriesEnsemble,
      or SeismogramEnsemble; False otherwise.  The caller decides what to
      do when False is returned.
    """
    accepted_types = (
        TimeSeries,
        Seismogram,
        TimeSeriesEnsemble,
        SeismogramEnsemble,
    )
    return isinstance(d, accepted_types)
# We need this for matchers that only work for atomic data (e.g. mseed matching)
def _input_is_atomic(d):
    """
    Report whether ``d`` is an atomic MsPASS data object.

    :param d: object to be tested.
    :return: True if ``d`` is a TimeSeries or a Seismogram, False otherwise.
    """
    atomic_types = (TimeSeries, Seismogram)
    return isinstance(d, atomic_types)
class NMF(ABC):
    """
    Abstract base class for a family of Normalization Match Functions (NMF).

    This family of objects is used in MsPASS to standardize the api for
    generic mongodb match operations for "normalizing" a collection.
    Normalization is comparable to a relational database join.
    With MongoDB normalization is most sensible for the case when the
    collection to be normalized is much smaller than the collection that is
    to be joined (normalized) i.e. the normalization operation is many
    to one with many links from the documents in the collection to be
    normalized to each normalizing document.  The stock normalizing
    collections in MsPASS are channel, site, and source.

    The api defines two basic operations any concrete instance of the
    class must implement:  (1) a method to fetch the entire document
    defining a match and (2) a method to fetch and copy specified
    key-value pairs to a valid MsPASS data object.
    """

    def __init__(self, kill_on_failure=True, verbose=False):
        """
        Store the options shared by all matchers.

        :param kill_on_failure: value saved as ``self.kill_on_failure``;
          subclasses pass it to ``log_error`` to decide if a failed match
          kills the datum.
        :param verbose: value saved as ``self.verbose``; subclasses use it
          to gate optional log messages.
        """
        self.kill_on_failure = kill_on_failure
        # This list is always needed by the normalize method.  It is
        # initialized here so subclasses only need to append to it.
        self.attributes_to_load = []
        self.verbose = verbose

    def __call__(self, d, *args, **kwargs):
        """
        Convenience method allowing a concrete instance to be called
        with the simpler syntax of the (implied) principal method
        "normalize".  e.g. to normalize d with the channel collection
        using id_matcher you can use d = id_matcher(d) instead of
        d = id_matcher.normalize(d).

        :param d: datum passed to ``normalize``.
        :param args: extra positional arguments forwarded to ``normalize``
          (e.g. the optional ``time`` accepted by some subclasses).
        :param kwargs: extra keyword arguments forwarded to ``normalize``.
        :return: whatever ``normalize`` returns.
        """
        # The result must be returned or "d = matcher(d)" would set d to None
        return self.normalize(d, *args, **kwargs)

    @abstractmethod
    def get_document(self, d):
        """
        Fetch the database document matching the input datum.

        :param d: MsPASS data object used to build the match query.
        :return: matching MongoDB document, or ``None`` when no match is found.
        :rtype: dict or None
        """
        pass

    @abstractmethod
    def normalize(self, d):
        """
        Copy selected fields from the matching database document to a datum.

        :param d: MsPASS data object to normalize.
        :return: normalized input datum.  Failed matches may log errors and
          kill the datum depending on matcher configuration.
        """
        pass

    def log_error(
        self, d, matchername, message, kill=False, severity=ErrorSeverity.Informational
    ):
        """
        Standardized error logging for all NMF objects.

        Subclasses need only define the matchername (normally the name of
        the subclass) and format a specific message to be posted.  The
        caller may optionally kill the datum and specify an alternative
        severity level to the default (Informational).

        Note most subclasses may want to include a verbose option in the
        constructor (or the reciprocal silent) that provides an option of
        only writing log messages when verbose is set true.  There are
        possible cases with large data sets where verbose messages can
        cause bottlenecks and bloated elog collections.

        :param d: MsPASS data object to which the elog message is to be
          written.
        :param matchername: string assigned to the "algorithm" field
          of the message posted to d.elog.
        :param message: specialized message to post.  When ``kill`` is
          True the text "Datum was killed" is appended on a new line.
        :param kill: boolean controlling if the message should cause the
          datum to be killed.  Default False meaning only the message
          is posted.
        :param severity: ErrorSeverity to assign to the elog message
          (See ErrorLogger docstring).  Default is Informational.
        :raises MsPASSError: (Fatal) if ``d`` is not a MsPASS data object.
        """
        if not _input_is_valid(d):
            raise MsPASSError(
                "NMF.log_error method received invalid data;  arg 0 must be a MsPASS data object",
                ErrorSeverity.Fatal,
            )
        fullmessage = message
        if kill:
            d.kill()
            fullmessage += "\nDatum was killed"
        d.elog.log_error(matchername, fullmessage, severity)
class ID_matcher(NMF):
    """
    Matcher that links a datum to a normalizing collection through an id.

    The id is read from the datum with the key ``<collection>_id`` and is
    matched against the ``_id`` field of documents in ``collection``.
    """

    def __init__(self, db, collection, attributes_to_load, kill_on_failure=True):
        """
        :param db: database handle; indexed as ``db[collection]`` to get
          the collection handle used for queries.
        :param collection: name (str) of the normalizing collection.
        :param attributes_to_load: iterable of keys to copy from the
          matching document to the datum in ``normalize``.
        :param kill_on_failure: when True a failed match kills the datum.
        :raises TypeError: if ``collection`` is not a str.
        """
        if not isinstance(collection, str):
            raise TypeError(
                "ID_matcher constructor:  arg1 (collection) must be a collection name - received invalid type"
            )
        super().__init__(kill_on_failure)
        self.collection = collection
        self.mdkey = collection + "_id"
        self.dbhandle = db[collection]
        # self.attributes_to_load is initialized to an empty list by
        # super().  A type error is raised here if the argument is not
        # iterable.
        self.attributes_to_load.extend(attributes_to_load)

    def get_document(self, d):
        """
        Fetch the database document matching the input datum.

        If the id key is not defined in ``d`` an elog message is posted
        and the datum is killed only when ``kill_on_failure`` is True.

        :param d: MsPASS data object used to build the match query.
        :return: matching MongoDB document, or ``None`` when no match is found.
        :rtype: dict or None
        """
        if d.is_defined(self.mdkey):
            # find_one returns None if nothing matches - callers must
            # handle that condition
            return self.dbhandle.find_one({"_id": d[self.mdkey]})
        message = (
            "Normalizing ID with key="
            + self.mdkey
            + " is not defined in this object"
        )
        # Honor kill_on_failure; the kill (if any) is done by log_error
        self.log_error(
            d, "ID_matcher", message, self.kill_on_failure, ErrorSeverity.Invalid
        )
        return None

    def normalize(self, d):
        """
        Copy selected fields from the matching database document to a datum.

        :param d: MsPASS data object to normalize.
        :return: normalized input datum.  Failed matches log errors and
          kill the datum when ``kill_on_failure`` is True.  A datum that
          is already marked dead is returned unaltered.
        :raises TypeError: if ``d`` is not a MsPASS data object.
        """
        if not _input_is_valid(d):
            raise TypeError("ID_matcher.normalize:  received invalid data type")
        if d.dead():
            return d
        doc = self.get_document(d)
        if doc is None:
            message = (
                "No matching _id found for "
                + self.mdkey
                + " in collection="
                + self.collection
            )
            self.log_error(
                d,
                "ID_matcher",
                message,
                self.kill_on_failure,
                ErrorSeverity.Invalid,
            )
            # Returning (rather than raising) allows application in a
            # map operation
            return d
        for key in self.attributes_to_load:
            if key in doc:
                d[key] = doc[key]
            else:
                # We accumulate error messages to aid user debugging
                # but it could create bloated elog collections
                message = (
                    "No data for key="
                    + key
                    + " in document returned from collection="
                    + self.collection
                )
                self.log_error(
                    d,
                    "ID_matcher",
                    message,
                    self.kill_on_failure,
                    ErrorSeverity.Invalid,
                )
        return d
class mseed_channel_matcher(NMF):
    """
    This class is used to match wf_miniseed to the channel collection using
    the mseed standard channel string tags net, sta, chan, and (optionally) loc.
    It can also be used with data saved in wf_TimeSeries where the mseed tags
    are often altered by MsPASS to change fields like ``net`` to
    ``READONLYERROR_net``.

    There is an automatic fallback for each of the tags where if the proper
    name is not found we always try to use the ``READONLYERROR_`` version
    before giving up.

    An issue with this matcher is that it is very common to have redundant
    entries in the channel collection for the same channel of data.  That
    can happen for a variety of reasons that are harmless.  When that happens
    the method of this object will normally post an elog message warning of
    the potential issue.  Those warnings can be silenced by setting verbose
    in the constructor to False.
    """

    def __init__(
        self,
        db,
        attributes_to_load=["lat", "lon", "elev", "hang", "vang"],
        kill_on_failure=True,
        readonly_tag="READONLYERROR_",
        prepend_collection_name=True,
        verbose=True,
    ):
        """
        :param db: database handle; ``db["channel"]`` is used for queries.
        :param attributes_to_load: iterable of keys to copy from the
          matching channel document to the datum.
        :param kill_on_failure: when True a failed match kills the datum.
        :param readonly_tag: prefix tried as a fallback for each seed tag
          (e.g. ``READONLYERROR_net`` when ``net`` is not defined).
        :param prepend_collection_name: when True loaded keys are stored
          on the datum with the prefix ``channel_``.
        :param verbose: when True a complaint is logged if more than one
          document matches.
        """
        super().__init__(kill_on_failure, verbose)
        self.collection = "channel"
        self.dbhandle = db[self.collection]
        # The default list is only read, never mutated - contents are copied.
        # A type error is raised if attributes_to_load is not iterable.
        self.attributes_to_load.extend(attributes_to_load)
        self.readonly_tag = readonly_tag
        self.prepend_collection_name = prepend_collection_name

    def _defined_key(self, d, tag):
        """
        Return the metadata key holding seed tag ``tag`` in ``d``: the tag
        itself, else the readonly-prefixed version, else None.
        """
        if d.is_defined(tag):
            return tag
        fallback = self.readonly_tag + tag
        if d.is_defined(fallback):
            return fallback
        return None

    def get_document(self, d, time=None):
        """
        Fetch the channel document matching net, sta, chan, (optional) loc,
        and time.

        The time test requires starttime < time < endtime.  If any of the
        required tags net, sta, or chan is missing an elog message is
        posted for each (killing the datum if ``kill_on_failure`` is True)
        and no query is attempted.

        :param d: atomic MsPASS data object used to build the match query.
        :param time: optional match time; defaults to ``d.t0``.
        :return: matching MongoDB document, or ``None`` when no match is found.
        :rtype: dict or None
        :raises TypeError: if ``d`` is not a TimeSeries or Seismogram.
        """
        if not _input_is_atomic(d):
            raise TypeError(
                "mseed_channel_matcher.get_document:  data received as arg0 is not an atomic MsPASS data object"
            )
        query = {}
        query_is_ok = True
        # All missing tags are reported (not just the first) to aid
        # debugging.  That could bloat elog with a large data set but
        # seems preferable to mysterious behavior.
        for tag in ("net", "sta", "chan"):
            mdkey = self._defined_key(d, tag)
            if mdkey is None:
                query_is_ok = False
                self.log_error(
                    d,
                    "mseed_channel_matcher",
                    "Required match key="
                    + tag
                    + " or "
                    + self.readonly_tag
                    + tag
                    + " are not defined for this datum",
                    self.kill_on_failure,
                    ErrorSeverity.Invalid,
                )
            else:
                query[tag] = d[mdkey]
        # loc has to be handled differently because it is often not defined.
        # We just don't add loc to the query if it isn't defined.
        lockey = self._defined_key(d, "loc")
        if lockey is not None:
            query["loc"] = d[lockey]
        # return now if a required key was missing
        if not query_is_ok:
            return None
        # Default to data start time if time is not explicitly passed.
        # Test against None so an explicit time of 0.0 is honored.
        querytime = d.t0 if time is None else time
        query["starttime"] = {"$lt": querytime}
        query["endtime"] = {"$gt": querytime}
        matchsize = self.dbhandle.count_documents(query)
        if matchsize == 0:
            return None
        if matchsize > 1 and self.verbose:
            self.log_error(
                d,
                "mseed_channel_matcher",
                "Multiple channel docs match net:sta:chan:loc:time for this datum - using first one found",
                False,
                ErrorSeverity.Complaint,
            )
        return self.dbhandle.find_one(query)

    def normalize(self, d, time=None):
        """
        Normalize a datum with fields from the matching channel document.

        :param d: atomic MsPASS data object to normalize.
        :param time: optional match time; defaults to ``d.t0``.
        :return: normalized input datum.  A datum already marked dead is
          returned unaltered.
        :raises TypeError: if ``d`` is not a TimeSeries or Seismogram.
        """
        if not _input_is_atomic(d):
            raise TypeError(
                "mseed_channel_matcher.normalize:  received invalid data type"
            )
        if d.dead():
            return d
        doc = self.get_document(d, time)
        if doc is None:
            message = "No matching document was found in channel collection for this datum"
            self.log_error(
                d,
                "mseed_channel_matcher",
                message,
                self.kill_on_failure,
                ErrorSeverity.Invalid,
            )
            # Returning (rather than raising) allows application in a
            # map operation
            return d
        for key in self.attributes_to_load:
            if key in doc:
                mdkey = "channel_" + key if self.prepend_collection_name else key
                d[mdkey] = doc[key]
            else:
                # We accumulate error messages to aid user debugging
                # but it could create bloated elog collections
                message = (
                    "No data for key="
                    + key
                    + " in document returned from collection="
                    + self.collection
                )
                self.log_error(
                    d,
                    "mseed_channel_matcher",
                    message,
                    self.kill_on_failure,
                    ErrorSeverity.Invalid,
                )
        return d
# This class is modified from mseed_channel_matcher by removing the chan
# key and replacing channel with site.
class mseed_site_matcher(NMF):
    """
    This class is used to match data derived from seed data to the site
    collection using the mseed standard site string tags net, sta, and
    (optionally) loc.  It can also be used with data saved in wf_TimeSeries
    or wf_Seismogram where the mseed tags are often altered by MsPASS to
    change fields like ``net`` to ``READONLYERROR_net``.

    There is an automatic fallback for each of the tags where if the proper
    name is not found we always try to use the ``READONLYERROR_`` version
    before giving up.

    An issue with this matcher is that it is very common to have redundant
    entries in the site collection for the same site of data.  That
    can happen for a variety of reasons that are harmless.  When that happens
    the method of this object will normally post an elog message warning of
    the potential issue.  Those warnings can be silenced by setting verbose
    in the constructor to False.
    """

    def __init__(
        self,
        db,
        attributes_to_load=["lat", "lon", "elev"],
        kill_on_failure=True,
        readonly_tag="READONLYERROR_",
        prepend_collection_name=True,
        verbose=True,
    ):
        """
        :param db: database handle; ``db["site"]`` is used for queries.
        :param attributes_to_load: iterable of keys to copy from the
          matching site document to the datum.
        :param kill_on_failure: when True a failed match kills the datum.
        :param readonly_tag: prefix tried as a fallback for each seed tag
          (e.g. ``READONLYERROR_net`` when ``net`` is not defined).
        :param prepend_collection_name: when True loaded keys are stored
          on the datum with the prefix ``site_``.
        :param verbose: when True a complaint is logged if more than one
          document matches.
        """
        super().__init__(kill_on_failure, verbose)
        self.collection = "site"
        self.dbhandle = db[self.collection]
        # The default list is only read, never mutated - contents are copied.
        # A type error is raised if attributes_to_load is not iterable.
        self.attributes_to_load.extend(attributes_to_load)
        self.readonly_tag = readonly_tag
        self.prepend_collection_name = prepend_collection_name

    def _defined_key(self, d, tag):
        """
        Return the metadata key holding seed tag ``tag`` in ``d``: the tag
        itself, else the readonly-prefixed version, else None.
        """
        if d.is_defined(tag):
            return tag
        fallback = self.readonly_tag + tag
        if d.is_defined(fallback):
            return fallback
        return None

    def get_document(self, d, time=None):
        """
        Fetch the site document matching net, sta, (optional) loc, and time.

        The time test requires starttime < time < endtime.  If either of
        the required tags net or sta is missing an elog message is posted
        for each (killing the datum if ``kill_on_failure`` is True) and no
        query is attempted.

        :param d: atomic MsPASS data object used to build the match query.
        :param time: optional match time; defaults to ``d.t0``.
        :return: matching MongoDB document, or ``None`` when no match is found.
        :rtype: dict or None
        :raises TypeError: if ``d`` is not a TimeSeries or Seismogram.
        """
        if not _input_is_atomic(d):
            raise TypeError(
                "mseed_site_matcher.get_document:  data received as arg0 is not an atomic MsPASS data object"
            )
        query = {}
        query_is_ok = True
        # All missing tags are reported (not just the first) to aid
        # debugging.  That could bloat elog with a large data set but
        # seems preferable to mysterious behavior.
        for tag in ("net", "sta"):
            mdkey = self._defined_key(d, tag)
            if mdkey is None:
                query_is_ok = False
                self.log_error(
                    d,
                    "mseed_site_matcher",
                    "Required match key="
                    + tag
                    + " or "
                    + self.readonly_tag
                    + tag
                    + " are not defined for this datum",
                    self.kill_on_failure,
                    ErrorSeverity.Invalid,
                )
            else:
                query[tag] = d[mdkey]
        # loc has to be handled differently because it is often not defined.
        # We just don't add loc to the query if it isn't defined.
        lockey = self._defined_key(d, "loc")
        if lockey is not None:
            query["loc"] = d[lockey]
        # return now if a required key was missing
        if not query_is_ok:
            return None
        # Default to data start time if time is not explicitly passed.
        # Test against None so an explicit time of 0.0 is honored.
        querytime = d.t0 if time is None else time
        query["starttime"] = {"$lt": querytime}
        query["endtime"] = {"$gt": querytime}
        matchsize = self.dbhandle.count_documents(query)
        if matchsize == 0:
            return None
        if matchsize > 1 and self.verbose:
            self.log_error(
                d,
                "mseed_site_matcher",
                "Multiple site docs match net:sta:loc:time for this datum - using first one found",
                False,
                ErrorSeverity.Complaint,
            )
        return self.dbhandle.find_one(query)

    def normalize(self, d, time=None):
        """
        Normalize a datum with fields from the matching site document.

        :param d: atomic MsPASS data object to normalize.
        :param time: optional match time; defaults to ``d.t0``.
        :return: normalized input datum.  A datum already marked dead is
          returned unaltered.
        :raises TypeError: if ``d`` is not a TimeSeries or Seismogram.
        """
        if not _input_is_atomic(d):
            raise TypeError("mseed_site_matcher.normalize:  received invalid data type")
        if d.dead():
            return d
        doc = self.get_document(d, time)
        if doc is None:
            message = (
                "No matching document was found in site collection for this datum"
            )
            self.log_error(
                d,
                "mseed_site_matcher",
                message,
                self.kill_on_failure,
                ErrorSeverity.Invalid,
            )
            # Returning (rather than raising) allows application in a
            # map operation
            return d
        for key in self.attributes_to_load:
            if key in doc:
                mdkey = "site_" + key if self.prepend_collection_name else key
                d[mdkey] = doc[key]
            else:
                # We accumulate error messages to aid user debugging
                # but it could create bloated elog collections
                message = (
                    "No data for key="
                    + key
                    + " in document returned from collection="
                    + self.collection
                )
                self.log_error(
                    d,
                    "mseed_site_matcher",
                    message,
                    self.kill_on_failure,
                    ErrorSeverity.Invalid,
                )
        return d
class origin_time_source_matcher(NMF):
    """
    Matcher that links data to a source collection by origin time.

    A test time is computed as (time - t0offset), where time defaults to
    the datum start time (``d.t0``).  A document matches when its ``time``
    attribute is within ``tolerance`` of that test time.
    """

    def __init__(
        self,
        db,
        collection="source",
        t0offset=0.0,
        tolerance=4.0,
        attributes_to_load=["lat", "lon", "depth", "time"],
        kill_on_failure=True,
        prepend_collection_name=True,
        verbose=True,
    ):
        """
        :param db: database handle; ``db[collection]`` is used for queries.
        :param collection: name of the normalizing collection.
        :param t0offset: value subtracted from the reference time to get
          the time tested against the ``time`` attribute of documents.
        :param tolerance: half width of the time interval treated as a match.
        :param attributes_to_load: iterable of keys to copy from the
          matching document to the datum.
        :param kill_on_failure: when True a failed match kills the datum.
        :param prepend_collection_name: when True loaded keys are stored
          on the datum with the prefix ``<collection>_``.
        :param verbose: when True a complaint is logged if more than one
          document matches.
        """
        super().__init__(kill_on_failure, verbose)
        self.collection = collection
        self.dbhandle = db[collection]
        self.t0offset = t0offset
        self.tolerance = tolerance
        self.prepend_collection_name = prepend_collection_name
        # The default list is only read, never mutated - contents are copied
        self.attributes_to_load.extend(attributes_to_load)

    def get_document(self, d, time=None):
        """
        Fetch the document whose ``time`` matches the computed origin time.

        :param d: MsPASS data object.  ``d.t0`` is read when ``time`` is
          not given.
        :param time: optional reference time used instead of ``d.t0``.
          This allows setting ensemble metadata using a specific time.
        :return: matching MongoDB document (the first found if there are
          several), or ``None`` if nothing matches or ``d`` is not a
          MsPASS data object.
        :rtype: dict or None
        """
        if not _input_is_valid(d):
            return None
        # If time is not defined default to the data start time (t0)
        reference_time = d.t0 if time is None else time
        test_time = reference_time - self.t0offset
        # MongoDB's inclusive range operators are $gte and $lte
        query = {
            "time": {
                "$gte": test_time - self.tolerance,
                "$lte": test_time + self.tolerance,
            }
        }
        matchsize = self.dbhandle.count_documents(query)
        if matchsize == 0:
            return None
        if matchsize > 1 and self.verbose:
            # An ambiguous match is only a complaint - do not kill
            self.log_error(
                d,
                "origin_time_source_matcher",
                "multiple source documents match the origin time computed from time received - using first found",
                False,
                ErrorSeverity.Complaint,
            )
        return self.dbhandle.find_one(query)

    def normalize(self, d, time=None):
        """
        Normalize a datum with source fields matched by origin time.

        :param d: MsPASS data object to normalize.
        :param time: optional reference time used instead of ``d.t0``.
        :return: normalized input datum.  A datum already marked dead is
          returned unaltered.
        :raises TypeError: if ``d`` is not a MsPASS data object.
        """
        if not _input_is_valid(d):
            raise TypeError(
                "origin_time_source_matcher.normalize:  received invalid data type"
            )
        if d.dead():
            return d
        doc = self.get_document(d, time)
        if doc is None:
            message = (
                "No matching document was found in "
                + self.collection
                + " collection for this datum"
            )
            self.log_error(
                d,
                "origin_time_source_matcher",
                message,
                self.kill_on_failure,
                ErrorSeverity.Invalid,
            )
            # Returning (rather than raising) allows application in a
            # map operation
            return d
        for key in self.attributes_to_load:
            if key in doc:
                if self.prepend_collection_name:
                    mdkey = self.collection + "_" + key
                else:
                    mdkey = key
                d[mdkey] = doc[key]
            else:
                # We accumulate error messages to aid user debugging
                # but it could create bloated elog collections
                message = (
                    "No data for key="
                    + key
                    + " in document returned from collection="
                    + self.collection
                )
                self.log_error(
                    d,
                    "origin_time_source_matcher",
                    message,
                    self.kill_on_failure,
                    ErrorSeverity.Invalid,
                )
        return d
class css30_arrival_interval_matcher(NMF):
    """
    This matcher is used to match phase picks stored in the database
    (default is arrival collection) to waveforms.  The basic algorithm
    is an interval match.  That is, an arrival with a time between
    starttime and endtime is considered a match.  If multiple matches
    are found for the same phase name the algorithm uses a time offset test
    of starttime relative to the phase time.  The data for the arrival
    doc with time most closely matched to starttime+time_offset is
    selected.

    The main use of this class is to match a collection of raw data
    with arrival time picks made by another source
    (e.g. the Array Network Facility of Earthscope css3.0 arrival picks)
    """

    def __init__(
        self,
        db,
        startime_offset=60.0,
        phasename="P",
        phasename_key="phase",
        attributes_to_load=["time"],
        load_if_defined=["evid", "iphase", "seaz", "esaz", "deltim", "timeres"],
        kill_on_failure=False,
        prepend_collection_name=True,
        verbose=True,
        arrival_collection_name="arrival",
    ):
        """
        :param db: database handle; ``db[arrival_collection_name]`` is used
          for queries.
        :param startime_offset: offset added to the datum start time to
          define the preferred pick time when several picks match.  Saved
          as ``self.time_offset``.
        :param phasename: phase name that matching documents must have.
        :param phasename_key: document key holding the phase name.
        :param attributes_to_load: iterable of required keys to copy from
          the matching document; a missing key is logged as an error.
        :param load_if_defined: iterable of optional keys copied when
          present; a missing key is logged only when verbose is True.
        :param kill_on_failure: when True a failed match kills the datum.
        :param prepend_collection_name: when True loaded keys are stored
          on the datum with the prefix ``<arrival_collection_name>_``.
        :param verbose: when True missing optional keys are logged.
        :param arrival_collection_name: name of the collection of picks.
        """
        super().__init__(kill_on_failure, verbose)
        # get_document needs the offset to choose between multiple picks
        self.time_offset = startime_offset
        self.phasename = phasename
        self.phasename_key = phasename_key
        # Default lists are only read, never mutated - contents are copied
        self.attributes_to_load.extend(attributes_to_load)
        self.load_if_defined = list(load_if_defined)
        self.prepend_collection_name = prepend_collection_name
        # normalize uses the name for key prefixes and messages
        self.arrival_collection_name = arrival_collection_name
        self.collection = arrival_collection_name
        self.dbhandle = db[arrival_collection_name]

    def get_document(self, d):
        """
        Fetch the arrival document matching the time span of the datum.

        Documents match when the phase name agrees and the pick time is in
        the closed interval from ``d.t0`` to ``d.endtime()``.  When more
        than one document matches, the one whose time is closest to
        ``d.t0 + time_offset`` is returned (the first found on ties).

        :param d: MsPASS data object used to build the match query.  It
          must provide ``t0`` and ``endtime()``.
        :return: matching MongoDB document, or ``None`` when no match is found.
        :rtype: dict or None
        :raises MsPASSError: (Fatal) if multiple documents match but none
          has a ``time`` attribute.
        """
        stime = d.t0
        etime = d.endtime()
        # MongoDB's inclusive range operators are $gte and $lte
        query = {
            self.phasename_key: self.phasename,
            "time": {"$gte": stime, "$lte": etime},
        }
        n = self.dbhandle.count_documents(query)
        if n == 0:
            return None
        if n == 1:
            return self.dbhandle.find_one(query)
        # Multiple picks match: rank them by distance from the preferred time
        target_time = stime + self.time_offset
        # ignore any docs with the time attribute not set
        matchlist = [
            (abs(doc["time"] - target_time), doc)
            for doc in self.dbhandle.find(query)
            if "time" in doc
        ]
        if not matchlist:
            raise MsPASSError(
                "css30_arrival_interval_matcher.get_document:  no arrival docs found with phasename set as "
                + self.phasename
                + " with a time attribute defined.  This should not happen and indicates a serious database inconsistence.  Aborting",
                ErrorSeverity.Fatal,
            )
        # min returns the first entry with the smallest offset; the key
        # function avoids ever comparing the documents themselves
        best = min(matchlist, key=lambda entry: entry[0])
        return best[1]

    def normalize(self, d):
        """
        Copy selected fields from the matching arrival document to a datum.

        :param d: atomic MsPASS data object to normalize.
        :return: normalized input datum.  Failed matches log errors and
          kill the datum when ``kill_on_failure`` is True.  A datum
          already marked dead is returned unaltered.
        :raises TypeError: if ``d`` is not a TimeSeries or Seismogram.
        """
        if not _input_is_atomic(d):
            raise TypeError(
                "css30_arrival_interval_matcher.normalize:  received invalid data type"
            )
        if d.dead():
            return d
        doc = self.get_document(d)
        if doc is None:
            message = (
                "No matching document was found in "
                + self.collection
                + " collection for this datum"
            )
            self.log_error(
                d,
                "css30_arrival_interval_matcher",
                message,
                self.kill_on_failure,
                ErrorSeverity.Invalid,
            )
            # Returning (rather than raising) allows application in a
            # map operation
            return d
        prefix = self.collection + "_" if self.prepend_collection_name else ""
        for key in self.attributes_to_load:
            if key in doc:
                d[prefix + key] = doc[key]
            else:
                # We accumulate error messages to aid user debugging
                # but it could create bloated elog collections
                message = (
                    "No data for key="
                    + key
                    + " in document returned from collection="
                    + self.collection
                )
                self.log_error(
                    d,
                    "css30_arrival_interval_matcher",
                    message,
                    self.kill_on_failure,
                    ErrorSeverity.Invalid,
                )
        # Similar for optional keys but don't log errors for missing
        # attributes unless verbose is set true
        for key in self.load_if_defined:
            if key in doc:
                d[prefix + key] = doc[key]
            elif self.verbose:
                self.log_error(
                    d,
                    "css30_arrival_interval_matcher",
                    "No data found with optional load key=" + key,
                    False,
                    ErrorSeverity.Informational,
                )
        return d