"""
Tools to define the schema of Metadata.
"""
import os
import yaml
import schema
import bson.objectid
import mspasspy.ccore.seismic
from mspasspy.ccore.utility import MsPASSError
[docs]
class SchemaBase:
    def __init__(self, schema_file=None):
        self._attr_dict = {}
        if schema_file is None:
            if "MSPASS_HOME" in os.environ:
                schema_file = (
                    os.path.abspath(os.environ["MSPASS_HOME"])
                    + "/data/yaml/mspass.yaml"
                )
            else:
                schema_file = os.path.abspath(
                    os.path.dirname(__file__) + "/../data/yaml/mspass.yaml"
                )
        elif not os.path.isfile(schema_file):
            if "MSPASS_HOME" in os.environ:
                schema_file = os.path.join(
                    os.path.abspath(os.environ["MSPASS_HOME"]), "data/yaml", schema_file
                )
            else:
                schema_file = os.path.abspath(
                    os.path.join(os.path.dirname(__file__), "../data/yaml", schema_file)
                )
        try:
            with open(schema_file, "r") as stream:
                schema_dic = yaml.safe_load(stream)
        except yaml.YAMLError as e:
            raise MsPASSError(
                "Cannot parse schema definition file: " + schema_file, "Fatal"
            ) from e
        except EnvironmentError as e:
            raise MsPASSError(
                "Cannot open schema definition file: " + schema_file, "Fatal"
            ) from e
        try:
            _check_format(schema_dic)
        except schema.SchemaError as e:
            raise MsPASSError("The schema definition is not valid", "Fatal") from e
        self._raw = schema_dic
    def __getitem__(self, key):
        try:
            return getattr(self, key)
        except AttributeError as ae:
            raise MsPASSError(
                "The schema of " + key + " is not defined", "Invalid"
            ) from ae
    def __delitem__(self, key):
        try:
            return delattr(self, key)
        except AttributeError as ae:
            raise MsPASSError(
                "The schema of " + key + " is not defined", "Invalid"
            ) from ae
    def __contains__(self, key):
        return key in self._attr_dict 
[docs]
class SchemaDefinitionBase:
    _main_dic = {}
    _alias_dic = {}
    _required_keys = []
    _xref_keys = []
[docs]
    def add(self, name, attr):
        """
        Add a new entry to the definitions. Note that because the internal
        container is `dict` if attribute for name is already present it
        will be silently replaced.
        :param name: The name of the attribute to be added
        :type name: str
        :param attr: A dictionary that defines the property of the added attribute.
            Note that the type must be defined.
        :type attr: dict
        :raises mspasspy.ccore.utility.MsPASSError: if type is not defined in attr
        """
        self._main_dic[name] = attr
        if "aliases" in attr:
            for als in attr["aliases"]:
                self._alias_dic[als] = name
        if attr["constraint"] == "required":
            self._required_keys.append(name)
        elif attr["constraint"] == "xref_key":
            self._xref_keys.append(name) 
[docs]
    def add_alias(self, key, aliasname):
        """
        Add an alias for key
        :param key: key to be added
        :type key: str
        :param aliasname: aliasname to be added
        :type aliasname: str
        """
        self._main_dic[key]["aliases"].append(aliasname)
        self._alias_dic[aliasname] = key 
[docs]
    def remove_alias(self, alias):
        """
        Remove an alias. Will silently ignore if alias is not found
        :param alias: alias to be removed
        :type alias: str
        """
        if alias in self._alias_dic:
            key = self._alias_dic[alias]
            self._main_dic[key]["aliases"].remove(alias)
            del self._alias_dic[alias] 
[docs]
    def aliases(self, key):
        """
        Get a list of aliases for a given key.
        :param key: The unique key that has aliases defined.
        :type key: str
        :return: A list of aliases associated to the key.
        :rtype: list
        """
        return (
            None
            if "aliases" not in self._main_dic[key]
            else self._main_dic[key]["aliases"]
        ) 
