import collections
import functools
import json
import os
from datetime import datetime
from datetime import timezone

import pymongo
import yaml
from bson.objectid import ObjectId
from dill.source import getsource

try:
    import dask.bag as daskbag
except ImportError:
    pass
try:
    import pyspark
except ImportError:
    pass

import mspasspy.algorithms.signals as signals
from mspasspy.ccore.utility import MsPASSError, AntelopePf
from mspasspy.global_history.ParameterGTree import ParameterGTree, parameter_to_GTree
from mspasspy.util.converter import AntelopePf2dict
[docs]
def mspass_map(
    data,
    func,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
):
    """
     Apply ``func`` to every element of ``data`` (like the builtin ``map``) while optionally
     recording global and object-level history.

     When ``global_history`` is given, the algorithm name, id and parameters are logged through
     it. When ``object_history`` is True, ``alg_name`` and the string form of ``alg_id`` are also
     forwarded to ``func`` so that each mspass object can record its own history.
    :param data: an iterable whose elements are passed to ``func``.
    :param func: callable applied to each element. It is always called with an ``object_history``
      keyword, plus ``alg_name``/``alg_id`` keywords when ``object_history`` is True.
    :param global_history: a user specified global history manager
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: forward history information to ``func`` when True
    :param alg_id: a user specified alg_id for the map operation; when omitted it is looked up
      through ``global_history`` (if given) or freshly generated
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the map operation; defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameter string; when given, it alone is used to build
      the parameter record
    :type parameters: :class:`str`
    :return: a lazy ``map`` iterator over the mapped objects.
    """
    # Build the parameter description that identifies this algorithm run.
    if parameters:
        gtree = parameter_to_GTree(parameters_str=parameters)
    else:
        tree_kwargs = {"object_history": object_history}
        if alg_name:
            tree_kwargs["alg_name"] = alg_name
        if alg_id:
            tree_kwargs["alg_id"] = alg_id
        gtree = parameter_to_GTree(**tree_kwargs)
    params_json = json.dumps(gtree.asdict())

    alg_name = alg_name or func.__name__

    if not alg_id:
        # Reuse the id of an identical earlier run when a manager can tell us.
        previous = (
            global_history.get_alg_id(alg_name, params_json) if global_history else None
        )
        alg_id = previous or ObjectId()

    if global_history:
        global_history.logging(alg_id, alg_name, params_json)

    if object_history:

        def apply(item):
            return func(
                item,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
            )

    else:

        def apply(item):
            return func(item, object_history=object_history)

    return map(apply, data)
[docs]
def mspass_reduce(
    data,
    func,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
):
    """
     Fold ``data`` with the two-argument ``func`` (via :func:`functools.reduce`) while optionally
     recording global and object-level history.

     When ``global_history`` is given, the algorithm name, id and parameters are logged through
     it. When ``object_history`` is True, ``alg_name`` and the string form of ``alg_id`` are also
     forwarded to ``func`` so that each mspass object can record its own history.
    :param data: an iterable; ``func`` is applied cumulatively to its items from left to right,
      reducing it to a single value.
    :param func: two-argument callable. It is always called with an ``object_history`` keyword,
      plus ``alg_name``/``alg_id`` keywords when ``object_history`` is True.
    :param global_history: a user specified global history manager
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: forward history information to ``func`` when True
    :param alg_id: a user specified alg_id for the reduce operation; when omitted it is looked up
      through ``global_history`` (if given) or freshly generated
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the reduce operation; defaults to
      ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameter string; when given, it alone is used to build
      the parameter record
    :type parameters: :class:`str`
    :return: the reduced object.
    """
    # Build the parameter description that identifies this algorithm run.
    if parameters:
        gtree = parameter_to_GTree(parameters_str=parameters)
    else:
        tree_kwargs = {"object_history": object_history}
        if alg_name:
            tree_kwargs["alg_name"] = alg_name
        if alg_id:
            tree_kwargs["alg_id"] = alg_id
        gtree = parameter_to_GTree(**tree_kwargs)
    params_json = json.dumps(gtree.asdict())

    alg_name = alg_name or func.__name__

    if not alg_id:
        # Reuse the id of an identical earlier run when a manager can tell us.
        previous = (
            global_history.get_alg_id(alg_name, params_json) if global_history else None
        )
        alg_id = previous or ObjectId()

    if global_history:
        global_history.logging(alg_id, alg_name, params_json)

    if object_history:

        def combine(left, right):
            return func(
                left,
                right,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
            )

    else:

        def combine(left, right):
            return func(left, right, object_history=object_history)

    return functools.reduce(combine, data)
