# Source code for reporting.report.view_util

"""
Status monitor utilities to support 'report' views

@author: M. Doucet, Oak Ridge National Laboratory
@copyright: 2014 Oak Ridge National Laboratory
"""

import datetime
import hashlib
import json
import logging
import re
import string

import requests
from django.conf import settings
from django.core.cache import cache
from django.db import connection, models, transaction
from django.http import HttpResponseServerError
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.utils import formats, timezone

import reporting.dasmon.view_util as dasmon_view_util
import reporting.reporting_app.view_util as reporting_view_util
from reporting.report.models import (
    IPTS,
    DataRun,
    Error,
    Instrument,
    RunStatus,
    StatusQueue,
    StatusQueueMessageCount,
    Task,
    WorkflowSummary,
)
from reporting.users.view_util import is_instrument_staff


def generate_key(instrument, run_id: int):
    """
    Generate a secret key for a run on a given instrument

    The key is the SHA-1 hex digest of the upper-cased instrument name,
    the configured LIVE_PLOT_SECRET_KEY and the run number, concatenated
    in that order.

    :param instrument: instrument name (string) or Instrument object
    :param run_id: run number
    :return: hex digest string, or None when no secret key is configured
    """
    # A missing, None or empty secret key all mean that no key is generated
    secret_key = getattr(settings, "LIVE_PLOT_SECRET_KEY", None)
    if not secret_key:
        return None
    # Handle both string and Instrument object
    instrument_name = instrument.name if hasattr(instrument, "name") else str(instrument)
    # NOTE(review): the digest recipe is presumably mirrored by the live data
    # server that validates the key — keep it byte-for-byte stable.
    return hashlib.sha1(f"{instrument_name.upper()}{secret_key}{run_id}".encode("utf-8")).hexdigest()
def append_key(input_url, instrument, run_id):
    """
    Append a live data secret key to a url

    :param input_url: url to modify
    :param instrument: instrument name
    :param run_id: run number
    :return: the url with a "key=<secret>" query argument appended, or the
        unmodified url when no secret key is configured
    """
    client_key = generate_key(instrument, run_id)
    if client_key is None:
        return input_url
    # Use "&" when the url already has a query string, "?" otherwise.
    # Checking for any "?" (not just "/?") also covers urls such as
    # "/plot?x=1" that have no slash before the query string.
    delimiter = "&" if "?" in input_url else "?"
    return "%s%skey=%s" % (input_url, delimiter, client_key)
def fill_template_values(request, **template_args):
    """
    Populate the template arguments shared by every report page
    (side bars and other satellite items).

    Nothing is added unless template_args contains an "instrument" entry.
    The dasmon-level common values are filled in afterwards.

    :param request: HTTP request object
    :param template_args: keyword arguments destined for the template
    :return: the updated dictionary of template arguments
    """
    if "instrument" not in template_args:
        return template_args

    instrument_name = template_args["instrument"].lower()
    instrument_obj = get_object_or_404(Instrument, name=instrument_name)

    # Most recent run; when there is none, fall back on the most recent IPTS
    latest_run = DataRun.objects.get_last_cached_run(instrument_obj)
    latest_expt = IPTS.objects.get_last_ipts(instrument_obj) if latest_run is None else latest_run.ipts_id
    template_args["last_expt"] = latest_expt
    template_args["last_run"] = latest_run

    # Base URLs: reverse with a placeholder number, then strip the placeholder
    for arg_name, url_name in (("base_ipts_url", "report:ipts_summary"), ("base_run_url", "report:detail")):
        template_args[arg_name] = reverse(url_name, args=[instrument_name, "0000"]).replace("/0000", "")

    # Run rate and error rate, as strings
    runs, errors = retrieve_rates(instrument_obj, latest_run)
    template_args["run_rate"] = str(runs)
    template_args["error_rate"] = str(errors)

    return dasmon_view_util.fill_template_values(request, **template_args)