[docs]
    def apply_aliases(self, md, alias):
        """
        Apply a set of aliases to a data object.
        This method will change the unique keys of a data object into aliases.
        The alias argument can either be a path to a valid yaml file of
        key:alias pairs or a dict. If the "key" is an alias itself, it will
        be converted to its corresponding unique name before being used to
        change to the alias. It will also add the applied alias to the schema's
        internal alias container such that the same schema object can be used
        to convert the alias back.
        :param md: Data object to be altered. Normally a :class:`mspasspy.ccore.seismic.Seismogram`
            or :class:`mspasspy.ccore.seismic.TimeSeries` but can be a raw :class:`mspasspy.ccore.utility.Metadata`.
        :type md: :class:`mspasspy.ccore.utility.Metadata`
        :param alias: a yaml file or a dict that have pairs of key:alias
        :type alias: dict/str
        """
        alias_dic = alias
        if isinstance(alias, str) and os.path.isfile(alias):
            try:
                with open(alias, "r") as stream:
                    alias_dic = yaml.safe_load(stream)
            except yaml.YAMLError as e:
                raise MsPASSError(
                    "Cannot parse alias definition file: " + alias, "Fatal"
                ) from e
            except EnvironmentError as e:
                raise MsPASSError(
                    "Cannot open alias definition file: " + alias, "Fatal"
                ) from e
        if isinstance(alias_dic, dict):
            for k, a in alias_dic.items():
                unique_k = self.unique_name(k)
                self.add_alias(unique_k, a)
                md.change_key(unique_k, a)
        else:
            raise MsPASSError(
                "The alias argument of type {} is not recognized, it should be either a {} path or a {}".format(
                    type(alias), str, dict
                ),
                "Fatal",
            ) 
[docs]
    def clear_aliases(self, md):
        """
        Restore any aliases to unique names.
        Aliases are needed to support legacy packages, but can cause downstream problem
        if left intact. This method clears any aliases and sets them to the unique_name
        defined by this object. Note that if the unique_name is already defined, it will
        silently remove the alias only.
        :param md: Data object to be altered. Normally a :class:`mspasspy.ccore.seismic.Seismogram`
            or :class:`mspasspy.ccore.seismic.TimeSeries` but can be a raw :class:`mspasspy.ccore.utility.Metadata`.
        :type md: :class:`mspasspy.ccore.utility.Metadata`
        """
        for key in md.keys():
            if self.is_alias(key):
                if self.unique_name(key) not in md:
                    md.change_key(key, self.unique_name(key))
                else:
                    del md[key] 
[docs]
    def concept(self, key):
        """
        Return a description of the concept this attribute defines.
        :param key: The name that defines the attribute of interest
        :type key: str
        :return: A string with a terse description of the concept this attribute defines
        :rtype: str
        :raises mspasspy.ccore.utility.MsPASSError: if concept is not defined
        """
        if "concept" not in self._main_dic[key]:
            raise MsPASSError("concept is not defined for " + key, "Complaint")
        return self._main_dic[key]["concept"] 
[docs]
    def has_alias(self, key):
        """
        Test if a key has registered aliases
        Sometimes it is helpful to have alias keys to define a common concept.
        For instance, if an attribute is loaded from a relational db one might
        want to use alias names of the form table.attribute as an alias to
        attribute. has_alias should be called first to establish if a name has
        an alias. To get a list of aliases call the aliases method.
        :param key: key to be tested
        :type key: str
        :return: `True` if the key has aliases, else or if key is not defined return `False`
        :rtype: bool
        """
        return key in self._main_dic and "aliases" in self._main_dic[key] 
[docs]
    def is_alias(self, key):
        """
        Test if a key is a registered alias
        This asks the inverse question to has_alias. That is, it yields true of the key is
        registered as a valid alias. It returns false if the key is not defined at all. Note
        it will yield false if the key is a registered unique name and not an alias.
        :param key: key to be tested
        :type key: str
        :return: `True` if the key is a alias, else return `False`
        :rtype: bool
        """
        return key in self._alias_dic 
[docs]
    def is_defined(self, key):
        """
        Test if a key is defined either as a unique key or an alias
        :param key: key to be tested
        :type key: str
        :return: `True` if the key is defined
        :rtype: bool
        """
        return key in self._main_dic or key in self._alias_dic 