[docs]
def mspass_spark_map(
    self,
    func,
    *args,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
    **kwargs,
):
    """
     This decorator method adds functionality to the standard spark map method and is attached as a
     member function of the spark RDD class. Instead of performing the normal map function, if user
     provides global history manager, alg_id(optional), alg_name(optional) and parameters(optional) as
     input, the global history manager will log down the usage of the algorithm. Also, if user set
     object_history to be True, then each mspass object in this map function will save the object level
     history.

     Algorithms whose name ends with ``read_data`` or ``save_data`` are special-cased: ``func`` is not
     passed ``object_history``, and receives ``alg_name``/``alg_id`` only when a global history manager
     is given.
    :param func: target function applied to every element of the RDD
    :param args: extra positional arguments forwarded to ``func``; also recorded in the parameter
      tree when ``parameters`` is not given
    :param global_history: a user specified global history manager
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save the each object's history in the map when True
    :param alg_id: a user specified alg_id for the map operation; when omitted it is looked up
      through ``global_history`` (if given) or freshly generated
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the map operation; defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters for the map operation
    :type parameters: :class:`str`
    :param kwargs: extra keyword arguments forwarded to ``func``; also recorded in the parameter tree
      when ``parameters`` is not given
    :return: a spark `RDD` format of objects. For ``save_data`` algorithms each element is a
      ``(return value of func, original object)`` tuple.
    """
    if parameters:
        parameterGTree = parameter_to_GTree(parameters_str=parameters)
    else:
        new_kwargs = kwargs.copy()
        new_kwargs["object_history"] = object_history
        if alg_name:
            new_kwargs["alg_name"] = alg_name
        if alg_id:
            new_kwargs["alg_id"] = alg_id
        parameterGTree = parameter_to_GTree(*args, **new_kwargs)
    parameters_json = json.dumps(parameterGTree.asdict())
    if not alg_name:
        alg_name = func.__name__
    # get the alg_id if exists, else create a new one
    if not alg_id:
        # global_history defaults to None, so only query it when one was given
        # (same guard as mspass_map)
        if global_history:
            alg_id = global_history.get_alg_id(alg_name, parameters_json)
        if not alg_id:
            alg_id = ObjectId()
    # save the global history
    if global_history:
        global_history.logging(alg_id, alg_name, parameters_json)
    # read_data method
    if alg_name.endswith("read_data"):
        if global_history:
            return self.map(
                lambda wf: func(
                    wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs
                )
            )
        return self.map(lambda wf: func(wf, *args, **kwargs))
    # save_data method
    if alg_name.endswith("save_data"):
        # each element becomes (return value of func, the original mspass_object)
        if global_history:
            return self.map(
                lambda wf: (
                    func(wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs),
                    wf,
                )
            )
        return self.map(lambda wf: (func(wf, *args, **kwargs), wf))
    # save the object history
    if object_history:
        return self.map(
            lambda wf: func(
                wf,
                *args,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
                **kwargs,
            )
        )
    return self.map(lambda wf: func(wf, *args, object_history=object_history, **kwargs))
