MsPASS C++ API  2.4.1.dev4+g92330b7a
Defines the C++ API for MsPASS
mspass::utility::ProcessingHistory Class Reference

Lightweight class to preserve the processing chain of atomic objects.

#include <ProcessingHistory.h>

[Inheritance diagram for mspass::utility::ProcessingHistory]
[Collaboration diagram for mspass::utility::ProcessingHistory]

Public Member Functions

 ProcessingHistory ()
 
 ProcessingHistory (const std::string jobnm, const std::string jid)
 
 ProcessingHistory (const ProcessingHistory &parent)
 
bool is_empty () const
 
bool is_raw () const
 
bool is_origin () const
 
bool is_volatile () const
 
bool is_saved () const
 
size_t number_of_stages () override
 Return number of processing stages that have been applied to this object.
 
void set_as_origin (const std::string alg, const std::string algid, const std::string uuid, const AtomicType typ, bool define_as_raw=false)
 
std::string new_ensemble_process (const std::string alg, const std::string algid, const AtomicType typ, const std::vector< ProcessingHistory * > parents, const bool create_newid=true)
 
void add_one_input (const ProcessingHistory &data_to_add)
 Add one datum as an input for current data.
 
void add_many_inputs (const std::vector< ProcessingHistory * > &d)
 Define several data objects as inputs.
 
void merge (const ProcessingHistory &data_to_add)
 Merge the history nodes from another.
 
void accumulate (const std::string alg, const std::string algid, const AtomicType typ, const ProcessingHistory &newinput)
 Method to use with a spark reduce algorithm.
 
std::string clean_accumulate_uuids ()
 Clean up inconsistent uuids that can be produced by reduce.
 
std::string new_map (const std::string alg, const std::string algid, const AtomicType typ, const ProcessingStatus newstatus=ProcessingStatus::VOLATILE)
 Define this algorithm as a one-to-one map of same type data.
 
std::string new_map (const std::string alg, const std::string algid, const AtomicType typ, const ProcessingHistory &data_to_clone, const ProcessingStatus newstatus=ProcessingStatus::VOLATILE)
 Define this algorithm as a one-to-one map.
 
std::string map_as_saved (const std::string alg, const std::string algid, const AtomicType typ)
 Prepare the current data for saving.
 
void clear ()
 
std::multimap< std::string, mspass::utility::NodeData > get_nodes () const
 
int stage () const
 
ProcessingStatus status () const
 
std::string id () const
 
std::pair< std::string, std::string > created_by () const
 
NodeData current_nodedata () const
 
std::string newid ()
 
int number_inputs () const
 
int number_inputs (const std::string uuidstr) const
 
void set_id (const std::string newid)
 
std::list< mspass::utility::NodeData > inputs (const std::string id_to_find) const
 Return a list of data that define the inputs to a given uuid.
 
ProcessingHistory & operator= (const ProcessingHistory &parent)
 
- Public Member Functions inherited from mspass::utility::BasicProcessingHistory
 BasicProcessingHistory (const std::string jobname, const std::string jobid)
 
 BasicProcessingHistory (const BasicProcessingHistory &parent)
 
std::string jobid () const
 
void set_jobid (const std::string &newjid)
 
std::string jobname () const
 
void set_jobname (const std::string jobname)
 
BasicProcessingHistory & operator= (const BasicProcessingHistory &parent)
 

Public Attributes

ErrorLogger elog
 

Protected Attributes

std::multimap< std::string, mspass::utility::NodeData > nodes
 
- Protected Attributes inherited from mspass::utility::BasicProcessingHistory
std::string jid
 
std::string jnm
 

Detailed Description

Lightweight class to preserve the processing chain of atomic objects.

This class is intended to be used as a parent for any data object in MsPASS that should be considered atomic. It is designed to completely preserve the chain of processing algorithms applied to any atomic data to put it in its current state. It is designed to save that information during processing with the core information that can then be saved to define the state. Writers for atomic objects inheriting this class should arrange to save the data contained in it to a history collection in MongoDB. Note that actually doing the inverse, that is, reconstructing a workflow from the saved history, is a different problem. It is expected to be implemented as extensions of this class for use in special programs that reconstruct a data workflow and the processing chain applied to produce any final output.

The design was complicated by the need to keep the history data from causing memory bloat. A careless implementation could be prone to that problem even for modest chains, but we were particularly worried about iterative algorithms that could conceivably multiply the size of the history out of control. There was also the fundamental problem of dealing with transient data versus data stored in longer term storage instead of just in memory. Our implementation was simplified by using the concept of a unique id in the form of a Universally Unique IDentifier (UUID). Our history mechanism assumes each data object has a uuid assigned to it on creation by an implementation-dependent mechanism; the id is that of the one object a particular record is associated with. That is, whenever a new object is created in MsPASS using the history feature, one of these records will be created for each data object that is defined as atomic. This string defines a unique key for the object it could be connected to with the this pointer. The parents of the current object are defined by the inputs data structure below.

In the current implementation id is string representation of a uuid maintained by each atomic object. We use a string to maximize flexibility at a minor cost for storage.

Names used imply the following concepts:

raw - the data is new input to MsPASS (raw data from a data center, field experiment, or simulation). That tag means no prior history can be reconstructed.

origin - top-level ancestor of the current data. The top of a processing chain is always tagged as an origin. A top level can also be "raw" but not necessarily. In particular, readers that load partially processed data should mark the data read as an origin, but not raw.

stage - all processed data objects that are volatile elements within a workflow are defined as a stage. They are presumed to leave their existence known only through the ancestry preserved in the processing chain. A stage becomes a potential root only when it is saved by a writer, where the writer will mark that position as a save. We considered calling this a branch, but that does not capture the concept correctly since we require this mechanism to correctly preserve splits into multiple outputs.

We preserve that cleanly for each data object. That is, the implementation makes it easy to reconstruct the history of a single final data object, but reconstructing interlinks between objects in an overall processing flow will be a challenge. That was a necessary compromise to avoid memory bloat. The history is properly viewed as a tree branching from a single root (the final output) to leaves that define all its parents.
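
To make the tree picture concrete, the following is a minimal, self-contained sketch (not MsPASS code) of walking such a history from the final output back to its leaves. The Node struct, the NodeMap alias, and the find_leaves function are hypothetical stand-ins introduced only for illustration. The sketch assumes, as in the nodes multimap documented on this page, that each key is the uuid of a datum and each value describes one of its inputs. Treating an id with no entries as a leaf is an assumption of this sketch, not a documented property of the class.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for mspass::utility::NodeData; only the fields
// needed for this illustration are kept.
struct Node {
  std::string uuid;      // id of the input datum this record describes
  std::string algorithm; // algorithm that produced that input
  int stage;             // position of that input in its processing chain
};

// key = uuid of a datum, value = one of the inputs used to create it
using NodeMap = std::multimap<std::string, Node>;

// Walk from head_id toward the leaves and return the uuids that have no
// recorded inputs (assumed here to be the origins of the chain).
std::set<std::string> find_leaves(const NodeMap &nodes,
                                  const std::string &head_id) {
  assert(!head_id.empty());
  std::set<std::string> leaves;
  std::set<std::string> visited;
  std::vector<std::string> todo{head_id};
  while (!todo.empty()) {
    const std::string id = todo.back();
    todo.pop_back();
    // Skip ids already handled so shared ancestors are visited once.
    if (!visited.insert(id).second)
      continue;
    const auto range = nodes.equal_range(id);
    if (range.first == range.second) {
      leaves.insert(id); // no inputs recorded for this id
      continue;
    }
    for (auto it = range.first; it != range.second; ++it)
      todo.push_back(it->second.uuid);
  }
  return leaves;
}
```

For example, a map with entries "stack" to "a", "stack" to "b", "a" to "raw1", and "b" to "raw2" yields the leaves raw1 and raw2 when walked from "stack".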

The concepts of raw, origin, and stage are implemented with the enum class defined above called ProcessingStatus. Each history record has that as an attribute, but each call to new_stage updates a copy kept inside this object to simplify the python wrappers.

Constructor & Destructor Documentation

◆ ProcessingHistory() [1/3]

mspass::utility::ProcessingHistory::ProcessingHistory ( )

Default constructor.

: elog() {
  current_status = ProcessingStatus::UNDEFINED;
  current_id = "UNDEFINED";
  current_stage =
      -1; // illegal value that could be used as signal for uninitialized
  mytype = AtomicType::UNDEFINED;
  algorithm = "UNDEFINED";
  algid = "UNDEFINED";
}

◆ ProcessingHistory() [2/3]

mspass::utility::ProcessingHistory::ProcessingHistory ( const std::string  jobnm,
const std::string  jid 
)

Construct and fill in BasicProcessingHistory job attributes.

Parameters
jobnm - set as jobname
jid - set as jobid
: BasicProcessingHistory(jobnm, jid), elog() {
  current_status = ProcessingStatus::UNDEFINED;
  current_id = "UNDEFINED";
  current_stage =
      -1; // illegal value that could be used as signal for uninitialized
  mytype = AtomicType::UNDEFINED;
  algorithm = "UNDEFINED";
  algid = "UNDEFINED";
}

◆ ProcessingHistory() [3/3]

mspass::utility::ProcessingHistory::ProcessingHistory ( const ProcessingHistory & parent)

Standard copy constructor.

: BasicProcessingHistory(parent), elog(parent.elog), nodes(parent.nodes),
  algorithm(parent.algorithm), algid(parent.algid) {
  current_status = parent.current_status;
  current_id = parent.current_id;
  current_stage = parent.current_stage;
  mytype = parent.mytype;
}

Member Function Documentation

◆ accumulate()

void mspass::utility::ProcessingHistory::accumulate ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const ProcessingHistory & newinput 
)

Method to use with a spark reduce algorithm.

A reduce operator in spark utilizes a binary function where two inputs are used to generate a single output object. Because the inputs could be scattered on multiple processor nodes this operation must be associative. The new_ensemble_process method does not satisfy that constraint so this method was necessary to handle that type of algorithm correctly.

The way this algorithm works is it fundamentally branches on two different cases: (1) initialization, which is detected by testing if the node data map is empty or (2) secondary calls. This should work even if multiple inputs are combined at the end of the reduce operation because the copies being merged will not be empty. Note an empty input will create a complaint entry in the error log.
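
The associativity requirement can be illustrated with a minimal, self-contained sketch. This is not the MsPASS implementation: PartialHistory and combine are hypothetical names, and a plain set of input uuids stands in for the real history record. It only shows the two-branch pattern described above (initialize when the left side is empty, otherwise merge) and why a merge based on set union gives the same result however a reduce groups its inputs.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical stand-in for a history record: just the uuids of the
// inputs that have been folded into an accumulator so far.
struct PartialHistory {
  std::set<std::string> inputs;
  bool is_empty() const { return inputs.empty(); }
};

// Binary combine step of the kind a reduce operator calls repeatedly.
PartialHistory combine(PartialHistory lhs, const PartialHistory &rhs) {
  // Case 1: initialization. An empty accumulator adopts the input.
  if (lhs.is_empty())
    return rhs;
  // Case 2: secondary call. Merge the inputs; set union is associative,
  // so the grouping chosen by the scheduler does not change the result.
  lhs.inputs.insert(rhs.inputs.begin(), rhs.inputs.end());
  return lhs;
}
```

Folding four inputs strictly left to right, or as two partial results that are combined at the end, produces the same set of inputs in this sketch.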

{
  /* Note: this definition names its parameters algin, algidin, and ni; the
     declaration above shows them as alg, algid, and newinput. */
  ProcessingHistory newinput(ni);
  if ((newinput.algorithm != algin) || (newinput.algid != algidin) ||
      (newinput.jid != newinput.jobid()) ||
      (newinput.jnm != newinput.jobname())) {
    NodeData nd;
    nd = newinput.current_nodedata();
    newinput.newid();
    pair<string, NodeData> pn(newinput.current_id, nd);
    newinput.nodes.insert(pn);
    newinput.jid = newinput.jobid();
    newinput.jnm = newinput.jobname();
    newinput.algorithm = algin;
    newinput.algid = algidin;
    newinput.current_status = ProcessingStatus::VOLATILE;
    newinput.current_stage = nd.stage + 1;
    newinput.mytype = typ;
  }
  /* We have to detect an initialization condition without losing the
     stored history. There are two conditions we need to handle. First,
     if we create an empty container to hold the accumulator and put it on the
     left hand side we will want to clear the history chain or we will
     accumulate random junk. The second condition is if we accumulate in
     a way where the left hand side is some existing data where we do want to
     preserve the history. For the is_empty logic: we just copy the
     newinput's history and make its current node data the connection
     backward - i.e. we have to make a new uuid and add an entry. */
  if (this->is_empty()) {
    this->newid();
    nodes = ni.get_nodes();
    NodeData nd;
    nd = ni.current_nodedata();
    pair<string, NodeData> pn(current_id, nd);
    this->nodes.insert(pn);
    this->set_jobid(ni.jobid());
    this->set_jobname(ni.jobname());
    algorithm = algin;
    algid = algidin;
    current_status = ProcessingStatus::VOLATILE;
    current_stage = nd.stage + 1;
    mytype = typ;
  }
  /* This is the condition for a left hand side that is not empty but not
     yet initialized. We detect this condition by a mismatch in all the unique
     names and ids that mark the current process that defines this reduce
     operation */
  else if ((this->algorithm != algin) || (this->algid != algidin) ||
           (this->jid != newinput.jobid()) ||
           (this->jnm != newinput.jobname())) {
    /* This is similar to the block above, but the key difference here is we
       have to push this's history data to convert its current data to define
       an input. That means getting a new uuid and pushing current node data
       to the nodes map as an input */
    NodeData nd;
    nd = this->current_nodedata();
    this->newid();
    pair<string, NodeData> pn(current_id, nd);
    this->nodes.insert(pn);
    this->jid = newinput.jobid();
    this->jnm = newinput.jobname();
    this->algorithm = algin;
    this->algid = algidin;
    this->current_status = ProcessingStatus::VOLATILE;
    this->current_stage = nd.stage + 1;
    this->mytype = typ;
    this->merge(newinput);
  } else {
    this->merge(newinput);
  }
}

References current_nodedata(), get_nodes(), is_empty(), mspass::utility::BasicProcessingHistory::jid, mspass::utility::BasicProcessingHistory::jnm, mspass::utility::BasicProcessingHistory::jobid(), mspass::utility::BasicProcessingHistory::jobname(), merge(), newid(), nodes, mspass::utility::BasicProcessingHistory::set_jobid(), mspass::utility::BasicProcessingHistory::set_jobname(), and mspass::utility::NodeData::stage.

◆ add_many_inputs()

void mspass::utility::ProcessingHistory::add_many_inputs ( const std::vector< ProcessingHistory * > &  d)

Define several data objects as inputs.

This method acts like add_one_input in that it alters only the inputs chain. In fact it is nothing more than a loop over the components of the vector calling add_one_input for each component.

Parameters
d - the vector of data to define as inputs
{
  vector<ProcessingHistory *>::const_iterator dptr;
  for (dptr = d.begin(); dptr != d.end(); ++dptr) {
    ProcessingHistory *ptr;
    ptr = (*dptr);
    this->add_one_input(*ptr);
  }
}

References add_one_input().

◆ add_one_input()

void mspass::utility::ProcessingHistory::add_one_input ( const ProcessingHistory data_to_add)

Add one datum as an input for current data.

This method MUST ONLY be called after a call to new_ensemble_process in the situation where additional inputs need to be defined that were not available at the time new_ensemble_process was called. An example might be a stack that was created within the scope of "algorithm" and then used in some way to create the output data. In any case it differs fundamentally from new_ensemble_process in that it does not touch attributes that define the current state of "this". It simply says this is another input to the data "this" contains.

Parameters
data_to_add - the ProcessingHistory of the data object to be defined as input. Note the type of the data to which it is linked will be saved as the base of the input chain from data_to_add. It can be different from the type of "this".
{

  if (data_to_add.is_empty()) {
    stringstream ss;
    ss << "Data with uuid=" << data_to_add.id() << " has an empty history chain"
       << endl
       << "At best this will leave ProcessingHistory incomplete" << endl;
    elog.log_error("ProcessingHistory::add_one_input", ss.str(),
                   ErrorSeverity::Complaint);
  } else {
    multimap<string, NodeData>::iterator nptr;
    multimap<string, NodeData> newhistory = data_to_add.get_nodes();
    multimap<string, NodeData>::iterator nl, nu;
    /* As above this one needs check for duplicates and only add
       a node if the data are unique. This is simple compared to
       new_ensemble_process because we just have to check one object's history
       at a time. */
    for (nptr = newhistory.begin(); nptr != newhistory.end(); ++nptr) {
      string key(nptr->first);
      if (this->nodes.count(key) > 0) {
        nl = this->nodes.lower_bound(key);
        nu = this->nodes.upper_bound(key);
        for (auto ptr = nl; ptr != nu; ++ptr) {
          NodeData ndtest(ptr->second);
          if (ndtest != (nptr->second)) {
            this->nodes.insert(*nptr);
          }
        }
      } else {
        this->nodes.insert(*nptr);
      }
    }
    /* Don't forget head node data*/
    NodeData nd = data_to_add.current_nodedata();
    NodeData ndhere = this->current_nodedata();
    pair<string, NodeData> pnd(current_id, nd);
    this->nodes.insert(pnd);
  }
}

References current_nodedata(), elog, get_nodes(), id(), is_empty(), mspass::utility::ErrorLogger::log_error(), and nodes.

◆ clean_accumulate_uuids()

string mspass::utility::ProcessingHistory::clean_accumulate_uuids ( )

Clean up inconsistent uuids that can be produced by reduce.

In a spark reduce operation it is possible to create multiple uuid keys for inputs to the same algorithm instance. That happens because the mechanism used by ProcessingHistory to define the process history tree is not associative. When a reduce gets sprayed across multiple nodes, multiple initializations can occur that create artificial, inconsistent uuids. This method should normally be called after a reduce operator if history is being preserved. Otherwise the history chain may be left in a messy state: not invalid, just cluttered with extra branches in the processing tree.

A VERY IMPORTANT limitation of the algorithm used by this method is that the combination of algorithm and algid in "this" MUST be unique for a given job run when a reduce is called. That is, if an earlier workflow had used alg and algid but with a different jobid and jobname, the distinction cannot be detected with this algorithm. This means our global history handling must guarantee algid is unique for each run.
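
The core of the cleanup, moving every record stored under a duplicate key to a single master key, can be sketched in isolation. This is a simplified illustration and not the MsPASS code: the History alias and rekey_to_master are hypothetical, and a plain string stands in for NodeData. Unlike the source listing on this page, which inserts while iterating, the sketch collects all changes first and applies them afterward, which avoids any need to reason about iterator validity.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// key = uuid of a datum, value = stand-in for the NodeData of one input
using History = std::multimap<std::string, std::string>;

// Move every entry stored under a key listed in duplicates to master.
// Returns the number of entries that were moved.
std::size_t rekey_to_master(History &nodes, const std::string &master,
                            const std::vector<std::string> &duplicates) {
  assert(!master.empty());
  std::vector<History::iterator> to_erase;
  std::vector<std::pair<std::string, std::string>> to_insert;
  for (const auto &dup : duplicates) {
    if (dup == master)
      continue; // entries already under the master key stay in place
    const auto range = nodes.equal_range(dup);
    for (auto it = range.first; it != range.second; ++it) {
      to_insert.emplace_back(master, it->second);
      to_erase.push_back(it);
    }
  }
  // Erasing one multimap element leaves iterators to other elements valid.
  for (auto it : to_erase)
    nodes.erase(it);
  for (const auto &entry : to_insert)
    nodes.insert(entry);
  return to_insert.size();
}
```

With entries under "u1" and "u2" and "u1" chosen as the master, all records end up under "u1" and records under unrelated keys are untouched.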

Returns
unique uuid for alg,algid match set in the history chain. Note if there are no duplicates it simply returns the only one it finds. If there are duplicates it returns the lexically smallest (first in alphabetic order) uuid. Most importantly if there is no match or if history is empty it returns the string UNDEFINED.
{
  /* Return undefined immediately if the history chain is empty */
  if (this->is_empty())
    return string("UNDEFINED");
  NodeData ndthis = this->current_nodedata();
  string alg(ndthis.algorithm);
  string algidtest(ndthis.algid);
  /* The algorithm here finds all entries for which algorithm is alg and
     algid matches algid. We build a list of uuids (keys) linked to that unique
     algorithm. We then use the id in ndthis as the master*/
  set<string> matching_ids;
  matching_ids.insert(ndthis.uuid);
  /* this approach of pushing iterators to this list that match seemed to
     be the only way I could make this work correctly. Not sure why, but
     the added cost over handling this correctly in the loops is small. */
  std::list<multimap<string, NodeData>::iterator> need_to_erase;
  for (auto nptr = this->nodes.begin(); nptr != this->nodes.end(); ++nptr) {
    /* this copy operation is somewhat inefficient, but the cost is small
       compared to how obscure the code will look if we directly manipulate the
       second value */
    NodeData nd(nptr->second);
    /* this depends upon the distinction between set and multiset. i.e. an
       insert of a duplicate does nothing*/
    if ((alg == nd.algorithm) && (algidtest == nd.algid)) {
      matching_ids.insert(nd.uuid);
      need_to_erase.push_back(nptr);
    }
  }
  // handle no match situation gracefully
  if (matching_ids.empty())
    return string("UNDEFINED");
  /* Nothing more to do but return the uuid if there is only one*/
  if (matching_ids.size() == 1)
    return *(matching_ids.begin());
  else {
    for (auto sptr = need_to_erase.begin(); sptr != need_to_erase.end();
         ++sptr) {
      nodes.erase(*sptr);
    }
    need_to_erase.clear();
  }
  /* Here is the complicated case. We use the uuid from ndthis as the master
     and change all the others. This operation works ONLY because in a multimap
     erase only invalidates the iterator it points to and others remain valid.
   */
  string master_uuid = ndthis.uuid;
  for (auto sptr = matching_ids.begin(); sptr != matching_ids.end(); ++sptr) {
    /* Note this test is necessary to skip the master_uuid - no else needed*/
    if ((*sptr) != master_uuid) {
      multimap<string, NodeData>::iterator nl, nu;
      nl = this->nodes.lower_bound(*sptr);
      nu = this->nodes.upper_bound(*sptr);
      for (auto nptr = nl; nptr != nu; ++nptr) {
        NodeData nd;
        nd = (nptr->second);
        need_to_erase.push_back(nptr);
        nodes.insert(pair<string, NodeData>(master_uuid, nd));
      }
    }
  }
  for (auto sptr = need_to_erase.begin(); sptr != need_to_erase.end(); ++sptr) {
    nodes.erase(*sptr);
  }

  return master_uuid;
}

References mspass::utility::NodeData::algid, mspass::utility::NodeData::algorithm, current_nodedata(), is_empty(), nodes, and mspass::utility::NodeData::uuid.

◆ clear()

void mspass::utility::ProcessingHistory::clear ( )

Clear this history chain - use with caution.

{
  nodes.clear();
  current_status = ProcessingStatus::UNDEFINED;
  current_stage = 0;
  mytype = AtomicType::UNDEFINED;
  algorithm = "UNDEFINED";
  algid = "UNDEFINED";
}

References nodes.

◆ created_by()

std::pair< std::string, std::string > mspass::utility::ProcessingHistory::created_by ( ) const
inline

Return the algorithm name and id that created current node.

{
  std::pair<std::string, std::string> result(algorithm, algid);
  return result;
}

◆ current_nodedata()

NodeData mspass::utility::ProcessingHistory::current_nodedata ( ) const

Return all the attributes of current.

This is a convenience method strictly for the C++ interface (it is too nonpythonic to be useful to wrap for Python). It returns a NodeData class containing the attributes of the head of the chain. Like the getters above, it is needed to save that data.

{
  NodeData nd;
  nd.status = current_status;
  nd.uuid = current_id;
  nd.type = mytype;
  nd.stage = current_stage;
  nd.algorithm = algorithm;
  nd.algid = algid;
  return nd;
}

References mspass::utility::NodeData::algid, mspass::utility::NodeData::algorithm, mspass::utility::NodeData::stage, mspass::utility::NodeData::status, mspass::utility::NodeData::type, and mspass::utility::NodeData::uuid.

◆ get_nodes()

multimap< string, NodeData > mspass::utility::ProcessingHistory::get_nodes ( ) const

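Retrieve the nodes multimap that defines the tree structure branches.

This method is described as doing more than just getting the protected multimap called nodes: it copies the map and then pushes the "current" contents to the map before returning the copy. This allows the data defined as current to not be pushed into the tree until they are needed. Note that in the listing below the code that pushes the current contents is commented out, so the method as listed returns the nodes multimap unchanged.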

{
  /* Return empty map if it has no data - necessary or the logic
     below will insert an empty head to the chain. */
  if (this->is_empty())
    return nodes; // a way to return an empty container
  /* This is wrong, I think, but retained to test before removing.
     remove this once current idea is confirmed. Note if that
     proves true we can also remove the two lines above as they do
     nothing useful*/
  /*
  NodeData nd;
  nd=this->current_nodedata();
  pair<string,NodeData> pn(current_id,nd);
  multimap<string,NodeData> result(this->nodes);
  result.insert(pn);
  return result;
  */
  return nodes;
}

References is_empty(), and nodes.

◆ id()

std::string mspass::utility::ProcessingHistory::id ( ) const
inline

Return the id of this object set for this history chain.

We maintain the uuid for a data object inside this class. This method fetches the string representation of the uuid of this data object.

{ return current_id; };

◆ inputs()

list< NodeData > mspass::utility::ProcessingHistory::inputs ( const std::string  id_to_find) const

Return a list of data that define the inputs to a given uuid.

This low level getter returns the NodeData objects that define the inputs to the uuid of some piece of data that was used as input at some stage for the current object.

Parameters
id_to_find - the uuid for which input data is desired.
Returns
list of NodeData that define the inputs. Will silently return empty list if the key is not found.
{
  list<NodeData> result;
  // Return empty list immediately if key not found
  if (nodes.count(id_to_find) <= 0)
    return result;
  /* Note these have to be const_iterators because method is tagged const*/
  multimap<string, NodeData>::const_iterator upper, lower;
  lower = nodes.lower_bound(id_to_find);
  upper = nodes.upper_bound(id_to_find);
  multimap<string, NodeData>::const_iterator mptr;
  for (mptr = lower; mptr != upper; ++mptr) {
    result.push_back(mptr->second);
  }
  return result;
};

References nodes.

◆ is_empty()

bool mspass::utility::ProcessingHistory::is_empty ( ) const

Return true if the processing chain is empty.

This method provides a standard test for an invalid, empty processing chain. All constructors except the copy constructor put this object in an invalid state that will cause this method to return true. Only if the chain is initialized properly with a call to set_as_origin will this method return false.

{
  if ((current_status == ProcessingStatus::UNDEFINED) && (nodes.empty()))
    return true;
  return false;
}

References nodes.

◆ is_origin()

bool mspass::utility::ProcessingHistory::is_origin ( ) const

Return true if the current data is in state defined as "origin" - see class description

{
  if (current_status == ProcessingStatus::RAW ||
      current_status == ProcessingStatus::ORIGIN)
    return true;
  else
    return false;
}

◆ is_raw()

bool mspass::utility::ProcessingHistory::is_raw ( ) const

Return true if the current data is in state defined as "raw" - see class description

{
  if (current_status == ProcessingStatus::RAW)
    return true;
  else
    return false;
}

◆ is_saved()

bool mspass::utility::ProcessingHistory::is_saved ( ) const

Return true if the current data is in state defined as "saved" - see class description

{
  if (current_status == ProcessingStatus::SAVED)
    return true;
  else
    return false;
}

◆ is_volatile()

bool mspass::utility::ProcessingHistory::is_volatile ( ) const

Return true if the current data is in state defined as "volatile" - see class description

{
  if (current_status == ProcessingStatus::VOLATILE)
    return true;
  else
    return false;
}

◆ map_as_saved()

string mspass::utility::ProcessingHistory::map_as_saved ( const std::string  alg,
const std::string  algid,
const AtomicType  typ 
)

Prepare the current data for saving.

Saving data is treated as a special form of map operation. That is because a save by our definition is always a one-to-one operation with an index entry for each atomic object. This method pushes a new entry in the history chain tagged by the algorithm/algid field for the writer. It differs from new_map in the important sense that the uuid is not changed. The record this sets in the nodes multimap will then have the same uuid for the key as that in NodeData. That, along with the status set to SAVED, can be used downstream to recognize save records. Note that the listing below keys the new record with the fixed value SAVED_ID_KEY and also sets current_id to that value, so this description may not match the listed implementation exactly.

It is VERY IMPORTANT for use of this method to realize this method saves nothing. It only preps the history chain data so calls that follow will retrieve the right information to reconstruct the full history chain. Writers should follow this sequence:

  1. call map_as_saved with the writer name for algorithm definition
  2. save the data and history chain to MongoDB.
  3. be sure you have a copy of the uuid string of the data just saved and call the clear method.
  4. call the set_as_origin method using the uuid saved with the algorithm/id the same as used for the earlier call to map_as_saved. This puts the ProcessingHistory in a state identical to that produced by a reader.
Parameters
alg - the algorithm name to assign to the output. This would normally be the name defining the writer.
algid - an id designator to uniquely define an instance of algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups. Note one model to distinguish records of actual save and redefinition of the data as an origin (see above) is to use a different id for the call to map_as_saved and the later call to set_as_origin. This code doesn't care, but that is an implementation detail in how this will work with MongoDB.
typ - defines the data type (C++ class) that was just saved.
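
The four-step writer sequence above can be sketched as follows. This is a hedged illustration of call order only. FakeHistory is a hypothetical stand-in that records which methods were called; it is not the MsPASS class, its methods omit the AtomicType and define_as_raw arguments of the real signatures, and save_with_history and its save callback are likewise invented for this sketch. Copying the uuid with id() before calling map_as_saved is a choice made here to be sure the original uuid is kept.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in that only logs the calls made on it.
struct FakeHistory {
  std::vector<std::string> calls;
  std::string current_id = "uuid-of-datum";

  std::string id() const { return current_id; }
  void map_as_saved(const std::string &alg, const std::string &algid) {
    calls.push_back("map_as_saved:" + alg + ":" + algid);
  }
  void clear() { calls.push_back("clear"); }
  void set_as_origin(const std::string &alg, const std::string &algid,
                     const std::string &uuid) {
    calls.push_back("set_as_origin:" + alg + ":" + algid);
    current_id = uuid;
  }
};

// Apply the documented sequence; save is a caller-supplied (hypothetical)
// function that writes the datum and its history to storage.
std::string save_with_history(
    FakeHistory &h, const std::string &writer, const std::string &writer_id,
    const std::function<void(const std::string &)> &save) {
  assert(save != nullptr);
  const std::string uuid = h.id();          // keep a copy of the uuid
  h.map_as_saved(writer, writer_id);        // step 1: tag the chain
  save(uuid);                               // step 2: write data and history
  h.clear();                                // step 3: drop the in-memory chain
  h.set_as_origin(writer, writer_id, uuid); // step 4: restart as an origin
  return uuid;
}
```

A real writer would pass the actual data type and would perform the database write in step 2; the sketch only fixes the order in which the history methods are called.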
{
  /* Note: this definition uses algid_in for the parameter that the
     declaration above shows as algid. */
  if (this->is_empty()) {
    stringstream ss;
    ss << "Attempt to call this method on an empty history chain for uuid="
       << this->id() << endl
       << "Cannot preserve history for writer=" << alg << " with id=" << algid
       << endl;
    elog.log_error("ProcessingHistory::map_as_saved", ss.str(),
                   ErrorSeverity::Complaint);
    return current_id;
  }
  /* This is essentially pushing current data to the end of the history chain
     but using a special id that may or may not be saved by the caller.
     We use a fixed keyword defined in ProcessingHistory.h assuming saves
     are always a one-to-one operation (definition of atomic really)*/
  NodeData nd(this->current_nodedata());
  pair<string, NodeData> pn(SAVED_ID_KEY, nd);
  this->nodes.insert(pn);
  /* Now we reset current to define it as the saver. Then calls to the
     getters for the multimap will properly insert this data as the end of the
     chain. Note a key difference from new_map is we don't create a new uuid.
     I don't think that will cause an ambiguity, but it might be better to
     just create a new one here - will do it this way unless that proves a
     problem as the equality of the two might be a useful test for other
     purposes */
  algorithm = alg;
  algid = algid_in;
  current_status = ProcessingStatus::SAVED;
  current_id = SAVED_ID_KEY;
  if (current_stage >= 0)
    ++current_stage;
  else {
    elog.log_error(
        "ProcessingHistory::map_as_saved",
        "current_stage on entry had not been initialized\nImproper usage will "
        "create an invalid history chain that may cause downstream problems",
        ErrorSeverity::Complaint);
    current_stage = 0;
  }
  mytype = typ;
  return current_id;
}

References current_nodedata(), elog, id(), is_empty(), mspass::utility::ErrorLogger::log_error(), and nodes.

◆ merge()

void mspass::utility::ProcessingHistory::merge ( const ProcessingHistory & data_to_add)

Merge the history nodes from another.

Parameters
data_to_add - the ProcessingHistory of the data object to be merged.
{

  if (data_to_add.is_empty()) {
    stringstream ss;
    ss << "Data with uuid=" << data_to_add.id() << " has an empty history chain"
       << endl
       << "At best this will leave ProcessingHistory incomplete" << endl;
    elog.log_error("ProcessingHistory::merge", ss.str(),
                   ErrorSeverity::Complaint);
  } else {
    multimap<string, NodeData>::iterator nptr;
    multimap<string, NodeData> newhistory = data_to_add.get_nodes();
    multimap<string, NodeData>::iterator nl, nu;
    for (nptr = newhistory.begin(); nptr != newhistory.end(); ++nptr) {
      string key(nptr->first);
      /* if the data_to_add's key matches its current id,
         we merge all the nodes under the current id of *this. */
      if (key == data_to_add.current_id) {
        this->nodes.insert(std::make_pair(this->current_id, nptr->second));
      } else if (this->nodes.count(key) > 0) {
        nl = this->nodes.lower_bound(key);
        nu = this->nodes.upper_bound(key);
        for (auto ptr = nl; ptr != nu; ++ptr) {
          NodeData ndtest(ptr->second);
          if (ndtest != (nptr->second)) {
            this->nodes.insert(*nptr);
          }
        }
      } else {
        this->nodes.insert(*nptr);
      }
    }
  }
}

References elog, get_nodes(), id(), is_empty(), mspass::utility::ErrorLogger::log_error(), and nodes.
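
The merge logic above depends on not storing the same (key, value) pair twice in a multimap. The following is a minimal, self-contained sketch of that idea. `Node`, `NodeMap`, and `insert_unique` are hypothetical stand-ins that are not part of the MsPASS API, and this version deliberately inserts at most once per call, which is one possible reading of the intent rather than a transcription of the listing.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for mspass::utility::NodeData (illustration only).
struct Node {
  std::string uuid;
  int stage;
  bool operator==(const Node &other) const {
    return uuid == other.uuid && stage == other.stage;
  }
};

using NodeMap = std::multimap<std::string, Node>;

// Insert (key, value) only if no identical value is already stored under key.
// Returns true when an insertion actually happened.
bool insert_unique(NodeMap &nodes, const std::string &key, const Node &value) {
  assert(!key.empty());  // assumption: keys are uuid strings and never empty
  auto range = nodes.equal_range(key);
  bool already_present = std::any_of(
      range.first, range.second,
      [&value](const NodeMap::value_type &entry) {
        return entry.second == value;
      });
  if (already_present)
    return false;
  nodes.emplace(key, value);
  return true;
}
```

Scanning the whole `equal_range` before inserting keeps the number of copies of any one node at one, regardless of how many other values share the key.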

◆ new_ensemble_process()

string mspass::utility::ProcessingHistory::new_ensemble_process ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const std::vector< ProcessingHistory * >  parents,
const bool  create_newid = true 
)

Define history chain for an algorithm with multiple inputs in an ensemble.

Use this method to define the history chain for an algorithm that has multiple inputs for each output. Each output must call this method to build the connections that define how all inputs link to the new data being created by the calling algorithm. Use it for map operators that take an ensemble object as input and produce a single data object as output, and call it when the output object is created. If the algorithm builds multiple outputs to assemble an output ensemble, call this method for each output before pushing it to the output ensemble container.

This method should not be used for a reduce operation in spark. It does not satisfy the associative rule for reduce. Use accumulate for reduce operations.

Normally, it makes sense to set the boolean create_newid true so that the current_id is guaranteed to be unique. There is little cost in creating a new one if there is any doubt that the current_id is unique. The false option is there only for rare cases where the current id value needs to be preserved.

Note that the vector of parents holds raw pointers to avoid excessive copying. In normal use this should not create memory leaks, but do not free what the pointers point to or problems are guaranteed. It is VERY IMPORTANT to realize that all the pointers are presumed to point to the ProcessingHistory component of a larger data object (Seismogram or TimeSeries). The parents do not all have to be a common type; if they contain valid history data, their current type is defined there.

This method ALWAYS marks the status as VOLATILE.

Parameters
alg  is the algorithm name to assign to the origin node. This would normally be a name for the algorithm that makes sense to a human.
algid  is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups.
typ  defines the data type (C++ class) the algorithm that is generating this data will create.
parents  is a vector of ProcessingHistory pointers for all input data objects used to create this ensemble.
create_newid  is a boolean defining how the current id is handled. As described above, if true the method will call newid and set that as the current id of this data object. If false the current value is left intact.
Returns
a string representation of the uuid of the data to which this ProcessingHistory is now attached.
175 {
176 if (create_newid) {
177 this->newid();
178 }
179 /* We need to clear the tree contents because all the parents will
180 branch from this. Hence, we have to put the node data into an empty
181 container */
182 this->clear();
183 algorithm = alg;
184 algid = algid_in;
185 mytype = typ;
186 /* Initialize current stage but assume it will be updated as max of
187 parents below */
188 current_stage = 0;
189 multimap<string, NodeData>::const_iterator nptr, nl, nu;
190 size_t i;
191 /* current_stage can be ambiguous from multiple inputs. We define
192 the current stage from a reduce as the largest stage value found
193 in all inputs. Note we only test the stage value at the head for
194 each parent */
195 int max_stage(0);
196 for (i = 0; i < parents.size(); ++i) {
197 if (parents[i]->is_empty()) {
198 stringstream ss;
199 ss << "Vector member number " << i << " with uuid=" << parents[i]->id()
200 << " has an empty history chain" << endl
201 << "At best the processing history data will be incomplete" << endl;
202 elog.log_error("ProcessingHistory::new_ensemble_process", ss.str(),
203 ErrorSeverity::Complaint);
204 continue;
205 }
206 multimap<string, NodeData> parent_node_data(parents[i]->get_nodes());
207 /* We also have to get the head data with this method now */
208 NodeData nd = parents[i]->current_nodedata();
209 if (nd.stage > max_stage)
210 max_stage = nd.stage;
211 for (nptr = parent_node_data.begin(); nptr != parent_node_data.end();
212 ++nptr) {
213 /*Adding to nodes multimap has a complication. It is possible in
214 some situations to have duplicate node data coming from different
215 inputs. The method we use to reconstruct the processing history tree
216 will be confused by such duplicates so we need to test for pure
217 duplicates in NodeData values. This algorithm would not scale well
218 if the number of values with a common key is large for either
219 this or parent[i]*/
220 string key(nptr->first);
221 if (this->nodes.count(key) > 0) {
222 nl = this->nodes.lower_bound(key);
223 nu = this->nodes.upper_bound(key);
224 for (auto ptr = nl; ptr != nu; ++ptr) {
225 NodeData ndtest(ptr->second);
226 if (ndtest != (nptr->second)) {
227 this->nodes.insert(*nptr);
228 }
229 }
230 } else {
231 /* No problem just inserting a node if there were no previous
232 entries*/
233 this->nodes.insert(*nptr);
234 }
235 }
236 /* Also insert the head data */
237 pair<string, NodeData> pnd(current_id, nd);
238 this->nodes.insert(pnd);
239 }
240 current_stage = max_stage;
241 /* Now reset the current contents to make it the base of the history tree.
242 Be careful of uninitialized current_stage*/
243 if (current_stage >= 0)
244 ++current_stage;
245 else {
246 elog.log_error("ProcessingHistory::new_ensemble_process",
247 "current_stage for none of the parents was "
248 "initialized\nImproper usage will create an invalid history "
249 "chain that may cause downstream problems",
250 ErrorSeverity::Complaint);
251 current_stage = 0;
252 }
253 algorithm = alg;
254 algid = algid_in;
255 // note this is output type - inputs can be variable and defined by nodes
256 mytype = typ;
257 current_status = ProcessingStatus::VOLATILE;
258 return current_id;
259}
void clear()
Definition ProcessingHistory.cc:633
std::multimap< std::string, mspass::utility::NodeData > get_nodes() const
Definition ProcessingHistory.cc:614

References clear(), elog, get_nodes(), is_empty(), mspass::utility::ErrorLogger::log_error(), newid(), nodes, and mspass::utility::NodeData::stage.
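
As the comments in the listing explain, the stage assigned to the output is derived from the largest stage found at the head of any parent, then incremented by one. A minimal sketch of that rule follows. The function name `ensemble_output_stage` is hypothetical, and the sketch assumes every parent stage has been initialized (non-negative), so the complaint branch in the listing is not modeled.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical helper (not part of MsPASS): compute the stage for an output
// built from several parents, given the head stage value of each parent.
int ensemble_output_stage(const std::vector<int> &parent_stages) {
  int max_stage = 0;  // same starting value as max_stage in the listing
  for (int s : parent_stages) {
    assert(s >= 0);  // assumption: all parent stages were initialized
    max_stage = std::max(max_stage, s);
  }
  // The output is one processing step beyond its most-processed parent.
  return max_stage + 1;
}
```

Under this rule, parents at stages 2, 5, and 3 give an output at stage 6, and an empty parent list gives stage 1.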

◆ new_map() [1/2]

std::string mspass::utility::ProcessingHistory::new_map ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const ProcessingHistory &  data_to_clone,
const ProcessingStatus  newstatus = ProcessingStatus::VOLATILE 
)

Define this algorithm as a one-to-one map.

Many algorithms define a one-to-one map where each input data object creates one output data object. This method allows the input and output to be different data types, requiring only that one input maps to one output. It differs from the overloaded method with fewer arguments in that it should be used if you need to clear and refresh the history chain for any reason. Known examples are simulation waveforms created for testing within a workflow that have no prior history data loaded but that clone some properties of another piece of data. This method should be used in any situation where the history chain in the current data is wrong but the contents are linked to some other process chain. It is supplied to cover odd cases, and its use will likely be rare.

Parameters
alg  is the algorithm name to assign to the origin node. This would normally be a name for the algorithm that makes sense to a human.
algid  is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups.
typ  defines the data type (C++ class) the algorithm that is generating this data will create.
data_to_clone  is a reference to the ProcessingHistory section of a parent data object that should be used to override the existing history chain.
newstatus  is the status marking for the output. Normal (default) would be VOLATILE. This argument was included mainly for flexibility in case we wanted to extend the allowed entries in ProcessingStatus.
363 {
364 /* We must be sure the chain is empty before we push the clone's data there*/
365 this->clear();
366 /* this works because get_nodes pushes the current data to the nodes
367 multimap. We intentionally do not test for an empty nodes map
368 assuming one wouldn't call this without knowing that was necessary.
369 That may be an incorrect assumption, but will use it until proven otherwise*/
370 nodes = copy_to_clone.get_nodes();
371 NodeData nd;
372 nd = this->current_nodedata();
373 /* We always need a new id here for this object we are handling as the child
374 */
375 current_id = this->newid();
376 pair<string, NodeData> pn(current_id, nd);
377 this->nodes.insert(pn);
378 algorithm = alg;
379 algid = algid_in;
380 current_status =
381 newstatus; // Probably should default in include file to VOLATILE
382 if (current_stage >= 0)
383 ++current_stage;
384 else {
385 elog.log_error(
386 "ProcessingHistory::new_map",
387 "current_stage on entry had not been initialized\nImproper usage will "
388 "create an invalid history chain that may cause downstream problems",
389 ErrorSeverity::Complaint);
390 current_stage = 0;
391 }
392 mytype = typ;
393 return current_id;
394}

References clear(), current_nodedata(), elog, get_nodes(), mspass::utility::ErrorLogger::log_error(), newid(), and nodes.

◆ new_map() [2/2]

std::string mspass::utility::ProcessingHistory::new_map ( const std::string  alg,
const std::string  algid,
const AtomicType  typ,
const ProcessingStatus  newstatus = ProcessingStatus::VOLATILE 
)

Define this algorithm as a one-to-one map of same type data.

Many algorithms define a one-to-one map where each input data object creates one output data object. This (overloaded) version of the method is most appropriate when input and output are the same type and the history chain (ProcessingHistory) is what the new algorithm will alter to make the result when it finishes. Use the overloaded version with a separate ProcessingHistory copy if the current object's data are not correct. In this version the existing chain is simply extended with the new definitions.

Parameters
alg  is the algorithm name to assign to the origin node. This would normally be a name for the algorithm that makes sense to a human.
algid  is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled. alg is mostly carried as baggage to make output more easily comprehended without additional lookups.
typ  defines the data type (C++ class) the algorithm that is generating this data will create.
newstatus  is the status marking for the output. Normal (default) would be VOLATILE. This argument was included mainly for flexibility in case we wanted to extend the allowed entries in ProcessingStatus.
322 {
323 if (this->is_empty()) {
324 stringstream ss;
325 ss << "Attempt to call this method on an empty history chain for uuid="
326 << this->id() << endl
327 << "Cannot preserve history for algorithm=" << alg
328 << " with id=" << algid << endl;
329 elog.log_error("ProcessingHistory::new_map", ss.str(),
330 ErrorSeverity::Complaint);
331 return current_id;
332 }
333 /* In this case we have to push current data to the history chain */
334 NodeData nd;
335 nd = this->current_nodedata();
336 /* We always need a new id here for this object we are handling as the child
337 */
338 current_id = this->newid();
339 /* The new id is now the key to link back to previous record so we insert
340 nd with the new key to define that link */
341 pair<string, NodeData> pn(current_id, nd);
342 this->nodes.insert(pn);
343 algorithm = alg;
344 algid = algid_in;
345 current_status =
346 newstatus; // Probably should default in include file to VOLATILE
347 if (current_stage >= 0)
348 ++current_stage;
349 else {
350 elog.log_error(
351 "ProcessingHistory::new_map",
352 "current_stage on entry had not been initialized\nImproper usage will "
353 "create an invalid history chain that may cause downstream problems",
354 ErrorSeverity::Complaint);
355 current_stage = 0;
356 }
357 mytype = typ;
358 return current_id;
359}

References current_nodedata(), elog, id(), is_empty(), mspass::utility::ErrorLogger::log_error(), newid(), and nodes.
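
The listing above extends the chain by storing the previous "current" record under the newly created id, so the new id becomes the key that links back to its single input. The following self-contained sketch illustrates that linkage. `MiniNode` and `MiniHistory` are hypothetical simplifications, not the MsPASS classes, and the child id is supplied by the caller here instead of being generated as a uuid.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical, reduced stand-in for NodeData (illustration only).
struct MiniNode {
  std::string uuid;
  std::string algorithm;
  int stage;
};

// Hypothetical, reduced stand-in for ProcessingHistory (illustration only).
struct MiniHistory {
  std::string current_id;
  std::string algorithm;
  int stage = 0;
  std::multimap<std::string, MiniNode> nodes;

  // One-to-one map: the old current record becomes the only input of child_id.
  const std::string &new_map(const std::string &alg,
                             const std::string &child_id) {
    assert(!child_id.empty());  // assumption: ids are never empty
    MiniNode parent{current_id, algorithm, stage};
    nodes.emplace(child_id, parent);  // key = child, value = its input
    current_id = child_id;
    algorithm = alg;
    ++stage;
    return current_id;
  }
};
```

After one call, looking up the new id in `nodes` yields exactly one record, and that record carries the id, algorithm, and stage the object had before the call.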

◆ newid()

string mspass::utility::ProcessingHistory::newid ( )

Create a new id.

This creates a new uuid. How it does so is an implementation detail, but this implementation uses boost's random-number-based uuid generator, which has an absurdly small probability of generating two equal ids. It returns the string representation of the id created.

653 {
654 boost::uuids::random_generator gen;
655 boost::uuids::uuid uuidval;
656 uuidval = gen();
657 this->current_id = boost::uuids::to_string(uuidval);
658 return current_id;
659}

◆ number_inputs() [1/2]

int mspass::utility::ProcessingHistory::number_inputs ( ) const

Return the number of inputs used to create current data.

In a number of contexts it can be useful to know the number of inputs defined for the current object. This returns that count.

650 {
651 return this->number_inputs(current_id);
652}
int number_inputs() const
Definition ProcessingHistory.cc:650

References number_inputs().

◆ number_inputs() [2/2]

int mspass::utility::ProcessingHistory::number_inputs ( const std::string  uuidstr) const

Return the number of inputs defined for any data in the process chain.

This overloaded version of number_inputs asks for the number of inputs defined for an arbitrary uuid. This is useful only when backtracing the ancestry of a child.

Parameters
uuidstr  is the uuid string to check in the ancestry record.
644 {
645 // Return result is int to mesh better with python even though
646 // count returns size_t
647 int n = nodes.count(testuuid);
648 return n;
649}

References nodes.
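
Backtracing ancestry with a per-uuid input count can be sketched as follows. This is an illustration under stated assumptions, not MsPASS code: `ParentMap` reduces each node record to just the uuid of the input, and `find_origins` is a hypothetical helper that treats any uuid with zero recorded inputs as the top of a chain.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical reduction of the nodes multimap: child uuid -> input uuid.
using ParentMap = std::multimap<std::string, std::string>;

// Same idea as number_inputs(uuidstr): count entries stored under a uuid.
int number_inputs(const ParentMap &nodes, const std::string &uuid) {
  return static_cast<int>(nodes.count(uuid));
}

// Walk from start toward its ancestors and collect the uuids with no inputs.
std::set<std::string> find_origins(const ParentMap &nodes,
                                   const std::string &start) {
  assert(!start.empty());  // assumption: uuids are never empty
  std::set<std::string> origins;
  std::set<std::string> visited;
  std::vector<std::string> pending{start};
  while (!pending.empty()) {
    std::string id = pending.back();
    pending.pop_back();
    if (!visited.insert(id).second)
      continue;  // already handled via another branch
    if (number_inputs(nodes, id) == 0) {
      origins.insert(id);
      continue;
    }
    auto range = nodes.equal_range(id);
    for (auto it = range.first; it != range.second; ++it)
      pending.push_back(it->second);
  }
  return origins;
}
```

The `visited` set matters because two branches of a history tree can share an ancestor, and without it that ancestor would be walked once per branch.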

◆ number_of_stages()

size_t mspass::utility::ProcessingHistory::number_of_stages ( )
overridevirtual

Return number of processing stages that have been applied to this object.

One might want to know how many processing steps have been previously applied to produce the current data. For linear algorithms that would be useful only in debugging, but for an iterative algorithm it can be essential to avoid infinite loops with a loop limit parameter. This method returns how many times something has been done to alter the associated data. It returns 0 if the data are raw.

An important note is that the number returned is the number of processing steps since the last save. Because a save operation is assumed to save the history chain and then flush it, there is no easy way at present to keep track of the total number of stages. If we really need this functionality, it could be easily retrofitted with another private variable that is not reset when the clear method is called.

Reimplemented from mspass::utility::BasicProcessingHistory.

140{ return current_stage; }
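
The loop-limit use mentioned above can be sketched as follows. `TrackedDatum` and `iterate_with_limit` are hypothetical names used only for illustration; the `stages` member plays the role of the value number_of_stages returns, and the step and convergence test are supplied by the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

// Hypothetical datum carrying a stage counter (illustration only).
struct TrackedDatum {
  double value = 0.0;
  std::size_t stages = 0;
};

// Apply step until converged(d) is true or the stage limit is reached.
// Returns true on convergence, false if the limit stopped the loop.
bool iterate_with_limit(
    TrackedDatum &d, std::size_t max_stages,
    const std::function<void(TrackedDatum &)> &step,
    const std::function<bool(const TrackedDatum &)> &converged) {
  assert(step && converged);  // both callables must be set
  while (!converged(d)) {
    if (d.stages >= max_stages)
      return false;  // guard against an infinite loop
    step(d);
    ++d.stages;  // one more processing step has been applied
  }
  return true;
}
```

Because the counter described above restarts after a save, a limit written this way bounds the steps since the last save, not the total over the life of the data.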

◆ operator=()

ProcessingHistory & mspass::utility::ProcessingHistory::operator= ( const ProcessingHistory &  parent)

Assignment operator.

689 {
690 if (this != (&parent)) {
691 this->BasicProcessingHistory::operator=(parent);
692 nodes = parent.nodes;
693 current_status = parent.current_status;
694 current_id = parent.current_id;
695 current_stage = parent.current_stage;
696 mytype = parent.mytype;
697 algorithm = parent.algorithm;
698 algid = parent.algid;
699 elog = parent.elog;
700 }
701 return *this;
702}
BasicProcessingHistory & operator=(const BasicProcessingHistory &parent)
Definition ProcessingHistory.h:95

References elog, nodes, and mspass::utility::BasicProcessingHistory::operator=().

◆ set_as_origin()

void mspass::utility::ProcessingHistory::set_as_origin ( const std::string  alg,
const std::string  algid,
const std::string  uuid,
const AtomicType  typ,
bool  define_as_raw = false 
)

Set to define this as the top origin of a history chain.

This method should be called when a new object is created to initialize the history as an origin. Note again that an origin may be raw, but not all origins are defined as raw. This interface controls that through the boolean define_as_raw (false by default). Python wrappers should define an alternate set_as_raw method that calls this method with define_as_raw set true.

It is VERY IMPORTANT to realize that the uuid argument passed to this method is of fundamental importance. That string is assumed to be a uuid that can be linked to either a parent data object read from storage and/or a history chain saved by a prior run. It becomes the current_id for the data to which this object is a parent. This method also always does two things that define how the contents can be used. current_stage is ALWAYS set to 0. We distinguish a pure origin from an intermediate save ONLY by the status value saved in the history chain. That is, only uuids with status set to RAW are viewed as guaranteed to be stored. A record marked ORIGIN is assumed to have passed through a save operation. To retrieve the history chain from multiple runs, the pieces have to be assembled from history data stored in MongoDB.

The contents of the history data structures should be empty when this method is called. That would be the norm for any constructor except those that make a deep copy. If unsure the clear method should be called before this method is called. If it isn't empty it will be cleared anyway and a complaint message will be posted to elog.

Parameters
alg  is the algorithm name to assign to the origin node. This would normally be a reader name, but it could be a synthetic generator.
algid  is an id designator to uniquely define an instance of the algorithm. Note that algid must itself be a unique keyword or the history chains will get scrambled.
uuid  unique id for this data object (see note above)
typ  defines the data type (C++ class) "this" points to. It might be possible to determine this dynamically, but a design choice was to only allow registered classes through this mechanism, i.e. the enum class typ implements has a finite number of C++ classes it accepts. The type must be a child of ProcessingHistory.
define_as_raw  sets status as RAW if true and ORIGIN otherwise.
Exceptions
Never  throws an exception, BUT this method will post a complaint to elog if the history data structures are not empty and the clear method needs to be called internally.
152 {
153 const string base_error("ProcessingHistory::set_as_origin: ");
154 if (nodes.size() > 0) {
155 elog.log_error(alg + ":" + algid_in,
156 base_error + "Illegal usage. History chain was not empty. "
157 " Calling clear method and continuing",
158 ErrorSeverity::Complaint);
159 this->clear();
160 }
161 if (define_as_raw) {
162 current_status = ProcessingStatus::RAW;
163 } else {
164 current_status = ProcessingStatus::ORIGIN;
165 }
166 algorithm = alg;
167 algid = algid_in;
168 current_id = uuid;
169 mytype = typ;
170 /* Origin/raw are always defined as stage 0 even after a save. */
171 current_stage = 0;
172}

References clear(), elog, mspass::utility::ErrorLogger::log_error(), and nodes.
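
The behavior described above (clear with a complaint if not empty, choose RAW or ORIGIN, reset the stage to 0) can be illustrated with a small self-contained model. Everything here is a hypothetical stand-in: `Status` mirrors only the two ProcessingStatus values named in the text plus an `UNDEFINED` placeholder assumed for the sketch, and `complaints` is a plain vector standing in for elog.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical status enum; only RAW and ORIGIN come from the text above.
enum class Status { UNDEFINED, RAW, ORIGIN };

// Hypothetical, reduced model of the origin-setting behavior.
struct OriginSketch {
  Status status = Status::UNDEFINED;
  std::string current_id;
  int stage = -1;  // -1 marks "not yet initialized" in this sketch
  std::multimap<std::string, std::string> nodes;
  std::vector<std::string> complaints;  // stand-in for the error log

  void set_as_origin(const std::string &uuid, bool define_as_raw = false) {
    assert(!uuid.empty());  // assumption: the caller supplies a real uuid
    if (!nodes.empty()) {
      // Non-fatal misuse: record a complaint, then clear and continue.
      complaints.push_back("History chain was not empty; cleared");
      nodes.clear();
    }
    status = define_as_raw ? Status::RAW : Status::ORIGIN;
    current_id = uuid;
    stage = 0;  // origin and raw data are always stage 0
  }
};
```

Logging and continuing, instead of throwing, matches the documented contract that the method never throws.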

◆ set_id()

void mspass::utility::ProcessingHistory::set_id ( const std::string  newid)

Set the uuid manually.

It may occasionally be necessary to create a uuid by some other mechanism. This allows that, but this method should be used with caution and only if you understand the consequences.

Parameters
newid  is the string to use for the id.
660{ this->current_id = newid; }

References newid().

◆ stage()

int mspass::utility::ProcessingHistory::stage ( ) const
inline

Return the current stage count for this object.

We maintain a counter of the number of processing steps that have been applied to produce this data object. This simple method returns that counter. With this implementation this is identical to number_of_stages. We retain it in the API in the event we want to implement an accumulating counter.

591{ return current_stage; };

◆ status()

ProcessingStatus mspass::utility::ProcessingHistory::status ( ) const
inline

Return the current status definition (an enum).

593{ return current_status; };

Member Data Documentation

◆ elog

ErrorLogger mspass::utility::ProcessingHistory::elog

Error log for non-fatal processing-history consistency complaints.

◆ nodes

std::multimap<std::string, mspass::utility::NodeData> mspass::utility::ProcessingHistory::nodes
protected

Connections between each data object uuid and its input node records.

The key is the uuid of a data object, and each associated NodeData value describes one input used to create the data identified by that uuid.


The documentation for this class was generated from the following files: