workflow.database package

Subpackages

Submodules

workflow.database.manage module

workflow.database.transactions module

Perform DB transactions

workflow.database.transactions.add_status_entry(headers, data)

Populate the reporting database with the contents of a status message of the following format:

headers: {'expires': '0', 'timestamp': '1344613053723', 'destination': '/queue/POSTPROCESS.DATA_READY', 'persistent': 'true', 'priority': '5', 'message-id': 'ID:mac83086.ornl.gov-59780-1344536680877-8:2:1:1:1'}

The data is a JSON-encoded dictionary:

data: {"instrument": tokens[2], "ipts": tokens[3], "run_number": run_number, "data_file": message}

Parameters:
  • headers – ActiveMQ message header dictionary

  • data – JSON encoded message content
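
As an illustration of the two arguments, the sketch below builds a headers dictionary like the one shown above and a JSON-encoded payload, then extracts the fields a status entry would plausibly need. It does not call the real function: parse_status_message is a hypothetical helper, the payload values are invented placeholders, and the timestamp header is assumed to be milliseconds since the Unix epoch.

```python
import json
from datetime import datetime, timezone

# Headers copied from the example above; every value is a string.
headers = {
    "expires": "0",
    "timestamp": "1344613053723",
    "destination": "/queue/POSTPROCESS.DATA_READY",
    "persistent": "true",
    "priority": "5",
    "message-id": "ID:mac83086.ornl.gov-59780-1344536680877-8:2:1:1:1",
}

# Placeholder payload: these values are invented for illustration only.
data = json.dumps({
    "instrument": "INSTR",
    "ipts": "IPTS-0000",
    "run_number": 1234,
    "data_file": "/path/to/INSTR_1234_event.nxs",
})


def parse_status_message(headers, data):
    """Hypothetical helper: pull out the fields a status entry might store."""
    payload = json.loads(data)  # data arrives as a JSON string, not a dict
    # Strip the "/queue/" prefix to get the bare queue name.
    queue = headers["destination"].replace("/queue/", "", 1)
    # Assumption: the timestamp header is milliseconds since the Unix epoch.
    created_on = datetime.fromtimestamp(
        int(headers["timestamp"]) / 1000.0, tz=timezone.utc
    )
    return {
        "queue": queue,
        "instrument": payload["instrument"],
        "run_number": int(payload["run_number"]),
        "created_on": created_on,
    }


record = parse_status_message(headers, data)
```

With the real package, the equivalent call would presumably be add_status_entry(headers, data), passing data as the still-encoded JSON string.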

workflow.database.transactions.add_task(instrument, input_queue, task_class='', task_queues=None, success_queues=None)

Add a task entry

workflow.database.transactions.add_workflow_status_entry(destination, message)

Add a database entry for an event generated by the workflow manager. This represents additional information regarding interventions by the workflow manager.

Parameters:
  • destination – string representing the StatusQueue

  • message – JSON encoded data dictionary

workflow.database.transactions.get_message_queues(only_workflow_inputs=True)

Get the list of message queues from the DB

Parameters:
  • only_workflow_inputs – if True, only the queues that the workflow manager listens to are returned
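
The effect of the only_workflow_inputs flag can be sketched without a database. Everything below is an assumption made for illustration: QueueRecord, its is_workflow_input field, the sample rows, and get_message_queues_sketch are hypothetical stand-ins, not the package's actual model or implementation.

```python
from dataclasses import dataclass


@dataclass
class QueueRecord:
    # Hypothetical stand-in for a DB row; field names are assumptions.
    name: str
    is_workflow_input: bool


# Invented sample rows; only the first name appears in this documentation.
QUEUES = [
    QueueRecord("POSTPROCESS.DATA_READY", True),
    QueueRecord("EXAMPLE.STATUS", False),
]


def get_message_queues_sketch(records, only_workflow_inputs=True):
    """Return queue names, optionally restricted to workflow-manager inputs."""
    if only_workflow_inputs:
        records = [r for r in records if r.is_workflow_input]
    return [r.name for r in records]


inputs_only = get_message_queues_sketch(QUEUES)
all_queues = get_message_queues_sketch(QUEUES, only_workflow_inputs=False)
```

The default of True mirrors the documented signature, so a caller that passes no argument gets only the queues the workflow manager listens to.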

workflow.database.transactions.get_task(message_headers, message_data)

Find the DB entry for this queue

Parameters:
  • message_headers – message headers

  • message_data – JSON-encoded message content
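
One plausible reading of this lookup is sketched below: the queue name comes from the destination header, the instrument from the JSON payload, and the pair is used as a key. This is an assumption based on the arguments of add_task (instrument, input_queue), not the confirmed implementation; TASKS, its contents, and get_task_sketch are hypothetical.

```python
import json

# Hypothetical in-memory task table keyed by (instrument, input_queue).
TASKS = {
    ("INSTR", "POSTPROCESS.DATA_READY"): {
        "task_class": "",
        "task_queues": [],
        "success_queues": [],
    },
}


def get_task_sketch(message_headers, message_data, tasks):
    """Return the task registered for this message's queue, or None."""
    payload = json.loads(message_data)
    # Strip the "/queue/" prefix to get the bare queue name.
    queue = message_headers["destination"].replace("/queue/", "", 1)
    instrument = payload.get("instrument", "")
    return tasks.get((instrument, queue))


headers = {"destination": "/queue/POSTPROCESS.DATA_READY"}
data = json.dumps({"instrument": "INSTR", "run_number": 1234})

found = get_task_sketch(headers, data, TASKS)
missing = get_task_sketch({"destination": "/queue/UNKNOWN"}, data, TASKS)
```

Returning None for an unregistered queue is a design choice of this sketch; the real function may behave differently when no entry exists.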

workflow.database.transactions.sql_dump_tasks()

Dump the SQL necessary to insert the current task definitions

Module contents