def needs_reduction(request, run_id):
    """
    Tell whether a run should get a link for submitting it to automated reduction.

    No link is needed when the instrument has exactly one task whose input
    queue is REDUCTION.DATA_READY, and that task has no task class and no
    entries in task_queue_ids.

    :param request: HTTP request object (unused)
    :param run_id: DataRun object
    :return: True if a reduction link should be shown
    """
    try:
        reduction_queue = StatusQueue.objects.get(name="REDUCTION.DATA_READY")
    except StatusQueue.DoesNotExist:
        logging.exception("")
        return False

    matching_tasks = Task.objects.filter(instrument_id=run_id.instrument_id, input_queue_id=reduction_queue)
    if len(matching_tasks) != 1:
        return True

    only_task = matching_tasks[0]
    # A non-empty task class is enough; the queue lookup is then skipped
    if only_task.task_class is not None and len(only_task.task_class) > 0:
        return True
    return len(only_task.task_queue_ids.all()) > 0
def send_processing_request(instrument_id, run_id, user=None, destination=None, is_complete=False):
    """
    Send an AMQ message to the workflow manager to reprocess the run

    When the run has no data file path or no IPTS, the online catalog is
    queried to fill in the missing information.

    :param instrument_id: Instrument object
    :param run_id: DataRun object
    :param user: optional requester, added to the message as "information"
    :param destination: outgoing AMQ queue (default: /queue/POSTPROCESS.DATA_READY)
    :param is_complete: if True, the message carries "is_complete": "true"
    :raises RuntimeError: if no data file or no IPTS could be determined
    """
    if destination is None:
        destination = "/queue/POSTPROCESS.DATA_READY"

    # IPTS name. A run without an IPTS gives an empty string (not "None"),
    # so that the catalog lookup and the sanity check below catch it.
    ipts_obj = run_id.ipts_id
    if ipts_obj is None:
        ipts = ""
    else:
        try:
            ipts = ipts_obj.expt_name.upper()
        except AttributeError:
            # expt_name is not a string (e.g. None)
            ipts = str(ipts_obj)

    # Verify that we have a file path and an IPTS.
    # If not, look up the online catalog
    file_path = run_id.file or ""
    if not file_path or not ipts:
        # Same absolute package path as the module-level imports
        from reporting.report.catalog import get_run_info

        run_info = get_run_info(str(instrument_id), "", run_id.run_number)
        if not file_path:
            # the last matching file listed by the catalog wins
            for _file in run_info["data_files"]:
                if _file.endswith(("_event.nxs", ".nxs.h5")):
                    file_path = _file
        # If we don't have the IPTS, fill it in too
        if not ipts:
            ipts = run_info["proposal"] or ""

    # Get facility from file path: the component after the first "/"
    # (e.g. "SNS" for "/SNS/..."), otherwise the configured default
    toks = file_path.split("/")
    facility_name = settings.FACILITY_INFO.get(str(instrument_id), "SNS")
    if len(toks) > 1:
        facility_name = toks[1].upper()

    # Sanity check (also covers a None proposal coming from the catalog)
    if not file_path or not ipts:
        logging.error("No catalog information for run %s: message not sent", run_id)
        raise RuntimeError("Run %s not found in catalog" % str(run_id))

    # Build up dictionary
    data_dict = {
        "facility": facility_name,
        "instrument": str(instrument_id),
        "ipts": ipts,
        "run_number": run_id.run_number,
        "data_file": file_path,
    }
    if is_complete is True:
        data_dict["is_complete"] = "true"
    if user is not None:
        data_dict["information"] = "Requested by %s" % user

    data = json.dumps(data_dict)
    reporting_view_util.send_activemq_message(destination, data)
    logging.info("Reduction requested: %s", str(data))
def processing_request(request, instrument, run_id, destination):
    """
    Handle a user request for post-processing a run.

    The request is only forwarded when the user is instrument staff; requests
    from other users are ignored and simply redirected to the run page.

    :param request: HTTP request object
    :param instrument: instrument name
    :param run_id: run number [string]
    :param destination: outgoing AMQ queue
    :return: redirect to the run detail page, or HttpResponseServerError
        if sending the request failed
    """
    instrument_obj = get_object_or_404(Instrument, name=instrument.lower())
    run_obj = get_object_or_404(DataRun, instrument_id=instrument_obj, run_number=run_id)

    authorized = is_instrument_staff(request, instrument_obj)
    if authorized:
        try:
            send_processing_request(instrument_obj, run_obj, request.user, destination=destination)
        except:  # noqa: E722
            logging.error("Could not send post-processing request: %s", destination)
            logging.exception("")
            return HttpResponseServerError()

    # Back to the run page
    return redirect(reverse("report:detail", args=[instrument, run_id]))
