Source code for workflow.amq_client

# pylint: disable=bare-except, too-many-instance-attributes, too-many-arguments, invalid-name, line-too-long
"""
ActiveMQ workflow manager client

@author: M. Doucet, Oak Ridge National Laboratory
@copyright: 2014 Oak Ridge National Laboratory
"""

import json
import logging
import os
import time

import stomp

from .database.transactions import get_message_queues
from .workflow_process import WorkflowProcess

try:
    from . import __version__ as version
except:  # noqa: E722
    version = "development"


[docs] class Client: """ ActiveMQ client Holds the connection to a broker """ def __init__( self, brokers, user, passcode, queues=None, workflow_check=False, check_frequency=24, workflow_recovery=False, flexible_tasks=False, consumer_name="amq_consumer", auto_ack=True, ): """ :param brokers: list of brokers we can connect to :param user: activemq user :param passcode: passcode for activemq user :param queues: list of queues to listen to :param workflow_check: if True, the workflow will be checked at a given interval :param check_frequency: number of hours between workflow checks :param workflow_recovery: if True, the manager will try to recover from workflow problems :param flexible_tasks: if True, the workflow tasks will be defined by the DB :param consumer_name: name of the AMQ listener :param auto_ack: if True, AMQ ack will be automatic """ # Connection parameters self._auto_ack = auto_ack self._brokers = brokers self._user = user self._passcode = passcode self._connection = None self._consumer_name = consumer_name self._queues = queues if self._queues is None: self._queues = get_message_queues(True) # Delay between workflow check [in seconds] self._workflow_check_delay = 60.0 * 60.0 * check_frequency self._workflow_check_start = time.time() self._workflow_check = workflow_check self._workflow_recovery = workflow_recovery self._flexible_tasks = flexible_tasks # Listener used for dealing with incoming messages self._listener = None startup_msg = "Starting Workflow Manager client %s\n" % self._consumer_name startup_msg += " Version: %s\n" % version startup_msg += " User: %s\n" % self._user startup_msg += " DB task definition allowed? %s\n" % str(self._flexible_tasks) startup_msg += " Workflow check enabled? %s\n" % str(self._workflow_check) if self._workflow_check: startup_msg += " Time between checks: %s seconds\n" % str(self._workflow_check_delay) startup_msg += " Recovery enabled? %s\n" % str(self._workflow_recovery) if self._workflow_recovery: startup_msg += " Delay since last activity before attempting recovery: %s seconds\n" % str( self._workflow_check_delay ) logging.info(startup_msg)
[docs] def set_listener(self, listener): """ Set the listener object that will process each incoming message. :param listener: listener object """ self._listener = listener self._connection = self.new_connection() self._listener.set_connection(self._connection)
[docs] def get_connection(self, consumer_name=None): """ Get existing connection or create a new one. """ if self._connection is None or not self._connection.is_connected(): self._disconnect() self._connection = self.new_connection(consumer_name) return self._connection
[docs] def new_connection(self, consumer_name=None): """ Establish and return a connection to ActiveMQ :param consumer_name: name to give the new connection """ if consumer_name is None: consumer_name = self._consumer_name logging.info("[%s] Connecting to %s", consumer_name, str(self._brokers)) conn = stomp.Connection(host_and_ports=self._brokers, keepalive=True) conn.set_listener(consumer_name, self._listener) conn.connect(self._user, self._passcode, wait=True) # Give the connection threads a little breather time.sleep(0.5) return conn
[docs] def connect(self): """ Connect to a broker """ if self._connection is None or not self._connection.is_connected(): self._disconnect() self._connection = self.get_connection() logging.info("[%s] Subscribing to %s", self._consumer_name, str(self._queues)) for i, q in enumerate(self._queues): if self._auto_ack: self._connection.subscribe(destination=q, id=i, ack="auto") else: self._connection.subscribe(destination=q, id=i, ack="client")
def _disconnect(self): """ Clean disconnect """ if self._connection is not None and self._connection.is_connected(): self._connection.disconnect() self._connection = None
[docs] def verify_workflow(self): """ Verify that the workflow has completed for all the runs and recover if it hasn't """ if self._workflow_check: try: WorkflowProcess( connection=self._connection, recovery=self._workflow_recovery, allowed_lag=self._workflow_check_delay, ).verify_workflow() except: # noqa: E722 logging.exception("Workflow verification failed:")
[docs] def listen_and_wait(self, waiting_period=1.0): """ Listen for the next message from the brokers. This method will simply return once the connection is terminated. :param waiting_period: sleep time between connection to a broker """ try: self.connect() self.verify_workflow() except: # noqa: E722 logging.exception("Problem starting AMQ client:") last_heartbeat = 0 while True: try: if self._connection is None or self._connection.is_connected() is False: self.connect() time.sleep(waiting_period) try: if time.time() - last_heartbeat > 5: last_heartbeat = time.time() data_dict = { "src_name": "workflowmgr", "status": "0", "pid": str(os.getpid()), } message = json.dumps(data_dict) self._connection.send( "/topic/SNS.COMMON.STATUS.WORKFLOW.0", message, persistent="false", ) except: # noqa: E722 logging.exception("Problem sending heartbeat:") # Check for workflow completion if time.time() - self._workflow_check_start > self._workflow_check_delay: self.verify_workflow() self._workflow_check_start = time.time() except: # noqa: E722 logging.exception("Problem connecting to AMQ broker:") time.sleep(5.0)