[docs]
def mspass_dask_map(
    self,
    func,
    *args,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
    **kwargs,
):
    """
     This decorator method adds functionality to the standard dask map method and is attached as a
     member function of the dask Bag class. Instead of performing the normal map function, if user
     provides global history manager, alg_id(optional), alg_name(optional) and parameters(optional) as
     input, the global history manager will log down the usage of the algorithm. Also, if user set
     object_history to be True, then each mspass object in this map function will save the object level
     history.

     Algorithms whose name ends with ``read_data`` or ``save_data`` are special-cased: ``func`` is not
     passed ``object_history``, and receives ``alg_name``/``alg_id`` only when a global history manager
     is given.
    :param func: target function applied to every element of the bag
    :param args: extra positional arguments forwarded to ``func``; also recorded in the parameter
      tree when ``parameters`` is not given
    :param global_history: a user specified global history manager
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save the each object's history in the map when True
    :param alg_id: a user specified alg_id for the map operation; when omitted it is looked up
      through ``global_history`` (if given) or freshly generated
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the map operation; defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters for the map operation
    :type parameters: :class:`str`
    :param kwargs: extra keyword arguments forwarded to ``func``; also recorded in the parameter tree
      when ``parameters`` is not given
    :return: a dask `bag` format of objects. For ``save_data`` algorithms each element is a
      ``(return value of func, original object)`` tuple.
    """
    if parameters:
        parameterGTree = parameter_to_GTree(parameters_str=parameters)
    else:
        new_kwargs = kwargs.copy()
        new_kwargs["object_history"] = object_history
        if alg_name:
            new_kwargs["alg_name"] = alg_name
        if alg_id:
            new_kwargs["alg_id"] = alg_id
        parameterGTree = parameter_to_GTree(*args, **new_kwargs)
    parameters_json = json.dumps(parameterGTree.asdict())
    if not alg_name:
        alg_name = func.__name__
    # get the alg_id if exists, else create a new one
    if not alg_id:
        # global_history defaults to None, so only query it when one was given
        # (same guard as mspass_map)
        if global_history:
            alg_id = global_history.get_alg_id(alg_name, parameters_json)
        if not alg_id:
            alg_id = ObjectId()
    # save the global history
    if global_history:
        global_history.logging(alg_id, alg_name, parameters_json)
    # read_data method
    if alg_name.endswith("read_data"):
        if global_history:
            return self.map(
                lambda wf: func(
                    wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs
                )
            )
        return self.map(lambda wf: func(wf, *args, **kwargs))
    # save_data method
    if alg_name.endswith("save_data"):
        # each element becomes (return value of func, the original mspass_object)
        if global_history:
            return self.map(
                lambda wf: (
                    func(wf, *args, alg_name=alg_name, alg_id=str(alg_id), **kwargs),
                    wf,
                )
            )
        return self.map(lambda wf: (func(wf, *args, **kwargs), wf))
    # save the object history; extra args/kwargs are handed to Bag.map, which
    # forwards them to func
    if object_history:
        return self.map(
            func,
            *args,
            object_history=object_history,
            alg_name=alg_name,
            alg_id=str(alg_id),
            **kwargs,
        )
    return self.map(func, *args, object_history=object_history, **kwargs)
[docs]
def mspass_spark_reduce(
    self,
    func,
    *args,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
    **kwargs,
):
    """
     This decorator method adds functionality to the standard spark reduce method and is attached as a
     member function of the spark RDD class. Instead of performing the normal reduce function, if user
     provides global history manager, alg_id(optional), alg_name(optional) and parameters(optional) as
     input, the global history manager will log down the usage of the algorithm. Also, if user set
     object_history to be True, then each mspass object in this reduce function will save the object
     level history.
    :param func: two-argument target function used to combine elements
    :param args: extra positional arguments forwarded to ``func`` after the two operands; also
      recorded in the parameter tree when ``parameters`` is not given
    :param global_history: a user specified global history manager
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save the each object's history in the reduce when True
    :param alg_id: a user specified alg_id for the reduce operation; when omitted it is looked up
      through ``global_history`` (if given) or freshly generated
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the reduce operation; defaults to
      ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters for the reduce operation
    :type parameters: :class:`str`
    :param kwargs: extra keyword arguments forwarded to ``func``; also recorded in the parameter tree
      when ``parameters`` is not given
    :return: the result of ``self.reduce`` with the wrapped ``func``.
    """
    if parameters:
        parameterGTree = parameter_to_GTree(parameters_str=parameters)
    else:
        new_kwargs = kwargs.copy()
        new_kwargs["object_history"] = object_history
        if alg_name:
            new_kwargs["alg_name"] = alg_name
        if alg_id:
            new_kwargs["alg_id"] = alg_id
        parameterGTree = parameter_to_GTree(*args, **new_kwargs)
    parameters_json = json.dumps(parameterGTree.asdict())
    if not alg_name:
        alg_name = func.__name__
    # get the alg_id if exists, else create a new one
    if not alg_id:
        # global_history defaults to None, so only query it when one was given
        # (same guard as mspass_reduce)
        if global_history:
            alg_id = global_history.get_alg_id(alg_name, parameters_json)
        if not alg_id:
            alg_id = ObjectId()
    # save the global history
    if global_history:
        global_history.logging(alg_id, alg_name, parameters_json)
    # save the object history
    if object_history:
        return self.reduce(
            lambda a, b: func(
                a,
                b,
                *args,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
                **kwargs,
            )
        )
    return self.reduce(
        lambda a, b: func(a, b, *args, object_history=object_history, **kwargs)
    )