def __generate_empty_rates(n_hours):
    """
    Build a rate series with a zero count for every hour.

    :param n_hours: number of hours in the series
    :return: list of [-hour_offset, 0] pairs, hour_offset = 0 .. n_hours - 1
    """
    return [[-hour, 0] for hour in range(n_hours)]
def retrieve_rates(instrument_id, last_run_id):
    """
    Retrieve the run rate and error rate for an instrument.
    Try to get it from the cache if possible.

    Cached rates are only trusted when the cached "last run" number matches
    the instrument's current last run.

    :param instrument_id: Instrument object
    :param last_run_id: DataRun object (or None)
    :return: (runs, errors) tuple of [-hour_offset, count] series
    """
    n_hours = 24
    instrument_name = instrument_id.name
    last_run = last_run_id.run_number if last_run_id is not None else None

    # The cached rates are valid only if they were computed for the same last run
    last_cached_run = cache.get("%s_rate_last_run" % instrument_name)
    cache_is_valid = last_cached_run is not None and last_run == last_cached_run

    def _get_rate(id_name):
        """
        Returns the cached rate for runs or errors, or None if unavailable
        :param id_name: 'run' or 'error'
        """
        if cache_is_valid:
            # this either returns the cached information or None
            return cache.get("%s_%s_rate" % (instrument_name, id_name))
        return None

    runs = _get_rate("run")
    errors = _get_rate("error")

    # If we didn't find useful information in the cache, recalculate them
    if runs is None or errors is None:
        # check for any activity in the last n_hours, plus one extra hour
        time_oldest = timezone.now() - datetime.timedelta(hours=n_hours + 1)

        if runs is None:
            # only run the rate computation if there are recent runs
            have_runs = DataRun.objects.filter(instrument_id=instrument_id, created_on__gte=time_oldest).exists()
            runs = run_rate(instrument_id, n_hours=n_hours) if have_runs else __generate_empty_rates(n_hours)
            # cache the run rate
            cache.set("%s_run_rate" % instrument_name, runs, settings.RUN_RATE_CACHE_TIMEOUT)

        if errors is None:
            # only run the rate computation if there are recent errors
            have_errors = Error.objects.filter(
                run_status_id__run_id__instrument_id=instrument_id, run_status_id__created_on__gte=time_oldest
            ).exists()
            errors = error_rate(instrument_id, n_hours=n_hours) if have_errors else __generate_empty_rates(n_hours)
            # cache the error rate
            cache.set("%s_error_rate" % instrument_name, errors, settings.RUN_RATE_CACHE_TIMEOUT)

        # remember which last run the cached rates correspond to
        cache.set("%s_rate_last_run" % instrument_name, last_run, settings.RUN_RATE_CACHE_TIMEOUT)

    return runs, errors
def run_rate(instrument_id, n_hours=24):
    """
    Returns the rate of new runs for the last n_hours hours.

    The "run_rate" stored procedure is tried first. If it fails, the series is
    computed with one ORM query per hour.

    :param instrument_id: Instrument model object
    :param n_hours: number of hours to track (used by the fallback query)
    :return: list of [-hour_offset, run_count] pairs
    """
    msg = ""  # start with empty message to help with logging
    try:
        # Try calling the stored procedure (faster)
        with transaction.atomic():
            with connection.cursor() as cursor:
                cursor.callproc("run_rate", (instrument_id.id,))
                # the first returned value is used as the cursor name to fetch from
                msg = cursor.fetchone()[0]
                cursor.execute('FETCH ALL IN "%s"' % msg)
                rows = cursor.fetchall()
                return [[int(row[0]), int(row[1])] for row in rows]
    except Exception:
        logging.exception("Call to stored procedure run_rate(%s) failed", str(instrument_id))
        logging.error("Running query from python")
        # Do it by hand (slow): count runs since each hour boundary going back
        # in time; the difference with the previous total is that hour's count
        now = timezone.now()
        runs = []
        previous_total = 0
        for i in range(n_hours):
            t_i = now - datetime.timedelta(hours=i + 1)
            total = DataRun.objects.filter(instrument_id=instrument_id, created_on__gte=t_i).count()
            runs.append([-i, total - previous_total])
            previous_total = total
        return runs
    except SystemExit:
        if msg:  # skip empty string
            logging.error("message returned from fetchone: %s", str(msg))
        # The message previously named error_rate (copy-paste error)
        logging.exception("Call to stored procedure run_rate(%s) created system exit", str(instrument_id))
        raise
