Source code for workflow.workflow_process

"""
Actual process that each data run must go through.
"""

import datetime
import json
import logging

from django.utils.timezone import utc

from workflow.database.report.models import RunStatus, WorkflowSummary

from .database import transactions
from .settings import (  # noqa: F401  # noqa: F401
    CATALOG_DATA_READY,
    POSTPROCESS_INFO,
    REDUCTION_CATALOG_DATA_READY,
    REDUCTION_DATA_READY,
)
from .states import StateAction  # noqa: F401


class WorkflowProcess(StateAction):
    """
    State action that checks data runs for incomplete workflow steps.

    For every run reported as incomplete, the missing step is logged and
    recorded as a status entry. When recovery is enabled, the message that
    triggers the missing step is also re-sent to the corresponding queue.
    """

    def __init__(self, connection=None, recovery=True, allowed_lag=3600):
        """
        :param connection: AMQ connection
        :param recovery: if True, the system will try to recover from workflow problems
        :param allowed_lag: minimum number of seconds since last activity needed
            before identifying a problem; ``None`` selects a lag of one day
        """
        super().__init__(connection=connection)
        self._recovery = recovery
        # Amount of time allowed before we start worrying about workflow issues
        if allowed_lag is None:
            self._allowed_lag = datetime.timedelta(days=1)
        else:
            self._allowed_lag = datetime.timedelta(seconds=allowed_lag)

    def __call__(self, *args, **kwargs):
        """
        Run :meth:`verify_workflow`. Positional and keyword arguments are
        accepted but ignored.
        """
        return self.verify_workflow()

    def _report_incomplete(self, data_dict, information, queue, logging_headers):
        """
        Log and record one incomplete workflow step, and optionally re-trigger it.

        :param data_dict: decoded JSON description of the run; its
            ``"information"`` entry is overwritten with ``information``
        :param information: human-readable description of the problem
        :param queue: name of the queue that triggers the missing step
        :param logging_headers: headers passed along with the status entry
        """
        data_dict["information"] = information
        logging.warning(information)
        message = json.dumps(data_dict)
        # Log this information
        transactions.add_status_entry(logging_headers, message)
        if self._recovery:
            # NOTE(review): send() is not defined in this class; presumably
            # it is inherited from StateAction.
            self.send(
                destination="/queue/%s" % queue,
                message=message,
                persistent="true",
            )

    def verify_workflow(self):
        """
        Walk through the data runs and make sure they have
        gone through the whole workflow.

        A run is only reported when it is incomplete and its last status
        timestamp is older than the allowed lag.
        """
        logging.info("Verifying workflow completeness")

        # Get a list of run with an incomplete workflow
        run_list = WorkflowSummary.objects.incomplete()
        logging.info(" - list generated")

        # Dummy header for information logging
        logging_headers = {
            "destination": "/queue/%s" % POSTPROCESS_INFO,
            "message-id": "",
        }

        for r in run_list:
            r.update()

            # Identify a problem only if the last message received is more
            # than a minimum amount of time
            now = datetime.datetime.now(tz=utc)
            stalled = r.complete is False and now - RunStatus.objects.last_timestamp(r.run_id) > self._allowed_lag
            if not stalled:
                continue

            # The workflow for this run is still incomplete.
            # Generate a JSON description of the run, to be used
            # when sending a message
            data_dict = json.loads(r.run_id.json_encode())

            # Run is not cataloged
            if r.cataloged is False:
                self._report_incomplete(
                    data_dict,
                    "Cataloging incomplete for %s" % str(r),
                    CATALOG_DATA_READY,
                    logging_headers,
                )

            # Run hasn't been reduced
            if r.reduction_needed is True and r.reduced is False:
                self._report_incomplete(
                    data_dict,
                    "Reduction incomplete for %s" % str(r),
                    REDUCTION_DATA_READY,
                    logging_headers,
                )

            # Reduced data hasn't been cataloged
            if r.reduction_needed is True and r.reduced is True and r.reduction_cataloged is False:
                self._report_incomplete(
                    data_dict,
                    "Reduction cataloging incomplete for %s" % str(r),
                    REDUCTION_CATALOG_DATA_READY,
                    logging_headers,
                )

        logging.info(" - verification completed")