[docs]
    def keys(self):
        """
        Get a list of all the unique keys defined.
        :return: list of all the unique keys defined
        :rtype: list
        """
        return self._main_dic.keys() 
[docs]
    def type(self, key):
        """
        Return the type of an attribute. If not recognized, it returns None.
        :param key: The name that defines the attribute of interest
        :type key: str
        :return: type of the attribute associated with ``key``
        :rtype: :class:`type`
        """
        tp = self._main_dic[key]["type"].strip().casefold()
        if tp in ["int", "integer"]:
            return int
        if tp in ["double", "float"]:
            return float
        if tp in ["str", "string"]:
            return str
        if tp in ["bool", "boolean"]:
            return bool
        if tp in ["dict"]:
            return dict
        if tp in ["list"]:
            return list
        if tp in ["objectid"]:
            return bson.objectid.ObjectId
        if tp in ["bytes", "byte", "object"]:
            return bytes
        return None 
[docs]
    def unique_name(self, aliasname):
        """
        Get definitive name for an alias.
        This method is used to ask the opposite question as aliases. The aliases method
        returns all acceptable alternatives to a definitive name defined as the key to
        get said list. This method asks what definitive key should be used to fetch an
        attribute. Note that if the input is already the unique name, it will return itself.
        :param aliasname: the name of the alias for which we want the definitive key
        :type aliasname: str
        :return: the name of the definitive key
        :rtype: str
        :raises mspasspy.ccore.utility.MsPASSError: if aliasname is not defined
        """
        if aliasname in self._main_dic:
            return aliasname
        if aliasname in self._alias_dic:
            return self._alias_dic[aliasname]
        raise MsPASSError(aliasname + " is not defined", "Invalid") 
[docs]
    def required_keys(self):
        """
        Return all the required keys in the current collection as a list
        :return: type of data associated with the collection
        :rtype: a :class:`list` of :class:`str`
        """
        return self._required_keys 
[docs]
    def xref_keys(self):
        """
        Return all the xref keys in the current collection as a list
        :return: type of data associated with the collection
        :rtype: a :class:`list` of :class:`str`
        """
        return self._xref_keys 
[docs]
    def constraint(self, key):
        """
        Return a description of the constraint this attribute defines.
        :param key: The name that defines the attribute of interest
        :type key: str
        :return: A string with a terse description of the constraint this attribute defines
        :rtype: str
        """
        return self._main_dic[key]["constraint"] 
[docs]
    def is_required(self, key):
        """
        Test if the constraint of the key is required to the schema
        :param key: key to be tested
        :type key: str
        :return: `True` if the constraint of the key is required
        :rtype: bool
        """
        return self._main_dic[key]["constraint"] == "required" 
[docs]
    def is_xref_key(self, key):
        """
        Test if the constraint of the key is xref_key to the schema
        :param key: key to be tested
        :type key: str
        :return: `True` if the constraint of the key is xref_key
        :rtype: bool
        """
        return self._main_dic[key]["constraint"] == "xref_key" 
[docs]
    def is_normal(self, key):
        """
        Test if the constraint of the key is normal to the schema
        :param key: key to be tested
        :type key: str
        :return: `True` if the constraint of the key is normal
        :rtype: bool
        """
        return self._main_dic[key]["constraint"] == "normal" 
[docs]
    def is_optional(self, key):
        """
        Test if the constraint of the key is optional to the schema
        :param key: key to be tested
        :type key: str
        :return: `True` if the constraint of the key is optional
        :rtype: bool
        """
        return self._main_dic[key]["constraint"] == "optional" 
 
[docs]
class DatabaseSchema(SchemaBase):
    def __init__(self, schema_file=None):
        super().__init__(schema_file)
        self._default_dic = {}
        for collection in self._raw["Database"]:
            schemadef = DBSchemaDefinition(self._raw["Database"], collection)
            setattr(self, collection, schemadef)
            self._attr_dict[collection] = schemadef
            if "default" in self._raw["Database"][collection]:
                self._default_dic[self._raw["Database"][collection]["default"]] = (
                    collection
                )
    def __setitem__(self, key, value):
        if not isinstance(value, DBSchemaDefinition):
            raise MsPASSError("value is not a DBSchemaDefinition", "Invalid")
        setattr(self, key, value)
        self._attr_dict[key] = value
        self._default_dic[key] = key
