# Source code for mspasspy.global_history.manager

import os
import yaml
import pymongo
import collections
import json

try:
    import dask.bag as daskbag
except ImportError:
    pass
try:
    import pyspark
except ImportError:
    pass
from bson.objectid import ObjectId
from mspasspy.ccore.utility import MsPASSError, AntelopePf
from mspasspy.util.converter import AntelopePf2dict
from datetime import datetime, timezone
from dill.source import getsource
import functools

import mspasspy.algorithms.signals as signals
from mspasspy.global_history.ParameterGTree import ParameterGTree, parameter_to_GTree


def mspass_map(
    data,
    func,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
):
    """
    Apply ``func`` to every element of ``data`` with the builtin ``map``,
    optionally recording the call in the global history.

    When a global history manager is supplied, one record describing this
    operation (alg_id, alg_name and the serialized parameters) is logged
    before the mapping is returned.  When ``object_history`` is true,
    ``alg_name`` and ``alg_id`` (as a string) are additionally forwarded
    to ``func`` so it can record object level history.

    :param data: an iterable which is to be mapped.
    :param func: target function applied to each element of ``data``.
      It is always called with an ``object_history`` keyword argument.
    :param global_history: a user specified global history manager
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save each object's history in the map when True
    :param alg_id: a user specified alg_id for the map operation
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the map operation;
      defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters string for the map
      operation, handed to ``parameter_to_GTree`` as ``parameters_str``
    :type parameters: :class:`str`
    :return: the (lazy) ``map`` iterator over the mapped objects.
    """
    # Describe the parameters of this call as a tree, then serialize it.
    if parameters:
        gtree = parameter_to_GTree(parameters_str=parameters)
    else:
        tree_kwargs = {"object_history": object_history}
        if alg_name:
            tree_kwargs["alg_name"] = alg_name
        if alg_id:
            tree_kwargs["alg_id"] = alg_id
        gtree = parameter_to_GTree(**tree_kwargs)
    serialized = json.dumps(gtree.asdict())

    alg_name = alg_name or func.__name__

    # Reuse an id already stored for this (alg_name, parameters) pair when
    # a manager is available; otherwise mint a fresh one.
    if not alg_id and global_history:
        alg_id = global_history.get_alg_id(alg_name, serialized)
    if not alg_id:
        alg_id = ObjectId()

    if global_history:
        global_history.logging(alg_id, alg_name, serialized)

    if object_history:

        def _apply(wf):
            return func(
                wf,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
            )

    else:

        def _apply(wf):
            return func(wf, object_history=object_history)

    return map(_apply, data)
def mspass_reduce(
    data,
    func,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
):
    """
    Reduce ``data`` to a single value with ``functools.reduce``, optionally
    recording the call in the global history.

    When a global history manager is supplied, one record describing this
    operation (alg_id, alg_name and the serialized parameters) is logged
    before the reduction runs.  When ``object_history`` is true,
    ``alg_name`` and ``alg_id`` (as a string) are additionally forwarded
    to ``func`` so it can record object level history.

    :param data: data to be processed, it needs to be an iterable. ``func``
      is applied cumulatively to its items, from left to right, so as to
      reduce the iterable to a single value.
    :param func: target function of two positional arguments.  It is always
      called with an ``object_history`` keyword argument.
    :param global_history: a user specified global history manager
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save each object's history in the reduce when True
    :param alg_id: a user specified alg_id for the reduce operation
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the reduce operation;
      defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters string for the reduce
      operation, handed to ``parameter_to_GTree`` as ``parameters_str``
    :type parameters: :class:`str`
    :return: reduced objects.
    """
    # Describe the parameters of this call as a tree, then serialize it.
    if parameters:
        gtree = parameter_to_GTree(parameters_str=parameters)
    else:
        tree_kwargs = {"object_history": object_history}
        if alg_name:
            tree_kwargs["alg_name"] = alg_name
        if alg_id:
            tree_kwargs["alg_id"] = alg_id
        gtree = parameter_to_GTree(**tree_kwargs)
    serialized = json.dumps(gtree.asdict())

    alg_name = alg_name or func.__name__

    # Reuse an id already stored for this (alg_name, parameters) pair when
    # a manager is available; otherwise mint a fresh one.
    if not alg_id and global_history:
        alg_id = global_history.get_alg_id(alg_name, serialized)
    if not alg_id:
        alg_id = ObjectId()

    if global_history:
        global_history.logging(alg_id, alg_name, serialized)

    if object_history:

        def _combine(a, b):
            return func(
                a,
                b,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
            )

    else:

        def _combine(a, b):
            return func(a, b, object_history=object_history)

    return functools.reduce(_combine, data)
