Source code for workflow.sns_post_processing

#!/usr/bin/env python
# pylint: disable=invalid-name, line-too-long, too-many-arguments
"""
Workflow manager process
"""

import argparse
import logging
import logging.handlers
import os
import sys

import psutil

from workflow.amq_client import Client
from workflow.amq_listener import Listener

from .database import transactions  # noqa: F401
from .settings import BROKERS, LOGGING_LEVEL, WKFLOW_PASSCODE, WKFLOW_USER

# Set log level
logging.getLogger().setLevel(LOGGING_LEVEL)
logging.getLogger("stomp.py").setLevel(logging.WARNING)

# Formatter
ft = logging.Formatter("%(levelname)s:%(asctime)-15s %(message)s")
# Create a stream handler for stdout
sh = logging.StreamHandler()
sh.setLevel(logging.INFO)
sh.setFormatter(ft)
logging.getLogger().addHandler(sh)
# Create a log file handler
fh = logging.handlers.TimedRotatingFileHandler("workflow.log", when="midnight", backupCount=15)
fh.setLevel(LOGGING_LEVEL)
fh.setFormatter(ft)
logging.getLogger().addHandler(fh)


[docs] def start(check_frequency, workflow_recovery, flexible_tasks): """ Run the workflow manager """ auto_ack = True if check_frequency is None: check_frequency = 24 workflow_check = False else: workflow_check = True c = Client( BROKERS, WKFLOW_USER, WKFLOW_PASSCODE, workflow_check=workflow_check, check_frequency=check_frequency, workflow_recovery=workflow_recovery, flexible_tasks=flexible_tasks, consumer_name="workflow_manager", auto_ack=auto_ack, ) listener = Listener(use_db_tasks=flexible_tasks, auto_ack=auto_ack) listener.set_amq_user(BROKERS, WKFLOW_USER, WKFLOW_PASSCODE) c.set_listener(listener) c.listen_and_wait(0.1)
[docs] def status(): # check if running by checking for a process with the name workflowmgr me = os.getpid() for proc in psutil.process_iter(attrs=["pid", "cmdline"]): if proc.info["pid"] == me: continue cmd = " ".join(proc.info.get("cmdline") or []) if ("workflowmgr" in cmd or "sns_post_processing" in cmd) and "start" in cmd: logging.info("workflowmgr is running with PID %d", proc.info["pid"]) sys.exit(0) logging.error("workflowmgr is not running") sys.exit(1)
[docs] def run(): """ Interactive run command """ # Start/restart options start_parser = argparse.ArgumentParser(add_help=False) # Workflow verification and recovery options start_parser.add_argument( "-f", metavar="hours", help="number of hours between workflow checks", type=int, dest="check_frequency", ) start_parser.add_argument( "-r", help="try to recover from workflow problems", action="store_true", dest="recover", ) start_parser.add_argument( "--skip_flexible_tasks", help="read task definitions from DB", action="store_true", dest="flexible_tasks", ) parser = argparse.ArgumentParser(description="SNS data workflow manager") subparsers = parser.add_subparsers(dest="command", help="available sub-commands") subparsers.add_parser("start", help="Start workflowmgr [-h for help]", parents=[start_parser]) subparsers.add_parser("status", help="Show running status of workflowmgr") subparsers.add_parser("dump", help="Dump task SQL") parser_task = subparsers.add_parser("add_task", help="Add task definition [-h for help]") parser_task.add_argument( "-i", metavar="instrument", required=True, help="name of the instrument to add the task for", dest="instrument", ) parser_task.add_argument( "-q", metavar="input queue", required=True, help="name of the input queue that triggers the task", dest="input_queue", ) parser_task.add_argument( "-c", metavar="task class", default="", help="name of the class to be instantiated and executed", dest="task_class", ) parser_task.add_argument( "-t", metavar="task queues", nargs="+", help="list of task message queues", dest="task_queues", ) parser_task.add_argument( "-s", metavar="success queues", nargs="+", help="list of task success message queues", dest="success_queues", ) namespace = parser.parse_args() # If we just need to dump the workflow tables, # do it and stop here if namespace.command == "dump": transactions.sql_dump_tasks() sys.exit(0) elif namespace.command == "add_task": transactions.add_task( instrument=namespace.instrument, input_queue=namespace.input_queue, task_class=namespace.task_class, task_queues=namespace.task_queues, success_queues=namespace.success_queues, ) sys.exit(0) if namespace.command == "start": start(namespace.check_frequency, namespace.recover, not namespace.flexible_tasks) elif namespace.command == "status": status()
if __name__ == "__main__": run()