def error_rate(instrument_id, n_hours=24):
    """
    Returns the rate of errors for the last n_hours hours.

    The "error_rate" stored procedure is attempted first. When it fails, the
    series is rebuilt with one ORM query per hour.

    :param instrument_id: Instrument model object
    :param n_hours: number of hours to track (used by the fallback query)
    :return: list of [-hour_offset, error_count] pairs
    """
    cursor_name = ""  # kept for logging; filled once the procedure answers
    try:
        # Fast path: stored procedure, whose first value names the cursor to fetch
        with transaction.atomic():
            with connection.cursor() as cursor:
                cursor.callproc("error_rate", (instrument_id.id,))
                cursor_name = cursor.fetchone()[0]
                cursor.execute('FETCH ALL IN "%s"' % cursor_name)
                return [[int(record[0]), int(record[1])] for record in cursor.fetchall()]
    except Exception:
        logging.exception("Call to stored procedure error_rate(%s) failed", str(instrument_id))
        logging.error("Running query from python")
        # Slow path: cumulative counts going back in time, differenced per hour
        reference_time = timezone.now()
        series = []
        counted_so_far = 0
        for hour in range(n_hours):
            window_start = reference_time - datetime.timedelta(hours=hour + 1)
            cumulative = Error.objects.filter(
                run_status_id__run_id__instrument_id=instrument_id,
                run_status_id__created_on__gte=window_start,
            ).count()
            series.append([-hour, cumulative - counted_so_far])
            counted_so_far = cumulative
        return series
    except SystemExit:
        if cursor_name:  # skip empty string
            logging.error("message returned from fetchone: %s", str(cursor_name))
        logging.exception("Call to stored procedure error_rate(%s) created system exit", str(instrument_id))
        raise
def get_current_status(instrument_id):
    """
    Summarize the current state of an instrument: last experiment, last run,
    run rate and error rate.

    The result feeds an AJAX response, so it holds plain values and never
    Model objects.

    :param instrument_id: Instrument model object
    :return: dictionary of status values
    """
    latest_run = DataRun.objects.get_last_cached_run(instrument_id)
    if latest_run is not None:
        latest_expt = latest_run.ipts_id
    else:
        latest_expt = IPTS.objects.get_last_ipts(instrument_id)

    runs, errors = retrieve_rates(instrument_id, latest_run)
    status = {"run_rate": runs, "error_rate": errors}

    if latest_run is not None:
        status.update(
            last_run_id=latest_run.id,
            last_run=latest_run.run_number,
            last_run_time=formats.localize(timezone.localtime(latest_run.created_on)),
        )

    if latest_expt is not None:
        status["last_expt_id"] = latest_expt.id
        status["last_expt"] = latest_expt.expt_name.upper()

    return status
def is_acquisition_complete(run_id):
    """
    Determine whether the acquisition is complete and post-processing has started

    This is the case once the run has a status entry on the
    POSTPROCESS.DATA_READY queue.

    :param run_id: run object
    :return: True if such a status entry exists
    """
    # exists() stops at the first matching row instead of counting all of them
    return RunStatus.objects.filter(run_id=run_id, queue_id__name="POSTPROCESS.DATA_READY").exists()
def get_post_processing_status(red_timeout=0.25, yellow_timeout=120):
    """
    Report the health of the post-processing services.

    Cataloging and reduction status reporting is being phased out because it
    was more confusing than useful. Both services are therefore always
    reported as 0, and the timeout arguments are ignored.

    :param red_timeout: number of hours before declaring a process dead (unused)
    :param yellow_timeout: number of seconds before declaring a process slow (unused)
    :return: {"catalog": 0, "reduction": 0}
    """
    return dict(catalog=0, reduction=0)
def get_run_status_text_dict(run_list, use_element_id=False):
    """
    Return a dictionary of run status texts {run_id: status}

    Each status is an HTML <span> reading "acquiring", "complete", "error"
    or "incomplete".

    :param run_list: iterable of DataRun objects (usually a QuerySet)
    :param use_element_id: if True, give each <span> an id of the form run_id_<id>
    :return: dictionary mapping each DataRun id to its status HTML
    """
    complete = dict(WorkflowSummary.objects.filter(run_id__in=run_list).values_list("run_id", "complete"))
    # Materialize the id lookups as sets: "in" on a QuerySet scans its cached
    # result list, which made the loop below quadratic in the number of runs
    acquisition_complete = set(
        RunStatus.objects.filter(run_id__in=run_list, queue_id__name="POSTPROCESS.DATA_READY")
        .values_list("run_id", flat=True)
        .distinct()
    )
    errors = set(
        Error.objects.filter(run_status_id__run_id__in=run_list)
        .values_list("run_status_id__run_id", flat=True)
        .distinct()
    )

    run_statuses = {}
    for r in run_list:
        element_id = "id='run_id_%s'" % r.id if use_element_id else ""
        if r.id not in acquisition_complete:
            status = "<span %s>acquiring</span>" % element_id
        elif complete.get(r.id, False) is True:
            status = "<span %s class='green'>complete</span>" % element_id
        elif r.id in errors:
            status = "<span %s class='red'><b>error</b></span>" % element_id
        else:
            status = "<span %s class='red'>incomplete</span>" % element_id
        run_statuses[r.id] = status
    return run_statuses
def get_run_list_dict(run_list):
    """
    Convert a list of runs into a list of dictionaries, one per table row.

    Each row holds:
    - HTML links to the instrument page and the run page;
    - a "reduce" link;
    - the run database id;
    - the localized creation time;
    - the run status HTML.

    An exception raised while building rows is logged, and the rows built so
    far are returned.

    :param run_list: list of run object (usually a QuerySet)
    :return: list of dictionaries
    """
    rows = []
    # Nothing to do for an empty list
    if len(run_list) == 0:
        return rows

    status_by_id = get_run_status_text_dict(run_list, use_element_id=True)
    try:
        for run in run_list:
            if run.id not in status_by_id:
                continue
            instrument = str(run.instrument_id)
            run_url = reverse("report:detail", args=[instrument, run.run_number])
            reduce_url = reverse("report:submit_for_reduction", args=[instrument, run.run_number])
            instr_url = reverse("dasmon:live_runs", args=[instrument])
            reduce_link = (
                "<a id='reduce_%s' href='javascript:void(0);' onClick='$.ajax({ url: \"%s\", cache: false }); $(\"#reduce_%s\").remove();'>reduce</a>"  # noqa: E501
                % (run.run_number, reduce_url, run.run_number)
            )
            rows.append(
                {
                    "instrument_id": "<a href='%s'>%s</a>" % (instr_url, instrument),
                    "run": "<a href='%s'>%s</a>" % (run_url, run.run_number),
                    "reduce_url": reduce_link,
                    "run_id": run.id,
                    "timestamp": formats.localize(timezone.localtime(run.created_on)),
                    "status": status_by_id.get(run.id, "unknown"),
                }
            )
    except Exception:
        logging.exception("report.view_util.get_run_list_dict: %s", str(run_list))
    except SystemExit:
        logging.exception("report.view_util.get_run_list_dict SystemExit: %s", str(run_list))
        raise
    return rows
def extract_ascii_from_div(html_data, trace_id=None):
    """
    Extract data from an plot <div>.
    Only returns the first one it finds.

    The arguments of the newPlot(...) call in the <div> are parsed as JSON,
    and the first trace of type "scatter" is written out as ASCII, one
    point per line, in the column order: x y dy dx.
    Missing error arrays are written as zeros.

    :param html_data: <div> string (str or bytes)
    :param trace_id: when None, a plot with more than one trace is rejected;
        any other value lifts that restriction (the first scatter trace is
        still the one returned)
    :return: ASCII data string, or None if no data could be extracted

    #TODO: allow to specify which trace to return in cases where we have multiple curves
    """
    if isinstance(html_data, bytes):
        html_data = html_data.decode()
    try:
        result = re.search(r"newPlot\((.*)\).*</script>", html_data, re.DOTALL)
        if result is None:
            logging.debug("No newPlot() call found in <div>")
            return None
        # Single quotes are converted so the argument list parses as JSON
        jsondata_str = "[%s]" % result.group(1).replace("'", '"')
        data_list = json.loads(jsondata_str)

        scatter = None
        for d in data_list:
            if not isinstance(d, list):
                continue
            # Only allow a single trace
            if trace_id is None and len(d) > 1:
                logging.debug("Multiple traces found, and no ID was specified")
                return None
            for trace in d:
                if "type" in trace and trace["type"] == "scatter":
                    scatter = trace
                    break
            if scatter is not None:
                # keep the first scatter trace found
                break

        if scatter is None:
            logging.debug("No scatter trace found in <div>")
            return None

        x = scatter["x"]
        y = scatter["y"]
        dx = [0] * len(x)
        dy = [0] * len(y)
        if "error_x" in scatter and "array" in scatter["error_x"]:
            dx = scatter["error_x"]["array"]
        if "error_y" in scatter and "array" in scatter["error_y"]:
            dy = scatter["error_y"]["array"]

        # Index access (rather than zip) so that mismatched lengths raise
        # and are reported as "no data" below
        return "".join("%g %g %g %g\n" % (x[i], y[i], dy[i], dx[i]) for i in range(len(x)))
    except Exception:
        # Unable to extract data from <div>
        logging.debug("Unable to extract data from <div>:", exc_info=True)
        return None
def get_plot_template_dict(run_object=None, instrument=None, run_id=None):
    """
    Build the template dictionary for a run's live-data plot.

    :param run_object: DataRun object (unused)
    :param instrument: instrument name
    :param run_id: run_number
    :return: dictionary with "html_data", "update_url" and "key" entries, plus
        "data_url" when ASCII data can be extracted from the plot
    """
    plot_dict = dict(html_data=None, update_url=None, key=generate_key(instrument, run_id))

    # Live data server URL for this run (without the "/html/" suffix)
    run_path = string.Template(settings.LIVE_DATA_SERVER).substitute(instrument=instrument, run_number=run_id)
    server_url = f"https://{settings.LIVE_DATA_SERVER_DOMAIN}:{settings.LIVE_DATA_SERVER_PORT}{run_path}"

    html_data = get_plot_data_from_server(instrument, run_id)
    if html_data is None:
        return plot_dict

    plot_dict["html_data"] = html_data
    plot_dict["update_url"] = append_key(server_url + "/html/", instrument, run_id)
    # Offer a download link only when the plot holds extractable data
    if extract_ascii_from_div(html_data) is not None:
        plot_dict["data_url"] = reverse("report:download_reduced_data", args=[instrument, run_id])
    return plot_dict
def get_plot_data_from_server(instrument, run_id):
    """
    Fetch the plot HTML for a run from the live data server.

    Any failure (URL building, request, response) is logged and reported
    as None. The request times out after 1.5 seconds.

    :param instrument: instrument name
    :param run_id: run number
    :return: the response text on HTTP 200, None otherwise
    """
    try:
        run_path = string.Template(settings.LIVE_DATA_SERVER).substitute(instrument=instrument, run_number=run_id)
        html_url = f"https://{settings.LIVE_DATA_SERVER_DOMAIN}:{settings.LIVE_DATA_SERVER_PORT}{run_path}/html/"
        response = requests.get(append_key(html_url, instrument, run_id), timeout=1.5)
        return response.text if response.status_code == 200 else None
    except:  # noqa: E722
        logging.exception("Could not pull data from live data server: %s", str(instrument))
        return None