def mspass_spark_map(
    self,
    func,
    *args,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
    **kwargs,
):
    """
    History-aware wrapper around ``self.map`` meant to be attached to the
    spark RDD class as a member function.

    When a global history manager is supplied, one record describing this
    operation (alg_id, alg_name and the serialized parameters) is logged.
    When ``object_history`` is true, ``alg_name`` and ``alg_id`` (as a
    string) are forwarded to ``func`` so it can record object level history.

    Two families of functions are special-cased by the suffix of
    ``alg_name``:

    * names ending in ``read_data``: ``func`` is called without the
      ``object_history`` keyword; ``alg_name``/``alg_id`` are forwarded only
      when a global history manager was given.
    * names ending in ``save_data``: same calling convention, but each
      element of the result is the tuple ``(func(...), wf)`` so the
      original object is carried along next to the function's return value.

    :param func: target function
    :param args: extra positional arguments forwarded to ``func``
    :param global_history: a user specified global history manager.  May be
      omitted, in which case nothing is logged and a fresh alg_id is
      generated when none is given.
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save each object's history in the map when True
    :param alg_id: a user specified alg_id for the map operation
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the map operation;
      defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters string for the map
      operation, handed to ``parameter_to_GTree`` as ``parameters_str``
    :type parameters: :class:`str`
    :param kwargs: extra keyword arguments forwarded to ``func``
    :return: whatever ``self.map`` returns (a spark `RDD` of objects).
    """
    if parameters:
        parameterGTree = parameter_to_GTree(parameters_str=parameters)
    else:
        new_kwargs = kwargs.copy()
        new_kwargs["object_history"] = object_history
        if alg_name:
            new_kwargs["alg_name"] = alg_name
        if alg_id:
            new_kwargs["alg_id"] = alg_id
        parameterGTree = parameter_to_GTree(*args, **new_kwargs)
    parameters_json = json.dumps(parameterGTree.asdict())

    if not alg_name:
        alg_name = func.__name__

    # get the alg_id if exists, else create a new one
    if not alg_id:
        # global_history defaults to None, so only query it when present
        if global_history:
            alg_id = global_history.get_alg_id(alg_name, parameters_json)
        if not alg_id:
            alg_id = ObjectId()

    # save the global history
    if global_history:
        global_history.logging(alg_id, alg_name, parameters_json)

    # read_data method
    if alg_name.endswith("read_data"):
        if global_history:
            return self.map(
                lambda wf: func(
                    wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs
                )
            )
        return self.map(lambda wf: func(wf, *args, **kwargs))

    # save_data method
    if alg_name.endswith("save_data"):
        # (return_code, mspass_object) is returned for save_data, so the
        # original mspass_object is passed through unchanged
        if global_history:
            return self.map(
                lambda wf: (
                    func(wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs),
                    wf,
                )
            )
        return self.map(lambda wf: (func(wf, *args, **kwargs), wf))

    # save the object history
    if object_history:
        return self.map(
            lambda wf: func(
                wf,
                *args,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
                **kwargs,
            )
        )
    return self.map(lambda wf: func(wf, *args, object_history=object_history, **kwargs))