[docs]
def mspass_dask_fold(
    self,
    func,
    *args,
    global_history=None,
    object_history=False,
    alg_id=None,
    alg_name=None,
    parameters=None,
    **kwargs,
):
    """
     This decorator method adds functionality to the standard dask fold method and is attached as a
     member function of the dask Bag class. Instead of performing the normal fold function, if user
     provides global history manager, alg_id(optional), alg_name(optional) and parameters(optional) as
     input, the global history manager will log down the usage of the algorithm. Also, if user set
     object_history to be True, then each mspass object in this fold function will save the object level
     history.
    :param func: two-argument target function used to combine elements
    :param args: extra positional arguments forwarded to ``func`` after the two operands; also
      recorded in the parameter tree when ``parameters`` is not given
    :param global_history: a user specified global history manager
    :type global_history: :class:`GlobalHistoryManager`
    :param object_history: save the each object's history in the fold when True
    :param alg_id: a user specified alg_id for the fold operation; when omitted it is looked up
      through ``global_history`` (if given) or freshly generated
    :type alg_id: :class:`str`/:class:`bson.objectid.ObjectId`
    :param alg_name: a user specified alg_name for the fold operation; defaults to ``func.__name__``
    :type alg_name: :class:`str`
    :param parameters: a user specified parameters for the fold operation
    :type parameters: :class:`str`
    :param kwargs: extra keyword arguments forwarded to ``func``; also recorded in the parameter tree
      when ``parameters`` is not given
    :return: the result of ``self.fold`` with the wrapped ``func``.
    """
    if parameters:
        parameterGTree = parameter_to_GTree(parameters_str=parameters)
    else:
        new_kwargs = kwargs.copy()
        new_kwargs["object_history"] = object_history
        if alg_name:
            new_kwargs["alg_name"] = alg_name
        if alg_id:
            new_kwargs["alg_id"] = alg_id
        parameterGTree = parameter_to_GTree(*args, **new_kwargs)
    parameters_json = json.dumps(parameterGTree.asdict())
    if not alg_name:
        alg_name = func.__name__
    # get the alg_id if exists, else create a new one
    if not alg_id:
        # global_history defaults to None, so only query it when one was given
        # (same guard as mspass_reduce)
        if global_history:
            alg_id = global_history.get_alg_id(alg_name, parameters_json)
        if not alg_id:
            alg_id = ObjectId()
    # save the global history
    if global_history:
        global_history.logging(alg_id, alg_name, parameters_json)
    # save the object history
    if object_history:
        return self.fold(
            lambda a, b: func(
                a,
                b,
                *args,
                object_history=object_history,
                alg_name=alg_name,
                alg_id=str(alg_id),
                **kwargs,
            )
        )
    return self.fold(
        lambda a, b: func(a, b, *args, object_history=object_history, **kwargs)
    )
