Source code for workflow.database.report.models

import json

from django.db import models


class InstrumentManager(models.Manager):
    """
    Manager providing name lookup and SQL export helpers for instruments
    """

    def find_instrument(self, instrument):
        """
        Get the object associated to an instrument name

        :param instrument: instrument name; it is lower-cased before the lookup
        :returns: the matching object, or None if no instrument has that name
        """
        # first() lets the database return at most one row instead of
        # loading the whole result set only to test its length.
        return super().get_queryset().filter(name=instrument.lower()).first()

    def sql_dump(self):
        """
        Build a SQL script with one INSERT statement per instrument, followed
        by a statement setting the report_instrument_id_seq sequence to the
        highest dumped id (1 when there are no instruments).

        :returns: the SQL script, as a string
        """
        statements = []
        max_instr_id = 1
        for item in super().get_queryset():
            max_instr_id = max(max_instr_id, item.id)
            # Double embedded single quotes so that a name containing a quote
            # still produces a valid SQL string literal.
            name = item.name.replace("'", "''")
            statements.append("INSERT INTO report_instrument(id, name) VALUES (%d, '%s');\n" % (item.id, name))
        statements.append("SELECT pg_catalog.setval('report_instrument_id_seq', %d, true);\n" % max_instr_id)
        return "".join(statements)
class Instrument(models.Model):
    """
    Table of instruments, each identified by a unique name
    """

    name = models.CharField(max_length=20, unique=True)
    objects = InstrumentManager()

    class Meta:
        app_label = "report"

    def __str__(self):
        return self.name

    def number_of_runs(self):
        """
        Count the DataRun entries that reference this instrument
        """
        runs = DataRun.objects.filter(instrument_id=self)
        return runs.count()

    def number_of_expts(self):
        """
        Count the IPTS entries that list this instrument
        """
        experiments = IPTS.objects.filter(instruments=self)
        return experiments.count()
class IPTSManager(models.Manager):
    """
    Manager providing per-instrument experiment (IPTS) lookups
    """

    def ipts_for_instrument(self, instrument_id):
        """
        Get the query set of all experiments for a given instrument.

        :param instrument_id: Instrument object
        """
        return super().get_queryset().filter(instruments=instrument_id)

    def get_last_ipts(self, instrument_id):
        """
        Get the last experiment object for a given instrument.
        Returns None if nothing was found.

        :param instrument_id: Instrument object
        """
        # Sort newest first and fetch a single row; the previous len() check
        # loaded every experiment of the instrument to return just one.
        return super().get_queryset().filter(instruments=instrument_id).order_by("-created_on").first()
class IPTS(models.Model):
    """
    Table holding IPTS information
    """

    expt_name = models.CharField(max_length=20, unique=True)
    instruments = models.ManyToManyField(Instrument, related_name="_ipts_instruments+")
    created_on = models.DateTimeField("Timestamp", auto_now_add=True)
    objects = IPTSManager()

    class Meta:
        verbose_name_plural = "IPTS"
        app_label = "report"

    def __str__(self):
        return self.expt_name

    def number_of_runs(self, instrument_id=None):
        """
        Count the distinct runs recorded for this IPTS, optionally restricted
        to a single instrument.

        :param instrument_id: Instrument object, or None to count across all instruments
        """
        criteria = {"ipts_id": self}
        if instrument_id is not None:
            criteria["instrument_id"] = instrument_id
        return DataRun.objects.filter(**criteria).distinct().count()