def mspass_dask_map(
    self,
    func,
    *args,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
    **kwargs,
):
    """
    History-aware wrapper around ``self.map`` meant to be attached to the
    dask bag class as a member function.

    When a global history manager is supplied, one record describing this
    operation (alg_id, alg_name and the serialized parameters) is logged.
    When ``object_history`` is true, ``alg_name`` and ``alg_id`` (as a
    string) are forwarded to ``func`` so it can record object level history.

    Two families of functions are special-cased by the suffix of
    ``alg_name``:

    * names ending in ``read_data``: ``func`` is called without the
      ``object_history`` keyword; ``alg_name``/``alg_id`` are forwarded only
      when a global history manager was given.
    * names ending in ``save_data``: same calling convention, but each
      element of the result is the tuple ``(func(...), wf)`` so the
      original object is carried along next to the function's return value.

    :param func: target function
    :param args: extra positional arguments forwarded to ``func``
    :param global_history: a user specified global history manager.  May be
      omitted, in which case nothing is logged and a fresh alg_id is
      generated when none is given.
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save each object's history in the map when True
    :param alg_id: a user specified alg_id for the map operation
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the map operation;
      defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters string for the map
      operation, handed to ``parameter_to_GTree`` as ``parameters_str``
    :type parameters: :class:`str`
    :param kwargs: extra keyword arguments forwarded to ``func``
    :return: whatever ``self.map`` returns (a dask `bag` of objects).
    """
    if parameters:
        parameterGTree = parameter_to_GTree(parameters_str=parameters)
    else:
        new_kwargs = kwargs.copy()
        new_kwargs["object_history"] = object_history
        if alg_name:
            new_kwargs["alg_name"] = alg_name
        if alg_id:
            new_kwargs["alg_id"] = alg_id
        parameterGTree = parameter_to_GTree(*args, **new_kwargs)
    parameters_json = json.dumps(parameterGTree.asdict())

    if not alg_name:
        alg_name = func.__name__

    # get the alg_id if exists, else create a new one
    if not alg_id:
        # global_history defaults to None, so only query it when present
        if global_history:
            alg_id = global_history.get_alg_id(alg_name, parameters_json)
        if not alg_id:
            alg_id = ObjectId()

    # save the global history
    if global_history:
        global_history.logging(alg_id, alg_name, parameters_json)

    # read_data method
    if alg_name.endswith("read_data"):
        if global_history:
            return self.map(
                lambda wf: func(
                    wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs
                )
            )
        return self.map(lambda wf: func(wf, *args, **kwargs))

    # save_data method
    if alg_name.endswith("save_data"):
        # (return_code, mspass_object) is returned for save_data, so the
        # original mspass_object is passed through unchanged
        if global_history:
            return self.map(
                lambda wf: (
                    func(wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs),
                    wf,
                )
            )
        return self.map(lambda wf: (func(wf, *args, **kwargs), wf))

    # save the object history; here the extra arguments are handed to
    # self.map itself rather than wrapped in a lambda
    if object_history:
        return self.map(
            func,
            *args,
            object_history=object_history,
            alg_name=alg_name,
            alg_id=str(alg_id),
            **kwargs,
        )
    return self.map(func, *args, object_history=object_history, **kwargs)
