workflow package
Subpackages
- workflow.database package
- Subpackages
- workflow.database.report package
- Submodules
- workflow.database.report.models module
- DataRun: DataRun.DoesNotExist, DataRun.MultipleObjectsReturned, DataRun.create_and_save(), DataRun.created_on, DataRun.file, DataRun.get_next_by_created_on(), DataRun.get_previous_by_created_on(), DataRun.id, DataRun.instrument_id, DataRun.instrument_id_id, DataRun.instrumentstatus_set, DataRun.ipts_id, DataRun.ipts_id_id, DataRun.is_complete(), DataRun.json_encode(), DataRun.last_error(), DataRun.objects, DataRun.run_number, DataRun.run_title, DataRun.runstatus_set, DataRun.workflowsummary
- DataRunManager
- Error
- IPTS
- IPTSManager
- Information
- Instrument: Instrument.DoesNotExist, Instrument.MultipleObjectsReturned, Instrument.activeinstrument, Instrument.choice_set, Instrument.datarun_set, Instrument.id, Instrument.instrumentstatus, Instrument.monitoredvariable_set, Instrument.name, Instrument.number_of_expts(), Instrument.number_of_runs(), Instrument.objects, Instrument.pv_set, Instrument.pvcache_set, Instrument.pvstringcache_set, Instrument.reductionproperty_set, Instrument.statuscache_set, Instrument.statusvariable_set, Instrument.task_set
- InstrumentManager
- InstrumentStatus
- RunStatus: RunStatus.DoesNotExist, RunStatus.MultipleObjectsReturned, RunStatus.created_on, RunStatus.error_set, RunStatus.get_next_by_created_on(), RunStatus.get_previous_by_created_on(), RunStatus.has_errors(), RunStatus.id, RunStatus.information_set, RunStatus.last_error(), RunStatus.last_info(), RunStatus.message_id, RunStatus.objects, RunStatus.queue_id, RunStatus.queue_id_id, RunStatus.run_id, RunStatus.run_id_id
- RunStatusManager
- StatusQueue
- StatusQueueMessageCount: StatusQueueMessageCount.DoesNotExist, StatusQueueMessageCount.MultipleObjectsReturned, StatusQueueMessageCount.created_on, StatusQueueMessageCount.get_next_by_created_on(), StatusQueueMessageCount.get_previous_by_created_on(), StatusQueueMessageCount.id, StatusQueueMessageCount.message_count, StatusQueueMessageCount.objects, StatusQueueMessageCount.queue, StatusQueueMessageCount.queue_id, StatusQueueMessageCount.to_dict()
- Task
- TaskManager
- WorkflowSummary: WorkflowSummary.DoesNotExist, WorkflowSummary.MultipleObjectsReturned, WorkflowSummary.catalog_started, WorkflowSummary.cataloged, WorkflowSummary.complete, WorkflowSummary.id, WorkflowSummary.objects, WorkflowSummary.reduced, WorkflowSummary.reduction_catalog_started, WorkflowSummary.reduction_cataloged, WorkflowSummary.reduction_needed, WorkflowSummary.reduction_started, WorkflowSummary.run_id, WorkflowSummary.run_id_id, WorkflowSummary.update()
- WorkflowSummaryManager
- Module contents
- Submodules
- workflow.database.manage module
- workflow.database.transactions module
- Module contents
Submodules
workflow.amq_client module
ActiveMQ workflow manager client
@author: M. Doucet, Oak Ridge National Laboratory
@copyright: 2014 Oak Ridge National Laboratory
- class workflow.amq_client.Client(brokers, user, passcode, queues=None, workflow_check=False, check_frequency=24, workflow_recovery=False, flexible_tasks=False, consumer_name='amq_consumer', auto_ack=True)[source]
Bases: object
ActiveMQ client. Holds the connection to a broker.
- listen_and_wait(waiting_period=1.0)[source]
Listen for the next message from the brokers. This method will simply return once the connection is terminated.
- Parameters:
waiting_period – sleep time between connection to a broker
- new_connection(consumer_name=None)[source]
Establish and return a connection to ActiveMQ
- Parameters:
consumer_name – name to give the new connection
workflow.amq_listener module
ActiveMQ listener class for the workflow manager
- class workflow.amq_listener.Listener(use_db_tasks=False, auto_ack=True)[source]
Bases: ConnectionListener
AMQ listener for the workflow manager
- on_message(frame)[source]
Process a message. Example of an ActiveMQ header:
headers: {'expires': '0', 'timestamp': '1344613053723', 'destination': '/queue/POSTPROCESS.DATA_READY', 'persistent': 'true', 'priority': '5', 'message-id': 'ID:mac83086.ornl.gov-59780-1344536680877-8:2:1:1:1'}
- Parameters:
frame – stomp.utils.Frame
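As a minimal sketch of how a header dictionary like the example above could be inspected, the following extracts the queue name from the `destination` entry. The helper name `queue_from_headers` is hypothetical and not part of the workflow package, and the header layout is assumed to match the example shown; the actual `on_message` logic may differ.

```python
def queue_from_headers(headers):
    """Return the queue name from an ActiveMQ header dictionary.

    Hypothetical helper for illustration only; not part of the workflow package.
    """
    destination = headers.get("destination", "")
    # Destinations look like '/queue/POSTPROCESS.DATA_READY';
    # keep only the last path component.
    return destination.rsplit("/", 1)[-1]


# Header values copied from the example in the on_message description.
example_headers = {
    "expires": "0",
    "timestamp": "1344613053723",
    "destination": "/queue/POSTPROCESS.DATA_READY",
    "persistent": "true",
    "priority": "5",
    "message-id": "ID:mac83086.ornl.gov-59780-1344536680877-8:2:1:1:1",
}

queue_name = queue_from_headers(example_headers)
print(queue_name)
```

A missing `destination` entry yields an empty string here, which a caller could treat as "no queue to dispatch to".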
workflow.sns_post_processing module
Workflow manager process
workflow.state_utilities module
- workflow.state_utilities.decode_message(message)[source]
Decode message and turn it into a dictionary we can understand.
Messages from streaming translation are expected to be an absolute path of the following type:
Old system: /SNS/EQSANS/IPTS-1234/…/EQSANS_5678_event.nxs
ADARA: /SNS/EQSANS/IPTS-1234/nexus/EQSANS_5678.nxs.h5
Calibration runs and similar have a directory name such as 2009_06_24_CAL in place of IPTS-xxxx.
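The path conventions described above can be sketched with a regular expression. This is an illustration only, not the implementation of `decode_message`: the function name `parse_data_path`, the pattern, and the keys of the returned dictionary are all assumptions made for this example.

```python
import re

# Pattern assumed from the two path shapes described in the text.
# It is not the expression used by workflow.state_utilities.decode_message.
_PATH_RE = re.compile(
    r"^/SNS/(?P<instrument>[^/]+)/(?P<proposal>[^/]+)/.*?"
    r"(?P=instrument)_(?P<run_number>\d+)(?:_event\.nxs|\.nxs\.h5)$"
)


def parse_data_path(path):
    """Split an absolute data file path into its parts.

    Hypothetical helper; the dictionary keys are illustrative names.
    """
    match = _PATH_RE.match(path)
    if match is None:
        raise ValueError("Unrecognized data file path: %s" % path)
    return {
        "instrument": match.group("instrument"),
        # Either 'IPTS-xxxx' or a calibration name such as '2009_06_24_CAL'.
        "proposal": match.group("proposal"),
        "run_number": int(match.group("run_number")),
        "data_file": path,
    }


info = parse_data_path("/SNS/EQSANS/IPTS-1234/nexus/EQSANS_5678.nxs.h5")
print(info["instrument"], info["proposal"], info["run_number"])
```

Because the proposal component is matched as any single directory name, the same sketch accepts calibration directories without a separate branch.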
workflow.states module
Action classes to be called when receiving specific messages.
To add an action for a specific queue, add a StateAction class with the name of the queue in lower-case, replacing periods with underscores.
- class workflow.states.Catalog_request(connection=None, use_db_task=False)[source]
Bases: StateAction
Default action for CATALOG.REQUEST messages
- class workflow.states.Postprocess_data_ready(connection=None, use_db_task=False)[source]
Bases: StateAction
Default action for POSTPROCESS.DATA_READY messages
- class workflow.states.Reduction_complete(connection=None, use_db_task=False)[source]
Bases: StateAction
Default action for REDUCTION.COMPLETE messages
- class workflow.states.Reduction_request(connection=None, use_db_task=False)[source]
Bases: StateAction
Default action for REDUCTION.REQUEST messages
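The naming rule for action classes in this module (queue name in lowercase, periods replaced with underscores) can be sketched as follows. The function `action_class_name` is hypothetical and not part of the workflow package. Capitalizing the first letter is an inference from the class names listed in this module, such as `Postprocess_data_ready`, and is not stated by the rule itself.

```python
def action_class_name(queue_name):
    """Derive a StateAction class name from a queue name.

    Hypothetical helper for illustration; not part of the workflow package.
    """
    # Rule from the module description: lowercase, periods become underscores.
    name = queue_name.lower().replace(".", "_")
    # Assumption: the first letter is capitalized, as in the classes
    # listed in workflow.states (for example Catalog_request).
    return name[:1].upper() + name[1:]


for queue in ("CATALOG.REQUEST", "POSTPROCESS.DATA_READY",
              "REDUCTION.COMPLETE", "REDUCTION.REQUEST"):
    print(queue, "->", action_class_name(queue))
```

A dispatcher could then look the resulting name up on the module, for example with `getattr`, and fall back to a default action when no class with that name exists.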
workflow.workflow_process module
Actual process that each data run must go through.
- class workflow.workflow_process.WorkflowProcess(connection=None, recovery=True, allowed_lag=3600)[source]
Bases: StateAction