[docs]
    def default(self, name):
        """
        Return the schema definition of a default collection.
        This method is used when multiple collections of the same concept is defined.
        For example, the wf_TimeSeries and wf_Seismogram are both collections that
        are used for data objects (characterized by their common wf prefix). The
        Database API needs a default wf collection to operate on when no collection
        name is explicitly given. In this case, this default_name method can be used.
        Note that if requested name has no default collection defined and it is a
        defined collection, it will treat that collection itself as the default.
        :param name: The requested default collection
        :type name: str
        :return: the schema definition of the default collection
        :rtype: :class:`mspasspy.db.schema.DBSchemaDefinition`
        :raises mspasspy.ccore.utility.MsPASSError: if the name has no default defined
        """
        if name in self._default_dic:
            return getattr(self, self._default_dic[name])
        if name in self._raw["Database"]:
            return getattr(self, name)
        raise MsPASSError(name + " has no default defined", "Invalid") 
[docs]
    def default_name(self, name):
        """
        Return the name of a default collection.
        This method is behaves similar to the default method, but it only returns the
        name as a string instead.
        :param name: The requested default collection
        :type name: str
        :return: the name of the default collection
        :rtype: str
        :raises mspasspy.ccore.utility.MsPASSError: if the name has no default defined
        """
        if name in self._default_dic:
            return self._default_dic[name]
        if name in self._raw["Database"]:
            return name
        raise MsPASSError(name + " has no default defined", "Invalid") 
[docs]
    def set_default(self, collection: str, default: str = None):
        """
        Set a collection as the default.
        This method is used to change the default collections (e.g., switching between
        wf_TimeSeries and wf_Seismogram). If ``default`` is not given, it will try to
        infer one from ``collection`` at the first occurrence of "_"
        (e.g., wf_TimeSeries will become wf).
        :param collection: The name of the targetting collection
        :type collection: str
        :param default: the default name to be set to
        :type default: str, optional
        """
        if not hasattr(self, collection):
            raise MsPASSError(collection + " is not a defined collection", "Invalid")
        if default is None:
            self._default_dic[collection.split("_", 1)[0]] = collection
        else:
            self._default_dic[default] = collection 
[docs]
    def unset_default(self, default: str):
        """
        Unset a default.
        This method does nothing if ``default`` is not defined.
        :param default: the default name to be unset
        :type default: str
        """
        if default in self._default_dic:
            self._default_dic.pop(default) 
 
[docs]
class DBSchemaDefinition(SchemaDefinitionBase):
    def __init__(self, schema_dic, collection_str):
        self._collection_str = collection_str
        self._main_dic = {}
        self._alias_dic = {}
        self._data_type = None
        self._required_keys = []
        self._xref_keys = []
        self._main_dic.update(schema_dic[collection_str]["schema"])
        for key, attr in self._main_dic.items():
            if "reference" in attr:
                k = key
                if k == attr["reference"] + "_id":
                    k = "_id"
                refer_dic = schema_dic[attr["reference"]]
                while "base" in refer_dic:
                    refer_dic = schema_dic[refer_dic["base"]]
                    if k in refer_dic["schema"]:
                        break
                foreign_attr = refer_dic["schema"][k]
                # The order of below operation matters. The behavior is that we only
                # extend attr with items from foreign_attr that are not defined in attr.
                # This garantees that the foreign_attr won't overwrite attr's exisiting keys.
                compiled_attr = dict(list(foreign_attr.items()) + list(attr.items()))
                self._main_dic[key] = compiled_attr
            if "aliases" in attr:
                self._alias_dic.update(
                    {item: key for item in attr["aliases"] if item != key}
                )
            if "constraint" in attr:
                if attr["constraint"] == "required":
                    self._required_keys.append(key)
                elif attr["constraint"] == "xref_key":
                    self._xref_keys.append(key)
        if "data_type" in schema_dic[collection_str]:
            self._data_type = schema_dic[collection_str]["data_type"]
[docs]
    def add(self, name, attr):
        """
        Add a new entry to the definitions. Note that because the internal
        container is `dict` if attribute for name is already present it
        will be silently replaced.
        :param name: The name of the attribute to be added
        :type name: str
        :param attr: A dictionary that defines the property of the added attribute.
            Note that the type must be defined.
        :type attr: dict
        :raises mspasspy.ccore.utility.MsPASSError: if type is not defined in attr
        """
        schema_v = _get_schema_definition_schema(None, "database", False)
        attr = schema_v.validate(attr)
        SchemaDefinitionBase.add(self, name, attr) 
[docs]
    def reference(self, key):
        """
        Return the collection name that a key is referenced from.
        :param key: the name of the key
        :type key: str
        :return: the name of the collection
        :rtype: str
        :raises mspasspy.ccore.utility.MsPASSError: if the key is not defined
        """
        if key not in self._main_dic:
            raise MsPASSError(key + " is not defined", "Invalid")
        return (
            self._collection_str
            if "reference" not in self._main_dic[key]
            else self._main_dic[key]["reference"]
        ) 
[docs]
    def data_type(self):
        """
        Return the data type that the collection is used to reference.
        If not recognized, it returns None.
        :return: type of data associated with the collection
        :rtype: :class:`type`
        """
        if self._data_type in ["TimeSeries", "timeseries"]:
            return mspasspy.ccore.seismic.TimeSeries
        if self._data_type in ["Seismogram", "seismogram"]:
            return mspasspy.ccore.seismic.Seismogram
        return None 
 
[docs]
class MDSchemaDefinition(SchemaDefinitionBase):
    def __init__(self, schema_dic, collection_str, dbschema):
        self._main_dic = {}
        self._alias_dic = {}
        self._required_keys = []
        self._xref_keys = []
        self._main_dic.update(schema_dic[collection_str]["schema"])
        for key, attr in self._main_dic.items():
            if "collection" in attr:
                s_key = key
                col_name = attr["collection"]
                if key.startswith(col_name):
                    s_key = key.replace(col_name + "_", "")
                # get the default name in case one is used in the dbschema
                col_name = dbschema.default_name(col_name)
                foreign_attr = getattr(dbschema, col_name)._main_dic[s_key]
                # The order of below operation matters. The behavior is that we only
                # extend attr with items from foreign_attr that are not defined in attr.
                # This garantees that the foreign_attr won't overwrite attr's exisiting keys.
                compiled_attr = dict(list(foreign_attr.items()) + list(attr.items()))
                self._main_dic[key] = compiled_attr
            # have to use "self._main_dic[key]" instead of attr here because the dict is updated above
            if "aliases" in self._main_dic[key]:
                self._alias_dic.update(
                    {
                        item: key
                        for item in self._main_dic[key]["aliases"]
                        if item != key
                    }
                )
            if "constraint" in self._main_dic[key]:
                if self._main_dic[key]["constraint"] == "required":
                    self._required_keys.append(key)
                elif self._main_dic[key]["constraint"] == "xref_key":
                    self._xref_keys.append(key)
[docs]
    def add(self, name, attr):
        """
        Add a new entry to the definitions. Note that because the internal
        container is `dict` if attribute for name is already present it
        will be silently replaced.
        :param name: The name of the attribute to be added
        :type name: str
        :param attr: A dictionary that defines the property of the added attribute.
            Note that the type must be defined.
        :type attr: dict
        :raises mspasspy.ccore.utility.MsPASSError: if type is not defined in attr
        """
        schema_v = _get_schema_definition_schema(None, "metadata", False)
        attr = schema_v.validate(attr)
        SchemaDefinitionBase.add(self, name, attr) 
[docs]
    def collection(self, key):
        """
        Return the collection name that a key belongs to
        :param key: the name of the key
        :type key: str
        :return: the name of the collection
        :rtype: str
        """
        return (
            None
            if "collection" not in self._main_dic[key]
            else self._main_dic[key]["collection"]
        ) 