def mspass_spark_reduce(
    self,
    func,
    *args,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
    **kwargs,
):
    """
    History-aware wrapper around ``self.reduce`` meant to be attached to the
    spark RDD class as a member function.

    When a global history manager is supplied, one record describing this
    operation (alg_id, alg_name and the serialized parameters) is logged.
    When ``object_history`` is true, ``alg_name`` and ``alg_id`` (as a
    string) are forwarded to ``func`` so it can record object level history.

    :param func: target function taking the two objects to combine as its
      first two positional arguments
    :param args: extra positional arguments forwarded to ``func``
    :param global_history: a user specified global history manager.  May be
      omitted, in which case nothing is logged and a fresh alg_id is
      generated when none is given.
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save each object's history in the reduce when True
    :param alg_id: a user specified alg_id for the reduce operation
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the reduce operation;
      defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters string for the reduce
      operation, handed to ``parameter_to_GTree`` as ``parameters_str``
    :type parameters: :class:`str`
    :param kwargs: extra keyword arguments forwarded to ``func``
    :return: whatever ``self.reduce`` returns.
    """
    if parameters:
        parameterGTree = parameter_to_GTree(parameters_str=parameters)
    else:
        new_kwargs = kwargs.copy()
        new_kwargs["object_history"] = object_history
        if alg_name:
            new_kwargs["alg_name"] = alg_name
        if alg_id:
            new_kwargs["alg_id"] = alg_id
        parameterGTree = parameter_to_GTree(*args, **new_kwargs)
    parameters_json = json.dumps(parameterGTree.asdict())

    if not alg_name:
        alg_name = func.__name__

    # get the alg_id if exists, else create a new one
    if not alg_id:
        # global_history defaults to None, so only query it when present
        if global_history:
            alg_id = global_history.get_alg_id(alg_name, parameters_json)
        if not alg_id:
            alg_id = ObjectId()

    # save the global history
    if global_history:
        global_history.logging(alg_id, alg_name, parameters_json)

    # save the object history
    if object_history:
        return self.reduce(
            lambda a, b: func(
                a,
                b,
                *args,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
                **kwargs,
            )
        )
    return self.reduce(
        lambda a, b: func(a, b, *args, object_history=object_history, **kwargs)
    )
def mspass_dask_fold(
    self,
    func,
    *args,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
    **kwargs,
):
    """
    History-aware wrapper around ``self.fold`` meant to be attached to the
    dask bag class as a member function.

    When a global history manager is supplied, one record describing this
    operation (alg_id, alg_name and the serialized parameters) is logged.
    When ``object_history`` is true, ``alg_name`` and ``alg_id`` (as a
    string) are forwarded to ``func`` so it can record object level history.

    :param func: target function taking the two objects to combine as its
      first two positional arguments
    :param args: extra positional arguments forwarded to ``func``
    :param global_history: a user specified global history manager.  May be
      omitted, in which case nothing is logged and a fresh alg_id is
      generated when none is given.
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save each object's history in the fold when True
    :param alg_id: a user specified alg_id for the fold operation
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the fold operation;
      defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters string for the fold
      operation, handed to ``parameter_to_GTree`` as ``parameters_str``
    :type parameters: :class:`str`
    :param kwargs: extra keyword arguments forwarded to ``func``
    :return: whatever ``self.fold`` returns.
    """
    if parameters:
        parameterGTree = parameter_to_GTree(parameters_str=parameters)
    else:
        new_kwargs = kwargs.copy()
        new_kwargs["object_history"] = object_history
        if alg_name:
            new_kwargs["alg_name"] = alg_name
        if alg_id:
            new_kwargs["alg_id"] = alg_id
        parameterGTree = parameter_to_GTree(*args, **new_kwargs)
    parameters_json = json.dumps(parameterGTree.asdict())

    if not alg_name:
        alg_name = func.__name__

    # get the alg_id if exists, else create a new one
    if not alg_id:
        # global_history defaults to None, so only query it when present
        if global_history:
            alg_id = global_history.get_alg_id(alg_name, parameters_json)
        if not alg_id:
            alg_id = ObjectId()

    # save the global history
    if global_history:
        global_history.logging(alg_id, alg_name, parameters_json)

    # save the object history
    if object_history:
        return self.fold(
            lambda a, b: func(
                a,
                b,
                *args,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
                **kwargs,
            )
        )
    return self.fold(
        lambda a, b: func(a, b, *args, object_history=object_history, **kwargs)
    )
