workflow package
Subpackages
- workflow.database package
- Subpackages
- workflow.database.report package
- Submodules
- workflow.database.report.models module
DataRun
DataRun.DoesNotExist
DataRun.MultipleObjectsReturned
DataRun.create_and_save()
DataRun.created_on
DataRun.file
DataRun.get_next_by_created_on()
DataRun.get_previous_by_created_on()
DataRun.id
DataRun.instrument_id
DataRun.instrument_id_id
DataRun.instrumentstatus_set
DataRun.ipts_id
DataRun.ipts_id_id
DataRun.is_complete()
DataRun.json_encode()
DataRun.last_error()
DataRun.objects
DataRun.run_number
DataRun.runstatus_set
DataRun.workflowsummary_set
DataRunManager
Error
IPTS
IPTSManager
Information
Instrument
Instrument.DoesNotExist
Instrument.MultipleObjectsReturned
Instrument.activeinstrument_set
Instrument.choice_set
Instrument.datarun_set
Instrument.id
Instrument.instrumentstatus_set
Instrument.legacyurl_set
Instrument.monitoredvariable_set
Instrument.name
Instrument.number_of_expts()
Instrument.number_of_runs()
Instrument.objects
Instrument.pv_set
Instrument.pvcache_set
Instrument.pvstring_set
Instrument.pvstringcache_set
Instrument.reductionproperty_set
Instrument.signal_set
Instrument.statuscache_set
Instrument.statusvariable_set
Instrument.task_set
InstrumentManager
InstrumentStatus
RunStatus
RunStatus.DoesNotExist
RunStatus.MultipleObjectsReturned
RunStatus.created_on
RunStatus.error_set
RunStatus.get_next_by_created_on()
RunStatus.get_previous_by_created_on()
RunStatus.has_errors()
RunStatus.id
RunStatus.information_set
RunStatus.last_error()
RunStatus.last_info()
RunStatus.message_id
RunStatus.objects
RunStatus.queue_id
RunStatus.queue_id_id
RunStatus.run_id
RunStatus.run_id_id
RunStatusManager
StatusQueue
Task
TaskManager
WorkflowSummary
WorkflowSummary.DoesNotExist
WorkflowSummary.MultipleObjectsReturned
WorkflowSummary.catalog_started
WorkflowSummary.cataloged
WorkflowSummary.complete
WorkflowSummary.id
WorkflowSummary.objects
WorkflowSummary.reduced
WorkflowSummary.reduction_catalog_started
WorkflowSummary.reduction_cataloged
WorkflowSummary.reduction_needed
WorkflowSummary.reduction_started
WorkflowSummary.run_id
WorkflowSummary.run_id_id
WorkflowSummary.update()
WorkflowSummaryManager
- Module contents
- Submodules
- workflow.database.manage module
- workflow.database.transactions module
- Module contents
- Subpackages
Submodules
workflow.amq_client module
ActiveMQ workflow manager client
@author: M. Doucet, Oak Ridge National Laboratory
@copyright: 2014 Oak Ridge National Laboratory
- class workflow.amq_client.Client(brokers, user, passcode, queues=None, workflow_check=False, check_frequency=24, workflow_recovery=False, flexible_tasks=False, consumer_name='amq_consumer', auto_ack=True)
Bases: object
ActiveMQ client. Holds the connection to a broker.
- listen_and_wait(waiting_period=1.0)
Listen for the next message from the brokers. This method returns once the connection is terminated.
- Parameters:
waiting_period – sleep time between connection attempts to a broker
- new_connection(consumer_name=None)
Establish and return a connection to ActiveMQ
- Parameters:
consumer_name – name to give the new connection
workflow.amq_listener module
ActiveMQ listener class for the workflow manager
- class workflow.amq_listener.Listener(use_db_tasks=False, auto_ack=True)
Bases: ConnectionListener
AMQ listener for the workflow manager
- on_message(frame)
Process a message. Example of an ActiveMQ header:
headers: {'expires': '0', 'timestamp': '1344613053723', 'destination': '/queue/POSTPROCESS.DATA_READY', 'persistent': 'true', 'priority': '5', 'message-id': 'ID:mac83086.ornl.gov-59780-1344536680877-8:2:1:1:1'}
- Parameters:
frame – stomp.utils.Frame
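As an illustration of reading the header shown above, the following minimal sketch extracts the queue name from the 'destination' entry. It is not the listener's actual code: FakeFrame is a hypothetical stand-in that only mimics the cmd, headers and body attributes of stomp.utils.Frame so the example runs without a broker, and queue_from_frame is a made-up helper name.

```python
from dataclasses import dataclass, field


@dataclass
class FakeFrame:
    """Hypothetical stand-in with the same attributes as stomp.utils.Frame."""
    cmd: str = "MESSAGE"
    headers: dict = field(default_factory=dict)
    body: str = ""


def queue_from_frame(frame):
    """Return the 'destination' header without its '/queue/' prefix.

    Hypothetical helper: the real on_message() may process headers differently.
    """
    destination = frame.headers.get("destination", "")
    prefix = "/queue/"
    if destination.startswith(prefix):
        return destination[len(prefix):]
    return destination


# Header values copied from the example in the docstring above.
frame = FakeFrame(
    headers={
        "expires": "0",
        "destination": "/queue/POSTPROCESS.DATA_READY",
        "persistent": "true",
        "priority": "5",
    }
)
queue_name = queue_from_frame(frame)
```

A frame without a 'destination' header yields an empty string here, which a caller could treat as "nothing to dispatch".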
workflow.daemon module
Code taken from: http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
- class workflow.daemon.Daemon(pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null')
Bases: object
A generic daemon class.
Usage: subclass the Daemon class and override the run() method
- daemonize()
Perform the UNIX double-fork. See Stevens' "Advanced Programming in the UNIX Environment" (ISBN 0201563177) for details, and http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
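The usage note for the Daemon class (subclass it and override run()) can be sketched as follows. To keep the example self-contained, the Daemon class below is a hypothetical stand-in that only reproduces the documented constructor signature; it does not fork or write a pid file. HeartbeatDaemon, its beats argument and the pid file path are likewise invented for illustration.

```python
class Daemon:
    """Hypothetical stand-in mirroring the documented constructor of
    workflow.daemon.Daemon; no forking or pid-file handling happens here."""

    def __init__(self, pidfile, stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"):
        self.pidfile = pidfile
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr

    def run(self):
        """Subclasses override this with the work the daemon should perform."""
        raise NotImplementedError


class HeartbeatDaemon(Daemon):
    """Hypothetical subclass: records a fixed number of heartbeats, then returns."""

    def __init__(self, pidfile, beats=3, **kwargs):
        super().__init__(pidfile, **kwargs)
        self.beats = beats
        self.log = []

    def run(self):
        for i in range(self.beats):
            self.log.append(f"heartbeat {i}")


daemon = HeartbeatDaemon("/tmp/heartbeat.pid", beats=2)
daemon.run()  # called directly in this sketch; the real class detaches the process first
```

With the real class, the same subclass would presumably be detached from the terminal through daemonize() before run() executes.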
workflow.sns_post_processing module
Workflow manager process
workflow.state_utilities module
- workflow.state_utilities.decode_message(message)
Decode a message and turn it into a dictionary we can understand.
Messages from streaming translation are expected to be an absolute path of one of the following types:
Old system: /SNS/EQSANS/IPTS-1234/…/EQSANS_5678_event.nxs
ADARA: /SNS/EQSANS/IPTS-1234/nexus/EQSANS_5678.nxs.h5
Calibration runs and similar have, for example, 2009_06_24_CAL instead of IPTS-xxxx.
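The path layout described above can be decoded with a regular expression. This is a minimal sketch under stated assumptions, not the implementation of decode_message(): the function name decode_path, the pattern, and the dictionary keys (facility, instrument, proposal, run_number) are all hypothetical, and the calibration path in the usage lines is constructed for illustration.

```python
import re

# Assumed layout: /<facility>/<instrument>/<proposal>/[subdirs/]<instrument>_<run><suffix>
PATH_PATTERN = re.compile(
    r"^/(?P<facility>[^/]+)/(?P<instrument>[^/]+)/(?P<proposal>[^/]+)/"
    r"(?:.*/)?(?P=instrument)_(?P<run_number>\d+)[^/]*$"
)


def decode_path(path):
    """Split a data file path into its components (hypothetical helper)."""
    match = PATH_PATTERN.match(path)
    if match is None:
        raise ValueError(f"unrecognized data file path: {path!r}")
    info = match.groupdict()
    info["run_number"] = int(info["run_number"])
    return info


# ADARA-style path taken from the docstring above.
adara = decode_path("/SNS/EQSANS/IPTS-1234/nexus/EQSANS_5678.nxs.h5")

# Constructed calibration-style path: the proposal slot holds a dated
# calibration folder rather than an IPTS number.
calibration = decode_path("/SNS/EQSANS/2009_06_24_CAL/nexus/EQSANS_5678.nxs.h5")
```

Because the proposal component is matched as any path segment, calibration folders need no special case in this sketch; a caller that needs to tell them apart could check whether the value starts with "IPTS-".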
workflow.states module
Action classes to be called when receiving specific messages.
To add an action for a specific queue, add a StateAction class with the name of the queue in lower-case, replacing periods with underscores.
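The naming rule can be sketched as a lookup from queue name to class. This is an illustration only: the StateAction below is a hypothetical stand-in for the real base class, the ACTIONS registry and both helper functions are invented, and the sketch assumes the first letter is capitalized (as in the class names documented in this module) and that unknown queues fall back to the base class.

```python
class StateAction:
    """Hypothetical stand-in for the real StateAction base class."""

    def describe(self):
        return "default action"


class Postprocess_data_ready(StateAction):
    """Named after the POSTPROCESS.DATA_READY queue."""

    def describe(self):
        return "action for POSTPROCESS.DATA_READY"


# Hypothetical registry of the action classes known to this sketch.
ACTIONS = {cls.__name__: cls for cls in (Postprocess_data_ready,)}


def action_class_name(queue):
    """Lower-case the queue name, replace periods with underscores,
    then capitalize the first letter (assumed from the documented class names)."""
    return queue.lower().replace(".", "_").capitalize()


def find_action(queue, registry=ACTIONS):
    """Return the class registered for a queue, or StateAction if none is (assumed fallback)."""
    return registry.get(action_class_name(queue), StateAction)


action = find_action("POSTPROCESS.DATA_READY")()
fallback = find_action("SOME.UNKNOWN_QUEUE")()
```

Under these assumptions, supporting a new queue means defining one more subclass with the derived name and registering it.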
- class workflow.states.Catalog_request(connection=None, use_db_task=False)
Bases: StateAction
Default action for CATALOG.REQUEST messages
- class workflow.states.Postprocess_data_ready(connection=None, use_db_task=False)
Bases: StateAction
Default action for POSTPROCESS.DATA_READY messages
- class workflow.states.Reduction_complete(connection=None, use_db_task=False)
Bases: StateAction
Default action for REDUCTION.COMPLETE messages
- class workflow.states.Reduction_request(connection=None, use_db_task=False)
Bases: StateAction
Default action for REDUCTION.REQUEST messages
workflow.workflow_process module
Actual process that each data run must go through.
- class workflow.workflow_process.WorkflowProcess(connection=None, recovery=True, allowed_lag=3600)
Bases: StateAction