[docs]
    def set_collection(self, key, collection, dbschema=None):
        """
        Set the collection name that a key belongs to. It optionally takes
        a dbschema argument and will set the attribute of that key with the
        corresponding one defined in the dbschema.
        :param key: the name of the key
        :type key: str
        :param collection: the name of the collection
        :type collection: str
        :param dbschema: the database schema used to set the attributes of the key.
        :type dbschema: :class:`mspasspy.db.schema.DatabaseSchema`
        :raises mspasspy.ccore.utility.MsPASSError: if the key is not defined
        """
        if key not in self._main_dic:
            raise MsPASSError(key + " is not defined", "Invalid")
        if dbschema:
            readonly = self.readonly(key)
            aliases = self.aliases(key)
            s_key = key
            col_name = collection
            if key.startswith(col_name):
                s_key = key.replace(col_name + "_", "")
            col_name = dbschema.default_name(col_name)
            foreign_attr = getattr(dbschema, col_name)._main_dic[s_key]
            self._main_dic[key] = foreign_attr
            if not readonly:
                self.set_writeable(key)
            if aliases:
                if "aliases" not in self._main_dic[key]:
                    self._main_dic[key]["aliases"] = aliases
                else:
                    [
                        self._main_dic[key]["aliases"].append(x)
                        for x in aliases
                        if x not in self._main_dic[key]["aliases"]
                    ]
                    self._alias_dic.update(
                        {
                            item: key
                            for item in self._main_dic[key]["aliases"]
                            if item != key
                        }
                    )
        self._main_dic[key]["collection"] = collection 
[docs]
    def swap_collection(self, original_collection, new_collection, dbschema=None):
        """
        Swap the collection name of all the keys of a matching colletions.
        It optionally takes a dbschema argument and will set the attribute
        of that key with the corresponding one defined in the dbschema. It
        will silently do nothing if no matching collection is defined.
        :param original_collection: the name of the collection to be swapped
        :type original_collection: str
        :param new_collection: the name of the collection to be changed into
        :type new_collection: str
        :param dbschema: the database schema used to set the attributes of the key.
        :type dbschema: :class:`mspasspy.db.schema.DatabaseSchema`
        """
        for key, attr in self._main_dic.items():
            if "collection" in attr and attr["collection"] == original_collection:
                self.set_collection(key, new_collection, dbschema) 
[docs]
    def readonly(self, key):
        """
        Check if an attribute is marked readonly.
        :param key: key to be tested
        :type key: str
        :return: `True` if the key is readonly or its readonly attribute is not defined, else return `False`
        :rtype: bool
        :raises mspasspy.ccore.utility.MsPASSError: if the key is not defined
        """
        if key not in self._main_dic:
            raise MsPASSError(key + " is not defined", "Invalid")
        return (
            True
            if "readonly" not in self._main_dic[key]
            else self._main_dic[key]["readonly"]
        ) 
[docs]
    def writeable(self, key):
        """
        Check if an attribute is writeable. Inverted logic from the readonly method.
        :param key: key to be tested
        :type key: str
        :return: `True` if the key is not readonly, else or its readonly attribute is not defined return `False`
        :rtype: bool
        :raises mspasspy.ccore.utility.MsPASSError: if the key is not defined
        """
        return not self.readonly(key) 
[docs]
    def set_readonly(self, key):
        """
        Lock an attribute to assure it will not be saved.
        Parameters can be defined readonly. That is a standard feature of this class,
        but is normally expected to be set on construction of the object.
        There are sometimes reason to lock out a parameter to keep it from being
        saved in output. This method allows this. On the other hand, use this feature
        only if you fully understand the downstream implications or you may experience
        unintended consequences.
        :param key: the key for the attribute with properties to be redefined
        :type key: str
        :raises mspasspy.ccore.utility.MsPASSError: if the key is not defined
        """
        if key not in self._main_dic:
            raise MsPASSError(key + " is not defined", "Invalid")
        self._main_dic[key]["readonly"] = True 
[docs]
    def set_writeable(self, key):
        """
        Force an attribute to be writeable.
        Normally some parameters are marked readonly on construction to avoid
        corrupting the database with inconsistent data defined with a common
        key. (e.g. sta) This method overrides such definitions for any key so
        marked. This method should be used with caution as it could have
        unintended side effects.
        :param key: the key for the attribute with properties to be redefined
        :type key: str
        :raises mspasspy.ccore.utility.MsPASSError: if the key is not defined
        """
        if key not in self._main_dic:
            raise MsPASSError(key + " is not defined", "Invalid")
        self._main_dic[key]["readonly"] = False 
 
"""below is all about _check_format utility function """
def _is_basic_type(s):
    """
    Helper function used to check if the value of the type attribute is valid
    :param s: the str value in the type attribute
    :type s: str
    :return: `True` if the str value is valid, else return `False`
    :rtype: bool
    """
    s = s.strip().casefold()
    if s in [
        "objectid",
        "int",
        "integer",
        "float",
        "double",
        "bool",
        "boolean",
        "str",
        "string",
        "list",
        "dict",
        "bytes",
        "byte",
        "object",
    ]:
        return True
    return False
def _check_min_default_key(db_dict):
    """
    Helper function used to check if there are at least 3 defined default keys, which are 'wf', 'elog' and 'history_object'
    :param db_dict: the database schema dictionary
    :type db_dict: dict
    :return: True if contains, else raise a schema.SchemaError
    :rtype: bool
    """
    default_key_set = set()
    for collection in db_dict:
        default_key_set.add(collection)
        if "default" in db_dict[collection]:
            default_key_set.add(db_dict[collection]["default"])
    if (
        "wf" not in default_key_set
        or "elog" not in default_key_set
        or "history_object" not in default_key_set
    ):
        raise schema.SchemaError(
            "wf, elog and history_object must be all defined as default key in a collection or as a collection name itself"
        )
    return True
def _is_valid_schema(dic, schema_):
    """
    Helper function used to validate all collections in a schema dictionary
    :param dic: a schema dictionary
    :type dic: dict
    :param schema_: the defined schema used to validate a database collection
    :type schema_: schema.Schema
    :return: True if all collections pass the validation
    :rtype: bool
    """
    for collection in dic:
        schema_.validate(dic[collection])
    return True
def _is_valid_database_schema_definition(dic, collection_name_list):
    """
    Helper function used to validate all attributes in a database schema definition dictionary
    :param dic: a schema definition dictionary
    :type dic: dict
    :param collection_name_list: a list of collection name to be used for reference
    :type collection_name_list: list
    :return: True if all attributes pass the validation
    :rtype: bool
    """
    for attr in dic:
        if "reference" in dic[attr]:
            dic[attr] = _get_schema_definition_schema(
                collection_name_list, "database", True
            ).validate(dic[attr])
        else:
            dic[attr] = _get_schema_definition_schema(
                collection_name_list, "database", False
            ).validate(dic[attr])
    return True
def _is_valid_metadata_schema_definition(dic, collection_name_list):
    """
    Helper function used to validate all attributes in a metadata schema definition dictionary
    :param dic: a schema definition dictionary
    :type dic: dict
    :param collection_name_list: a list of collection name to be used for reference
    :type collection_name_list: list
    :return: True if all attributes pass the validation
    :rtype: bool
    """
    for attr in dic:
        if "collection" in dic[attr]:
            dic[attr] = _get_schema_definition_schema(
                collection_name_list, "metadata", True
            ).validate(dic[attr])
        else:
            dic[attr] = _get_schema_definition_schema(
                collection_name_list, "metadata", False
            ).validate(dic[attr])
    return True