def find_skipped_runs(instrument_id, start_run_number=0):
    """
    Find run numbers that were skipped for a given instrument

    Run numbers from start_run_number up to, but excluding, the instrument's
    last run number are checked against the runs in the database.
    Errors are logged and yield an empty list.

    :param instrument_id: Instrument object
    :param start_run_number: run number to start from; 0 means "the 1000 run
        numbers preceding the last run"
    :return: list of missing run numbers, in increasing order
    """
    try:
        last_run_id = DataRun.objects.get_last_run(instrument_id)
        if last_run_id is None:
            # NOTE(review): presumably get_last_run returns None when the
            # instrument has no runs; nothing can be missing then
            return []
        last_run_number = last_run_id.run_number

        # Use a reasonable default for the start run number
        if start_run_number == 0:
            start_run_number = max(0, last_run_number - 1000)

        # One query for every existing run number in the range, instead of
        # one query per run number
        existing = set(
            DataRun.objects.filter(
                instrument_id=instrument_id,
                run_number__gte=start_run_number,
                run_number__lt=last_run_number,
            ).values_list("run_number", flat=True)
        )
        return [i for i in range(start_run_number, last_run_number) if i not in existing]
    except Exception:
        logging.exception("Error finding missing runs: %s", str(instrument_id))
        return []
def reduction_queue_sizes():
    """
    Get the size of the message queues

    For every queue that has message-count entries, the most recent entry
    (by created_on) is converted with its to_dict() method.

    :return: list of dictionaries, one per queue
    """
    queue_rows = StatusQueueMessageCount.objects.values_list("queue_id").distinct()
    return [
        StatusQueueMessageCount.objects.filter(queue_id=row[0]).latest("created_on").to_dict()
        for row in queue_rows
    ]
def get_experiment_list(instrument_id, offset, limit, order_column, reverse_dir):
    """
    Get one page of experiments (IPTS) for a given instrument.

    :param instrument_id: Instrument object
    :param offset: index of the first experiment to return
    :param limit: maximum number of experiments to return
    :param order_column: IPTS field to sort on, or "number_of_runs"
    :param reverse_dir: if True, reverse the sort order
    :return: (list of row dictionaries, total number of experiments)
    """
    instrument = str(instrument_id)
    experiments = IPTS.objects.filter(instruments=instrument_id)
    total = experiments.count()

    if order_column == "number_of_runs":
        # sort on an annotated run count rather than on a model field
        experiments = experiments.annotate(num_runs=models.Count("datarun")).order_by("num_runs")
    else:
        experiments = experiments.order_by(order_column)
    if reverse_dir:
        experiments = experiments.reverse()

    rows = []
    for expt in experiments[offset : offset + limit]:  # noqa E203
        summary_url = reverse("report:ipts_summary", args=[instrument, expt.expt_name])
        rows.append(
            {
                "experiment": "<a href='%s'>%s</a>" % (summary_url, expt.expt_name),
                "total": expt.number_of_runs(),
                "created_on": formats.localize(timezone.localtime(expt.created_on)),
            }
        )
    return rows, total
def get_error_list(instrument_id, offset, limit):
    """
    Get a list of errors for a given instrument in the last 30 days

    Errors are sorted from most recent to oldest.

    :param instrument_id: Instrument object
    :param offset: index of the first error to return
    :param limit: maximum number of errors to return
    :return: (list of row dictionaries, total number of errors in the period)
    """
    instrument = str(instrument_id)
    time_period = 30  # days
    oldest_time = timezone.now() - datetime.timedelta(days=time_period)
    error_query = (
        Error.objects.filter(
            run_status_id__created_on__gte=oldest_time,
            run_status_id__run_id__instrument_id=instrument_id,
        )
        # Fetch the related status, run and IPTS rows in the same query;
        # otherwise every error row triggers extra queries in the loop below
        .select_related("run_status_id__run_id__ipts_id")
        .order_by("run_status_id__created_on")
        .reverse()
    )
    count = error_query.count()

    error_list = []
    for err in error_query[offset : offset + limit]:  # noqa E203
        run_status = err.run_status_id
        run = run_status.run_id
        expt_name = run.ipts_id.expt_name
        ipts_url = reverse("report:ipts_summary", args=[instrument, expt_name])
        run_url = reverse("report:detail", args=[instrument, run.run_number])
        error_list.append(
            {
                "experiment": "<a href='%s'>%s</a>" % (ipts_url, expt_name),
                "run": "<a href='%s'>%s</a>" % (run_url, run.run_number),
                "info": str(err.description),
                "created_on": formats.localize(timezone.localtime(run_status.created_on)),
            }
        )
    return error_list, count