class GlobalHistoryManager:
    """
    A Global History Manager handler.

    This is a handler used in the mspass_client, normally user should not
    directly create a Global History Manager by his own. Instead, user should
    get the Global History Manager through mspass client's methods.
    """

    def __init__(self, database_instance, job_name, collection=None):
        """
        Bind the manager to a database and install the history-aware
        map/reduce methods on the dask and spark collection classes.

        :param database_instance: database handle; indexed by collection
          name and expected to expose ``database_schema.default_name``
        :param job_name: the name of the job
        :type job_name: :class:`str`
        :param collection: name of the collection holding the history
          records; when falsy the schema's default name for
          ``history_global`` is used
        :type collection: :class:`str`
        """
        self.job_name = job_name
        # generate a bson ObjectId for this job, should be unique on the
        # application level
        self.job_id = ObjectId()
        self.history_db = database_instance

        self.collection = collection
        if not self.collection:
            # use the `history` collection defined in database schema
            self.collection = self.history_db.database_schema.default_name(
                "history_global"
            )

        # create a compound text index on (alg_name, parameters).
        # NOTE(review): no unique flag is passed, so uniqueness is not
        # enforced by this index.
        self.history_db[self.collection].create_index(
            [("alg_name", pymongo.TEXT), ("parameters", pymongo.TEXT)],
        )

        # Attach our map/reduce to dask/pyspark.  The modules are imported
        # optionally at file level, so a missing one surfaces here as a
        # NameError and is simply skipped.
        try:
            daskbag.Bag.mspass_map = mspass_dask_map
            daskbag.Bag.mspass_reduce = mspass_dask_fold
        except NameError:
            pass

        try:
            pyspark.RDD.mspass_map = mspass_spark_map
            pyspark.RDD.mspass_reduce = mspass_spark_reduce
        except NameError:
            pass

    def logging(self, alg_id, alg_name, parameters):
        """
        Save the usage of the algorithm in the map/reduce operation

        :param alg_id: the UUID of the combination of algorithm_name and parameters
        :type alg_id: :class:`bson.objectid.ObjectId`
        :param alg_name: the name of the algorithm
        :type alg_name: :class:`str`
        :param parameters: the parameters of the algorithm
        :type parameters: :class:`str`
        """
        # current (UTC based) epoch timestamp when logging into database
        timestamp = datetime.now(timezone.utc).timestamp()
        self.history_db[self.collection].insert_one(
            {
                "time": timestamp,
                "job_id": self.job_id,
                "job_name": self.job_name,
                "alg_name": alg_name,
                "alg_id": alg_id,
                "parameters": parameters,
            }
        )

    def get_alg_id(self, alg_name, parameters):
        """
        Look up the alg_id previously logged for a combination of
        algorithm name and parameters

        :param alg_name: the name of the algorithm
        :type alg_name: :class:`str`
        :param parameters: the parameters of the algorithm
        :type parameters: :class:`str`
        :return: the ``alg_id`` field of a matching history record, or
          ``None`` when no record matches
        """
        # A single find_one is enough: it yields None when nothing matches,
        # which avoids a separate count query beforehand.
        doc = self.history_db[self.collection].find_one(
            {"alg_name": alg_name, "parameters": parameters}
        )
        if not doc:
            return None
        return doc["alg_id"]

    def get_alg_list(self, job_name, job_id=None):
        """
        Get a list of history records by job name(and job_id)

        :param job_name: the name of the job
        :type job_name: :class:`str`
        :param job_id: the UUID of the job; when given the records are
          additionally filtered on it
        :type job_id: :class:`bson.objectid.ObjectId`
        :return: list of the matching history documents
        """
        query = {"job_name": job_name}
        if job_id:
            query["job_id"] = job_id
        return list(self.history_db[self.collection].find(query))

    def set_alg_name_and_parameters(self, alg_id, alg_name, parameters):
        """
        Set the alg_name and parameters by a user specified alg_id

        :param alg_id: the UUID of the combination of algorithm_name and
          parameters, used to find the records
        :type alg_id: :class:`bson.objectid.ObjectId`
        :param alg_name: the name of the algorithm user would like to set
        :type alg_name: :class:`str`
        :param parameters: the parameters of the algorithm user would like to set
        :type parameters: :class:`str`
        :raises MsPASSError: when no history record carries ``alg_id``
        """
        collection = self.history_db[self.collection]
        if not collection.find_one({"alg_id": alg_id}):
            # alg_id is usually an ObjectId, which cannot be concatenated
            # to a str directly
            raise MsPASSError(
                "No such history record with alg_id = " + str(alg_id), "Fatal"
            )

        collection.update_many(
            {"alg_id": alg_id},
            {"$set": {"alg_name": alg_name, "parameters": parameters}},
        )