workflow package

Subpackages

Submodules

workflow.amq_client module

ActiveMQ workflow manager client

@author: M. Doucet, Oak Ridge National Laboratory
@copyright: 2014 Oak Ridge National Laboratory

class workflow.amq_client.Client(brokers, user, passcode, queues=None, workflow_check=False, check_frequency=24, workflow_recovery=False, flexible_tasks=False, consumer_name='amq_consumer', auto_ack=True)[source]

Bases: object

ActiveMQ client. Holds the connection to a broker.

connect()[source]

Connect to a broker

get_connection(consumer_name=None)[source]

Get existing connection or create a new one.

listen_and_wait(waiting_period=1.0)[source]

Listen for the next message from the brokers. This method will simply return once the connection is terminated.

Parameters:

waiting_period – sleep time between connection attempts to a broker

new_connection(consumer_name=None)[source]

Establish and return a connection to ActiveMQ

Parameters:

consumer_name – name to give the new connection

set_listener(listener)[source]

Set the listener object that will process each incoming message.

Parameters:

listener – listener object

verify_workflow()[source]

Verify that the workflow has completed for all the runs and recover if it hasn’t

workflow.amq_listener module

ActiveMQ listener class for the workflow manager

class workflow.amq_listener.Listener(use_db_tasks=False, auto_ack=True)[source]

Bases: ConnectionListener

AMQ listener for the workflow manager

on_message(frame)[source]

Process a message.

Example of an ActiveMQ header:

    headers: {'expires': '0',
              'timestamp': '1344613053723',
              'destination': '/queue/POSTPROCESS.DATA_READY',
              'persistent': 'true',
              'priority': '5',
              'message-id': 'ID:mac83086.ornl.gov-59780-1344536680877-8:2:1:1:1'}

Parameters:

frame – stomp.utils.Frame

set_amq_user(brokers, user, passcode)[source]

Set the ActiveMQ credentials to use when creating a new connection

set_connection(connection)[source]

Set an AMQ connection
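As a rough sketch, the client and listener classes documented above might be wired together as follows, using only the constructor and method signatures listed on this page. `run_client` is a hypothetical helper name. The expected format of `brokers` and `queues` is not described here, so they are passed through unchanged, and calling `connect()` explicitly before `listen_and_wait()` is an assumption rather than a documented requirement.

```python
def run_client(brokers, user, passcode, queues=None, waiting_period=1.0):
    """Hypothetical helper: attach a Listener to a Client and block."""
    # Imported lazily so that defining this helper does not require the
    # workflow package (or a reachable broker) to be available.
    from workflow.amq_client import Client
    from workflow.amq_listener import Listener

    client = Client(brokers, user, passcode, queues=queues)
    client.set_listener(Listener())
    # Assumption: an explicit connect() before listening is acceptable.
    client.connect()
    # Per the description above, this returns once the connection
    # is terminated.
    client.listen_and_wait(waiting_period=waiting_period)
```

Calling the helper requires the workflow package and a running ActiveMQ broker; defining it does not.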

workflow.daemon module

Code taken from: http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/

class workflow.daemon.Daemon(pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null')[source]

Bases: object

A generic daemon class.

Usage: subclass the Daemon class and override the run() method

daemonize()[source]

Do the UNIX double-fork magic. For details, see Stevens’ “Advanced Programming in the UNIX Environment” (ISBN 0201563177) and http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
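For readers unfamiliar with the double-fork idiom, the following is a minimal sketch of the general technique on a POSIX system. It is not the code of `daemonize()`: `double_fork` is a hypothetical name, and unlike a typical daemon the original process returns `False` instead of exiting, which makes the sketch easier to try out.

```python
import os


def double_fork():
    """Return True in the detached grandchild, False in the original process."""
    pid = os.fork()
    if pid > 0:
        # Original process: reap the short-lived first child and carry on.
        os.waitpid(pid, 0)
        return False

    # First child: start a new session, dropping the controlling terminal.
    os.setsid()

    if os.fork() > 0:
        # The first child exits at once, so the grandchild is orphaned
        # and adopted by init.
        os._exit(0)

    # Grandchild: not a session leader, so it cannot reacquire a terminal.
    os.chdir("/")
    os.umask(0)
    return True
```

A full daemon would typically also redirect stdin, stdout and stderr and write a pid file, which is what the constructor arguments of the class above suggest.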

delpid()[source]
restart()[source]

Restart the daemon

run()[source]

You should override this method when you subclass Daemon. It will be called after the process has been daemonized by start() or restart().

start()[source]

Start the daemon

status()[source]
stop()[source]

Stop the daemon
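The usage note for this class (subclass `Daemon` and override `run()`) can be sketched as below. `HeartbeatDaemon` and its `max_beats` and `interval` arguments are hypothetical. If the workflow package is not importable, a stand-in base class that only mirrors the documented constructor signature is used so the sketch still runs.

```python
import time

try:
    from workflow.daemon import Daemon
except ImportError:
    # Stand-in so the sketch runs without the package installed; it only
    # mirrors the documented constructor signature.
    class Daemon(object):
        def __init__(self, pidfile, stdin='/dev/null',
                     stdout='/dev/null', stderr='/dev/null'):
            self.pidfile = pidfile

        def run(self):
            pass


class HeartbeatDaemon(Daemon):
    """Hypothetical subclass that records a timestamp on each iteration."""

    def __init__(self, pidfile, max_beats=3, interval=0.01, **kwargs):
        super(HeartbeatDaemon, self).__init__(pidfile, **kwargs)
        self.max_beats = max_beats
        self.interval = interval
        self.beats = []

    def run(self):
        # Called after the process has been daemonized by start() or
        # restart(). A real daemon would usually loop until stopped; the
        # loop is bounded here so that the example terminates.
        for _ in range(self.max_beats):
            self.beats.append(time.time())
            time.sleep(self.interval)
```

In normal use one would call `HeartbeatDaemon(pidfile).start()` rather than `run()` directly, so that the process is daemonized first.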

workflow.sns_post_processing module

Workflow manager process

class workflow.sns_post_processing.WorkflowDaemon(pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null', check_frequency=None, workflow_recovery=False, flexible_tasks=False)[source]

Bases: Daemon

Workflow daemon

run()[source]

Run the workflow manager daemon

workflow.sns_post_processing.run()[source]

Interactive run command

workflow.sns_post_processing.run_daemon(pid_file, stdout_file, stderr_file, check_frequency, recover, flexible_tasks, command)[source]

Start daemon

workflow.state_utilities module

workflow.state_utilities.decode_message(message)[source]

Decode message and turn it into a dictionary we can understand.

Messages from streaming translation are expected to be an absolute path of the following type:

Old system: /SNS/EQSANS/IPTS-1234/…/EQSANS_5678_event.nxs
ADARA: /SNS/EQSANS/IPTS-1234/nexus/EQSANS_5678.nxs.h5

Calibration runs and similar have a label such as 2009_06_24_CAL in place of IPTS-xxxx.
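To make the path layouts above concrete, here is a minimal sketch of how such a path could be split into its parts. It is not the implementation of `decode_message`: the helper name `parse_run_path` and the dictionary keys are assumptions chosen for illustration, and the real function may expect a different input and return different fields.

```python
import re

# File names look like <INSTRUMENT>_<run>_event.nxs or <INSTRUMENT>_<run>.nxs.h5
_FILE_RE = re.compile(r"^(?P<instrument>.+)_(?P<run>\d+)(?:_event)?\.nxs(?:\.h5)?$")


def parse_run_path(path):
    """Hypothetical helper: split /FACILITY/INSTRUMENT/PROPOSAL/.../FILE."""
    parts = path.strip().split("/")
    if len(parts) < 5 or parts[0] != "":
        raise ValueError("expected an absolute data file path: %r" % path)
    facility, instrument, proposal = parts[1:4]
    match = _FILE_RE.match(parts[-1])
    if match is None:
        raise ValueError("unrecognized file name: %r" % parts[-1])
    return {
        "facility": facility,
        "instrument": instrument,
        # Kept verbatim, so a calibration label is returned unchanged.
        "proposal": proposal,
        "run_number": int(match.group("run")),
    }
```

For the ADARA example path above, this sketch yields facility `SNS`, instrument `EQSANS`, proposal `IPTS-1234` and run number `5678`.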

workflow.state_utilities.logged_action(action)[source]

Decorator used to log a received message before processing it
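A decorator of this kind usually follows the pattern sketched below. This illustrates the general technique and is not the package's code: the name `logged_action_sketch`, the logger name, and the assumed handler signature `(self, headers, message)` are all hypothetical.

```python
import functools
import logging

logger = logging.getLogger("workflow_sketch")  # hypothetical logger name


def logged_action_sketch(action):
    """Log the incoming message, then hand it to the wrapped action."""
    @functools.wraps(action)
    def wrapper(self, headers, message):
        # Assumption: headers is a dict that may carry a 'destination' key.
        logger.info("%s: %s", headers.get("destination"), message)
        return action(self, headers, message)
    return wrapper
```

Because of `functools.wraps`, the wrapped method keeps its original name and docstring, which keeps generated API listings readable.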

workflow.states module

Action classes to be called when receiving specific messages.

To add an action for a specific queue, add a StateAction class with the name of the queue in lower-case, replacing periods with underscores.
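The naming rule above can be sketched as a small function. The initial capital letter is inferred from the class names listed on this page (for example `Postprocess_data_ready`), and stripping a leading `/queue/` is an assumption based on the header example shown for `on_message`. `action_class_name` is a hypothetical helper, not part of the package.

```python
def action_class_name(destination):
    """Map a queue name or destination header to an action class name."""
    prefix = "/queue/"
    if destination.startswith(prefix):
        destination = destination[len(prefix):]
    # Lower-case, periods to underscores, then capitalize the first letter.
    return destination.lower().replace(".", "_").capitalize()
```

A class could then be looked up with something like `getattr(workflow.states, action_class_name(destination), None)`, falling back to a default when no class with that name exists.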

class workflow.states.Catalog_request(connection=None, use_db_task=False)[source]

Bases: StateAction

Default action for CATALOG.REQUEST messages

class workflow.states.Postprocess_data_ready(connection=None, use_db_task=False)[source]

Bases: StateAction

Default action for POSTPROCESS.DATA_READY messages

class workflow.states.Reduction_complete(connection=None, use_db_task=False)[source]

Bases: StateAction

Default action for REDUCTION.COMPLETE messages

class workflow.states.Reduction_request(connection=None, use_db_task=False)[source]

Bases: StateAction

Default action for REDUCTION.REQUEST messages

class workflow.states.StateAction(connection=None, use_db_task=False)[source]

Bases: object

Base class for processing messages

send(destination, message, persistent='true')[source]

Send a message to a queue

Parameters:
  • destination – name of the queue

  • message – message content

workflow.workflow_process module

Actual process that each data run must go through.

class workflow.workflow_process.WorkflowProcess(connection=None, recovery=True, allowed_lag=3600)[source]

Bases: StateAction

verify_workflow()[source]

Walk through the data runs and make sure they have gone through the whole workflow.

Module contents