[docs]
class GlobalHistoryManager:
    """
    A Global History Manager handler.
    This is a handler used in the mspass_client; normally a user should not create
    a Global History Manager directly. Instead, get the Global History Manager
    through the mspass client's methods.

    Constructing an instance also attaches ``mspass_map`` and ``mspass_reduce``
    to dask ``Bag`` and pyspark ``RDD`` when those packages were importable.
    """

    def __init__(self, database_instance, job_name, collection=None):
        """
        :param database_instance: database handle; it is indexed by collection name
          and must expose ``database_schema.default_name``
        :param job_name: the name of this job, stored with every history record
        :type job_name: :class:`str`
        :param collection: name of the collection history is written to; defaults
          to the schema's name for ``history_global``
        :type collection: :class:`str`
        """
        self.job_name = job_name
        # generate a bson ObjectId for this job, should be unique on the application level
        self.job_id = ObjectId()
        self.history_db = database_instance
        self.collection = collection
        if not self.collection:
            # use the `history_global` collection defined in database schema
            self.collection = self.history_db.database_schema.default_name(
                "history_global"
            )
        # compound text index over (alg_name, parameters).
        # NOTE(review): this is NOT a unique index, and a MongoDB text index only
        # serves $text queries, so the equality lookups in get_alg_id probably do
        # not use it -- confirm before changing the index definition.
        self.history_db[self.collection].create_index(
            [("alg_name", pymongo.TEXT), ("parameters", pymongo.TEXT)],
        )
        # attach our history-aware map to pyspark/dask; a NameError means the
        # optional import at the top of the module failed, so skip that backend
        try:
            daskbag.Bag.mspass_map = mspass_dask_map
        except NameError:
            pass
        try:
            pyspark.RDD.mspass_map = mspass_spark_map
        except NameError:
            pass
        # attach our history-aware reduce to pyspark/dask (dask uses fold)
        try:
            daskbag.Bag.mspass_reduce = mspass_dask_fold
        except NameError:
            pass
        try:
            pyspark.RDD.mspass_reduce = mspass_spark_reduce
        except NameError:
            pass

    def logging(self, alg_id, alg_name, parameters):
        """
        Save the usage of the algorithm in the map/reduce operation
        :param alg_id: the UUID of the combination of algorithm_name and parameters
        :type alg_id: :class:`bson.objectid.ObjectId`
        :param alg_name: the name of the algorithm
        :type alg_name: :class:`str`
        :param parameters: the parameters of the algorithm
        :type parameters: :class:`str`
        """
        # current POSIX timestamp. An aware UTC datetime is required: the naive
        # value from datetime.utcnow() would be interpreted as local time by
        # .timestamp(), shifting the stored time by the machine's UTC offset.
        timestamp = datetime.now(timezone.utc).timestamp()
        self.history_db[self.collection].insert_one(
            {
                "time": timestamp,
                "job_id": self.job_id,
                "job_name": self.job_name,
                "alg_name": alg_name,
                "alg_id": alg_id,
                "parameters": parameters,
            }
        )

    def get_alg_id(self, alg_name, parameters):
        """
        Look up the alg_id previously recorded for an alg_name/parameters combination
        :param alg_name: the name of the algorithm
        :type alg_name: :class:`str`
        :param parameters: the parameters of the algorithm
        :type parameters: :class:`str`
        :return: the stored alg_id of the first matching record, or None if the
          combination has not been recorded
        """
        # a single find_one answers both "does it exist?" and "what is its id?"
        doc = self.history_db[self.collection].find_one(
            {"alg_name": alg_name, "parameters": parameters}
        )
        if doc is None:
            return None
        return doc["alg_id"]

    def get_alg_list(self, job_name, job_id=None):
        """
        Get a list of history records by job name(and job_id)
        :param job_name: the name of the job
        :type job_name: :class:`str`
        :param job_id: the UUID of the job
        :type job_id: :class:`bson.objectid.ObjectId`
        :return: list of matching history documents
        """
        query = {"job_name": job_name}
        if job_id:
            query["job_id"] = job_id
        return list(self.history_db[self.collection].find(query))

    def set_alg_name_and_parameters(self, alg_id, alg_name, parameters):
        """
        Set the alg_name and parameters by a user specified alg_id
        :param alg_id: the UUID of the combination of algorithm_name and parameters, used to find the records
        :type alg_id: :class:`bson.objectid.ObjectId`
        :param alg_name: the name of the algorithm user would like to set
        :type alg_name: :class:`str`
        :param parameters: the parameters of the algorithm user would like to set
        :type parameters: :class:`str`
        :raises MsPASSError: if no history record has the given alg_id
        """
        history_col = self.history_db[self.collection]
        doc = history_col.find_one({"alg_id": alg_id})
        if not doc:
            # str() is required: alg_id is normally an ObjectId, and str + ObjectId
            # would raise TypeError instead of the intended MsPASSError
            raise MsPASSError(
                "No such history record with alg_id = " + str(alg_id), "Fatal"
            )
        update_dict = {"alg_name": alg_name, "parameters": parameters}
        history_col.update_many({"alg_id": alg_id}, {"$set": update_dict})