def _get_schema_definition_schema(collection_name_list, name, ref):
    """
    Helper function to get the schema for schema definitions
    :param collection_name_list: a list of collection name to be used for reference
    :type collection_name_list: list
    :param name: the name of the schema, which can be "database" or "metadata"
    :type name: str
    :param ref: whether the schema definition uses a reference key
    :type ref: bool
    :return: the schema for valication
    :rtype: schema.Schema
    """
    type_key = schema.And(str, lambda s: _is_basic_type(s))
    type_required_schema = {"type": type_key}
    type_optional_schema = {schema.Optional("type"): type_key}
    constraint_key = schema.And(
        str, lambda s: s in ["required", "xref_key", "normal", "optional"]
    )
    constraint_schema = {schema.Optional("constraint"): constraint_key}
    constraint_default_schema = {
        schema.Optional("constraint", default="normal"): constraint_key
    }
    common_db_schema = {
        # if collection_name_list is empty, the reference check will always pass. This is used in the add method of SchemaDefinition only.
        schema.Optional("reference"): schema.And(
            str, lambda s: s in collection_name_list if collection_name_list else True
        ),
        schema.Optional("concept"): str,
        # aliases can only be string or a list of string anything else will be converted automatically here
        schema.Optional("aliases"): schema.Use(
            lambda s: (
                [s]
                if type(s) is str
                else ([str(i) for i in s] if type(s) is list else str(s))
            )
        ),
    }
    common_md_schema = {
        schema.Optional("collection"): schema.And(
            str, lambda s: s in collection_name_list if collection_name_list else True
        ),
        schema.Optional("concept"): str,
        # aliases can only be string or a list of string anything else will be converted automatically here
        schema.Optional("aliases"): schema.Use(
            lambda s: (
                [s]
                if type(s) is str
                else ([str(i) for i in s] if type(s) is list else str(s))
            )
        ),
        schema.Optional("readonly"): bool,
    }
    if name == "database":
        if ref:
            return schema.Schema(
                dict(
                    list(common_db_schema.items())
                    + list(type_optional_schema.items())
                    + list(constraint_schema.items())
                ),
                ignore_extra_keys=True,
            )
        else:
            return schema.Schema(
                dict(
                    list(common_db_schema.items())
                    + list(type_required_schema.items())
                    + list(constraint_default_schema.items())
                ),
                ignore_extra_keys=True,
            )
    else:  # name == "metadata"
        if ref:
            return schema.Schema(
                dict(
                    list(common_md_schema.items())
                    + list(type_optional_schema.items())
                    + list(constraint_schema.items())
                ),
                ignore_extra_keys=True,
            )
        else:
            return schema.Schema(
                dict(
                    list(common_md_schema.items())
                    + list(type_required_schema.items())
                    + list(constraint_default_schema.items())
                ),
                ignore_extra_keys=True,
            )
def _check_format(schema_dic):
    """
    check if a mspass.yaml file user provides is valid or not
    :param schema_dic: the dictionary that a yaml file is dumped
    :type schema_dic: dict
    :return: `True` if the schema is valid, else return `False`
    :rtype: bool
    """
    # Make sure Database and Metadata exist
    schema.Schema({"Database": dict, "Metadata": dict}).validate(schema_dic)
    collection_name_list = schema_dic["Database"].keys()
    database_collection_schema = schema.Schema(
        {
            schema.Optional("default"): str,
            schema.Optional("data_type"): str,
            schema.Optional("base"): schema.And(
                str, lambda s: s in collection_name_list
            ),
            "schema": schema.And(
                dict,
                lambda dic: _is_valid_database_schema_definition(
                    dic, collection_name_list
                ),
            ),
        },
        ignore_extra_keys=True,
    )
    metadata_collection_schema = schema.Schema(
        {
            "schema": schema.And(
                dict,
                lambda dic: _is_valid_metadata_schema_definition(
                    dic, collection_name_list
                ),
            )
        },
        ignore_extra_keys=True,
    )
    yaml_schema = schema.Schema(
        {
            "Database": schema.And(
                dict,
                schema.And(
                    lambda dic: _is_valid_schema(dic, database_collection_schema),
                    lambda dic: _check_min_default_key(dic),
                ),
            ),
            "Metadata": schema.And(
                dict, lambda dic: _is_valid_schema(dic, metadata_collection_schema)
            ),
        },
        ignore_extra_keys=True,
    )
    yaml_schema.validate(schema_dic)