class DataRunManager(models.Manager):
    """
    Manager providing "latest run" lookups for DataRun
    """

    def get_last_run(self, instrument_id, ipts_id=None):
        """
        Get the last run for a given instrument and experiment.
        Returns None if nothing was found.

        :param instrument_id: Instrument object
        :param ipts_id: IPTS object; when None, runs from all experiments are considered
        """
        runs = super().get_queryset().filter(instrument_id=instrument_id)
        if ipts_id is not None:
            runs = runs.filter(ipts_id=ipts_id)
        # Sort newest first and fetch a single row; the previous len() check
        # loaded every run of the instrument to return just one.
        return runs.order_by("-created_on").first()

    def get_last_cached_run(self, instrument_id):
        """
        Try to get the last run from the InstrumentStatus table.
        If we can't find it, find it the long way and add the result to the cache.

        :param instrument_id: Instrument object
        :returns: DataRun object, or None if no run is known
        """
        try:
            status = InstrumentStatus.objects.get(instrument_id=instrument_id)
        except InstrumentStatus.DoesNotExist:
            # No cache entry yet: look the run up directly and only create
            # the cache entry when a run was actually found.
            last_run_id = DataRun.objects.get_last_run(instrument_id)
            if last_run_id is not None:
                InstrumentStatus(instrument_id=instrument_id, last_run_id=last_run_id).save()
            return last_run_id
        return status.last_run_id
class DataRun(models.Model):
    """
    Table of runs, each tied to an instrument and an IPTS.

    TODO: run number should be unique for a given instrument
    """

    run_number = models.IntegerField()
    ipts_id = models.ForeignKey(IPTS, on_delete=models.CASCADE)
    instrument_id = models.ForeignKey(Instrument, on_delete=models.CASCADE)
    file = models.CharField(max_length=128)
    created_on = models.DateTimeField("Timestamp", auto_now_add=True)
    objects = DataRunManager()

    class Meta:
        app_label = "report"

    def __str__(self):
        return "%s_%d" % (self.instrument_id, self.run_number)

    @classmethod
    def create_and_save(cls, run_number, ipts_id, instrument_id, file):
        """
        Create a database entry for this run and update the instrument status

        :param run_number: run number, as an integer
        :param ipts_id: IPTS object
        :param instrument_id: Instrument object
        :param file: data file, as a string
        :returns: the newly saved DataRun object
        """
        # Create a run object
        run_id = cls(
            run_number=run_number,
            instrument_id=instrument_id,
            ipts_id=ipts_id,
            file=file,
        )
        run_id.save()

        # Update the instrument status, creating the entry on first use.
        # Only a missing entry is handled here; other errors propagate.
        try:
            instrument = InstrumentStatus.objects.get(instrument_id=instrument_id)
        except InstrumentStatus.DoesNotExist:
            instrument = InstrumentStatus(instrument_id=instrument_id)
        instrument.last_run_id = run_id
        instrument.save()
        return run_id

    def is_complete(self):
        """
        Return completion status

        :returns: True if a WorkflowSummary for this run is flagged complete,
            False otherwise (including when there is no summary for this run)
        """
        return WorkflowSummary.objects.filter(run_id=self, complete=True).exists()

    def last_error(self):
        """
        Return last error

        :returns: description of the most recently created Error attached to
            any status of this run, or None if there is none
        """
        # Order explicitly by primary key so "last" is deterministic, and let
        # the database return one row instead of loading every error.
        error = Error.objects.filter(run_status_id__run_id=self).order_by("pk").last()
        if error is None:
            return None
        return error.description

    def json_encode(self):
        """
        Encode the object as a JSON dictionary

        :returns: JSON string with the keys instrument, ipts, run_number and data_file
        """
        return json.dumps(
            {
                "instrument": self.instrument_id.name,
                "ipts": str(self.ipts_id),
                "run_number": self.run_number,
                "data_file": self.file,
            }
        )
class StatusQueue(models.Model):
    """
    Table containing the ActiveMQ queue names used as status
    """

    # Queue name; unique across the table
    name = models.CharField(max_length=100, unique=True)
    # Off by default
    is_workflow_input = models.BooleanField(default=False)

    class Meta:
        app_label = "report"

    def __str__(self):
        return self.name
class StatusQueueMessageCount(models.Model):
    """
    Message count recorded for a status queue, stamped with its creation time
    """

    queue = models.ForeignKey(StatusQueue, on_delete=models.CASCADE)
    message_count = models.IntegerField()
    created_on = models.DateTimeField("Timestamp", auto_now_add=True)

    class Meta:
        app_label = "report"

    def __str__(self):
        return f"{self.queue}: {self.message_count} {self.created_on}"

    def to_dict(self):
        """
        Return the entry as a dictionary; the queue is rendered as a string,
        the other two values are passed through unchanged.
        """
        entry = {}
        entry["queue"] = str(self.queue)
        entry["message_count"] = self.message_count
        entry["created_on"] = self.created_on
        return entry
class RunStatusManager(models.Manager):
    """
    Manager providing per-run status, timestamp and error lookups
    """

    def status(self, run_id, status_description):
        """
        Returns all database entries for a given run and a given status message.
        Only the first queue whose name starts with the given status message
        is considered.

        :param run_id: DataRun object
        :param status_description: status message, as a string
        :returns: query set of RunStatus entries, or an empty list if no queue matches
        """
        # Fetch a single queue row rather than loading every match.
        status_id = StatusQueue.objects.filter(name__startswith=status_description).first()
        if status_id is None:
            # Kept as a plain list for backward compatibility with callers.
            return []
        return super().get_queryset().filter(run_id=run_id, queue_id=status_id)

    def last_timestamp(self, run_id):
        """
        Returns the last timestamp for this run, or None if the run has no status entry

        :param run_id: DataRun object
        """
        latest = super().get_queryset().filter(run_id=run_id).order_by("-created_on").first()
        if latest is None:
            return None
        return latest.created_on

    def get_last_error(self, run_id):
        """
        Returns the error object attached to the most recent status entry
        of this run that has one, or None if no entry has an error.

        :param run_id: DataRun object
        """
        statuses = super().get_queryset().filter(run_id=run_id).order_by("-created_on")
        for item in statuses:
            # last_error() returns None when the entry has no error, so a
            # separate has_errors() query per entry is not needed.
            error = item.last_error()
            if error is not None:
                return error
        return None
class RunStatus(models.Model):
    """
    Map ActiveMQ messages, which have a header like this:

    headers: {'expires': '0', 'timestamp': '1344613053723',
              'destination': '/queue/POSTPROCESS.DATA_READY',
              'persistent': 'true', 'priority': '5',
              'message-id': 'ID:mac83086.ornl.gov-59780-1344536680877-8:2:1:1:1'}
    """

    # DataRun this run status belongs to
    run_id = models.ForeignKey(DataRun, on_delete=models.CASCADE)
    # Long name for this status
    queue_id = models.ForeignKey(StatusQueue, on_delete=models.CASCADE)
    # ActiveMQ message ID
    message_id = models.CharField(max_length=100, null=True)
    created_on = models.DateTimeField("Timestamp", auto_now_add=True)
    objects = RunStatusManager()

    class Meta:
        verbose_name_plural = "Run status"
        app_label = "report"

    def __str__(self):
        return "%s: %s" % (str(self.run_id), str(self.queue_id))

    def last_info(self):
        """
        Return the last available information object for this status,
        or None if there is none.

        NOTE(review): as before, this picks the first matching row (now
        explicitly the lowest primary key) -- confirm whether the most
        recent row is what callers expect when several exist.
        """
        # first() fetches one row with a deterministic ordering instead of
        # loading every row and indexing an unordered result.
        return Information.objects.filter(run_status_id=self).first()

    def last_error(self):
        """
        Return the last available error object for this status,
        or None if there is none.

        NOTE(review): as before, this picks the first matching row (now
        explicitly the lowest primary key) -- confirm whether the most
        recent row is what callers expect when several exist.
        """
        return Error.objects.filter(run_status_id=self).first()

    def has_errors(self):
        """
        Return True if at least one error is attached to this status
        """
        # exists() stops at the first match instead of counting all rows.
        return Error.objects.filter(run_status_id=self).exists()
class WorkflowSummaryManager(models.Manager):
    """
    Manager providing lookups on workflow summaries
    """

    def incomplete(self):
        """
        Returns the query set of all summaries not flagged as complete
        """
        summaries = super().get_queryset()
        return summaries.filter(complete=False)

    def get_summary(self, run_id):
        """
        Get the run summary for a given DataRun object, or None if the
        run has no summary.

        :param run_id: DataRun object
        """
        matches = super().get_queryset().filter(run_id=run_id)
        return matches[0] if len(matches) > 0 else None
class WorkflowSummary(models.Model):
    """
    Overall status of the workflow for a given run
    """

    run_id = models.OneToOneField(DataRun, on_delete=models.CASCADE)

    # Overall status of the workflow for this run
    complete = models.BooleanField(default=False)

    # Cataloging status
    catalog_started = models.BooleanField(default=False)
    cataloged = models.BooleanField(default=False)

    # Automated reduction status
    reduction_needed = models.BooleanField(default=True)
    reduction_started = models.BooleanField(default=False)
    reduced = models.BooleanField(default=False)
    reduction_cataloged = models.BooleanField(default=False)
    reduction_catalog_started = models.BooleanField(default=False)

    objects = WorkflowSummaryManager()

    class Meta:
        verbose_name_plural = "Workflow summaries"
        app_label = "report"

    def __str__(self):
        label = str(self.run_id)
        if self.complete is True:
            return "%s: complete" % label
        return label

    def update(self):
        """
        Recompute the summary flags from the status messages recorded for
        this run, then save the summary.

        Flags are only ever switched on here (and reduction_needed only
        switched off); a flag set by an earlier call is not reset, except
        for complete, which is re-derived on every call.
        """

        def seen(status_name):
            # True when at least one status entry of that name exists for this run
            return len(RunStatus.objects.status(self.run_id, status_name)) > 0

        # A run with no action from the workflow manager is by definition
        # incomplete, so start from that state.
        self.complete = False

        # Cataloging status
        if seen("CATALOG.ONCAT.COMPLETE"):
            self.cataloged = True
        if seen("CATALOG.ONCAT.STARTED"):
            self.catalog_started = True

        # Reduction is needed unless a message says otherwise; the second
        # lookup is skipped when the first one already matched.
        if seen("REDUCTION.NOT_NEEDED") or seen("REDUCTION.DISABLED"):
            self.reduction_needed = False

        # Reduction status
        if seen("REDUCTION.COMPLETE"):
            self.reduced = True
        if seen("REDUCTION.STARTED"):
            self.reduction_started = True

        # Status of reduced data cataloging
        if seen("REDUCTION_CATALOG.COMPLETE"):
            self.reduction_cataloged = True
        if seen("REDUCTION_CATALOG.STARTED"):
            self.reduction_catalog_started = True

        # Overall status: cataloged, and either no reduction is needed or
        # the reduction was both performed and cataloged.
        reduction_done = self.reduced is True and self.reduction_cataloged is True
        if self.cataloged is True and (self.reduction_needed is False or reduction_done):
            self.complete = True

        self.save()
class Error(models.Model):
    """
    Details of a particular error event
    """

    # RunStatus entry this error is attached to
    run_status_id = models.ForeignKey(RunStatus, on_delete=models.CASCADE)
    # Optional text, limited to 200 characters
    description = models.CharField(max_length=200, null=True)

    class Meta:
        app_label = "report"
class Information(models.Model):
    """
    Extra information associated with a status update
    """

    # RunStatus entry this information is attached to
    run_status_id = models.ForeignKey(RunStatus, on_delete=models.CASCADE)
    # Optional text, limited to 200 characters
    description = models.CharField(max_length=200, null=True)

    class Meta:
        app_label = "report"
class TaskManager(models.Manager):
    """
    Manager providing a SQL export helper for tasks
    """

    def sql_dump(self):
        """
        Build a SQL script with one INSERT statement per task and one per
        entry of its task/success queue relations, followed by a statement
        setting the report_task_id_seq sequence to the highest dumped id
        (1 when there are no tasks).

        :returns: the SQL script, as a string
        """

        def sql_literal(value):
            # task_class is nullable: emit NULL rather than the text 'None',
            # and double embedded single quotes to keep the literal valid.
            if value is None:
                return "NULL"
            return "'%s'" % str(value).replace("'", "''")

        statements = []
        max_task_id = 1
        for item in super().get_queryset():
            max_task_id = max(max_task_id, item.id)
            # The *_id_id attributes hold the raw foreign key values, which
            # avoids fetching the related objects just to read their ids.
            statements.append(
                "INSERT INTO report_task(id, instrument_id_id, input_queue_id_id, task_class) "
                "VALUES (%d, %d, %d, %s);\n"
                % (item.id, item.instrument_id_id, item.input_queue_id_id, sql_literal(item.task_class))
            )
            for q in item.task_queue_ids.all():
                statements.append(
                    "INSERT INTO report_task_task_queue_ids(task_id, statusqueue_id) "
                    "VALUES (%d, %d);\n" % (item.id, q.id)
                )
            for q in item.success_queue_ids.all():
                statements.append(
                    "INSERT INTO report_task_success_queue_ids(task_id, statusqueue_id) "
                    "VALUES (%d, %d);\n" % (item.id, q.id)
                )
        statements.append("SELECT pg_catalog.setval('report_task_id_seq', %d, true);\n" % max_task_id)
        return "".join(statements)
class Task(models.Model):
    """
    Define a task
    """

    # Instrument ID
    instrument_id = models.ForeignKey(Instrument, on_delete=models.CASCADE)
    # Message queue that starts this task
    input_queue_id = models.ForeignKey(StatusQueue, on_delete=models.CASCADE)
    # Python class to be instantiated and run
    task_class = models.CharField(max_length=50, null=True, blank=True)
    # Output messages to be sent
    task_queue_ids = models.ManyToManyField(StatusQueue, related_name="_task_task_queue_ids+", blank=True)
    # Expected success messages from tasks
    # Map one-to-one with task queue IDs
    success_queue_ids = models.ManyToManyField(StatusQueue, related_name="_task_success_queue_ids+", blank=True)
    objects = TaskManager()

    class Meta:
        app_label = "report"

    def task_queues(self):
        """
        Return the task queues as one string, each entry followed by "; "
        """
        return "".join("%s; " % str(q) for q in self.task_queue_ids.all())

    def success_queues(self):
        """
        Return the success queues as one string, each entry followed by "; "
        """
        return "".join("%s; " % str(q) for q in self.success_queue_ids.all())

    def json_encode(self):
        """
        Encode the object as a JSON dictionary
        """
        task_queue_names = [str(q) for q in self.task_queue_ids.all()]
        success_queue_names = [str(q) for q in self.success_queue_ids.all()]
        payload = {
            "instrument": self.instrument_id.name,
            "input_queue": str(self.input_queue_id),
            "task_class": self.task_class,
            "task_queues": task_queue_names,
            "success_queues": success_queue_names,
        }
        return json.dumps(payload)
class InstrumentStatus(models.Model):
    """
    Cache the latest information for each instrument.
    This can be used to quickly access status information.
    """

    # One status entry per instrument
    instrument_id = models.OneToOneField(Instrument, on_delete=models.CASCADE)
    # Last run recorded for the instrument; may be unset
    last_run_id = models.ForeignKey(DataRun, null=True, on_delete=models.CASCADE)

    class Meta:
        verbose_name_plural = "Instrument status"
        app_label = "report"