"""
Status monitor utilities to support 'report' views
@author: M. Doucet, Oak Ridge National Laboratory
@copyright: 2014 Oak Ridge National Laboratory
"""
import datetime
import hashlib
import json
import logging
import re
import string
import requests
from django.conf import settings
from django.contrib import messages
from django.core.cache import cache
from django.db import connection, models, transaction
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse
from django.utils import formats, timezone
from django.utils.html import escape
import reporting.dasmon.view_util as dasmon_view_util
import reporting.reporting_app.view_util as reporting_view_util
from reporting.report.models import (
IPTS,
DataRun,
Error,
Instrument,
RunStatus,
StatusQueue,
StatusQueueMessageCount,
Task,
WorkflowSummary,
)
from reporting.users.view_util import is_instrument_staff
[docs]
def generate_key(instrument, run_id: int):
    """
    Generate a secret key for a run on a given instrument

    The key is the hex SHA-1 digest of the upper-cased instrument name,
    followed by settings.LIVE_PLOT_SECRET_KEY, followed by the run number.

    :param instrument: instrument name (string) or Instrument object
    :param run_id: run number
    :return: hex digest string, or None when no secret key is configured
    """
    # No secret configured (missing or empty) means keys are disabled
    if not hasattr(settings, "LIVE_PLOT_SECRET_KEY"):
        return None
    secret = settings.LIVE_PLOT_SECRET_KEY
    if not len(secret):
        return None

    # Accept an Instrument model (anything with a ``name``) or a plain string
    if hasattr(instrument, "name"):
        name = instrument.name
    else:
        name = str(instrument)

    # NOTE(review): presumably this must match the key computed by the live
    # data server, so the hash/concatenation order should not change - confirm
    payload = f"{name.upper()}{secret}{run_id}"
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()
[docs]
def append_key(input_url, instrument, run_id):
    """
    Append a live data secret key to a url

    :param input_url: url to modify
    :param instrument: instrument name (string) or Instrument object
    :param run_id: run number
    :return: the url with a ``key=<secret>`` query argument appended, or the
        unchanged url when no secret key is configured
    """
    client_key = generate_key(instrument, run_id)
    if client_key is None:
        return input_url
    # Any "?" means a query string is already present, so the key must be
    # appended with "&". Checking only for "/?" mishandled urls such as
    # "https://host/path?a=1", producing a second "?".
    delimiter = "&" if "?" in input_url else "?"
    return "%s%skey=%s" % (input_url, delimiter, client_key)
[docs]
def fill_template_values(request, **template_args):
    """
    Fill the template argument items needed to populate
    side bars and other satellite items on the pages.
    Only the arguments common to all pages will be filled.

    :param request: HTTP request object, forwarded to the dasmon helper
    :param template_args: template arguments; nothing is added unless an
        ``instrument`` entry is present
    :return: the updated template argument dictionary
    """
    if "instrument" not in template_args:
        return template_args

    instr = template_args["instrument"].lower()
    instrument_id = get_object_or_404(Instrument, name=instr)

    # Last run, and the experiment it belongs to; without a run, fall back to
    # the instrument's last experiment
    last_run_id = DataRun.objects.get_last_cached_run(instrument_id)
    if last_run_id is not None:
        last_expt_id = last_run_id.ipts_id
    else:
        last_expt_id = IPTS.objects.get_last_ipts(instrument_id)

    def _base_url(view_name):
        # Reverse with a "0000" placeholder, then strip it to leave the URL
        # prefix without the trailing IPTS/run segment
        return reverse(view_name, args=[instr, "0000"]).replace("/0000", "")

    base_ipts_url = _base_url("report:ipts_summary")
    base_run_url = _base_url("report:detail")

    r_rate, e_rate = retrieve_rates(instrument_id, last_run_id)

    template_args.update(
        last_expt=last_expt_id,
        last_run=last_run_id,
        base_ipts_url=base_ipts_url,
        base_run_url=base_run_url,
        run_rate=str(r_rate),
        error_rate=str(e_rate),
    )
    return dasmon_view_util.fill_template_values(request, **template_args)
[docs]
def needs_reduction(request, run_id):
    """
    Determine whether we need a reduction link to
    submit a run for automated reduction

    No link is needed when the instrument has exactly one task on the
    REDUCTION.DATA_READY queue and that task has neither a task class nor
    any task queues. No link is offered if that queue does not exist.

    :param request: HTTP request object (unused)
    :param run_id: DataRun object
    :return: True if a reduction link should be shown
    """
    # Get REDUCTION.DATA_READY queue
    try:
        red_queue = StatusQueue.objects.get(name="REDUCTION.DATA_READY")
    except StatusQueue.DoesNotExist:
        logging.exception("Status queue REDUCTION.DATA_READY does not exist")
        return False

    # Check whether we have a task for this queue
    tasks = list(Task.objects.filter(instrument_id=run_id.instrument_id, input_queue_id=red_queue))
    if len(tasks) == 1:
        task = tasks[0]
        # exists() avoids loading every related queue just to test emptiness
        if not task.task_class and not task.task_queue_ids.exists():
            return False
    return True
[docs]
def send_processing_request(instrument_id, run_id, user=None, destination=None, is_complete=False):
    """
    Send an AMQ message to the workflow manager to reprocess
    the run

    If the run has no file path, the online catalog is queried for the data
    file (one ending in "_event.nxs" or ".nxs.h5") and, when the run has no
    IPTS either, for the proposal.

    :param instrument_id: Instrument object
    :param run_id: DataRun object
    :param user: requesting user; when given, recorded in the message as
        "Requested by <user>"
    :param destination: AMQ destination (default: /queue/POSTPROCESS.DATA_READY)
    :param is_complete: if True, add "is_complete": "true" to the message
    :raises RuntimeError: if no data file or no IPTS could be determined
    """
    if destination is None:
        destination = "/queue/POSTPROCESS.DATA_READY"
    instrument_name = str(instrument_id)

    # IPTS name. A run without an IPTS yields "" rather than the string
    # "None", so that the catalog lookup and sanity check below handle it
    # instead of sending "None" as the IPTS.
    ipts_obj = run_id.ipts_id
    if ipts_obj is None:
        ipts = ""
    else:
        try:
            ipts = ipts_obj.expt_name.upper()
        except AttributeError:
            ipts = str(ipts_obj)

    # Verify that we have a file path.
    # If not, look up the online catalog
    file_path = run_id.file or ""
    if not file_path:
        from reporting.report.catalog import get_run_info

        run_info = get_run_info(instrument_name, "", run_id.run_number)
        for _file in run_info["data_files"]:
            # No break: the last matching file wins
            if _file.endswith(("_event.nxs", ".nxs.h5")):
                file_path = _file
        # If we don't have the IPTS, fill it in too
        if not ipts:
            ipts = run_info["proposal"]

    # Sanity check. "not" (rather than len()) also catches a None proposal
    # from the catalog, which previously raised TypeError inside len().
    if not file_path or not ipts:
        logging.error("No catalog information for run %s: message not sent", run_id)
        raise RuntimeError("Run %s not found in catalog" % str(run_id))

    # Get facility from the file path, e.g. "/SNS/ARCS/..." -> "SNS",
    # falling back to the configured facility for the instrument
    toks = file_path.split("/")
    if len(toks) > 1:
        facility_name = toks[1].upper()
    else:
        facility_name = settings.FACILITY_INFO.get(instrument_name, "SNS")

    # Build up dictionary
    data_dict = {
        "facility": facility_name,
        "instrument": instrument_name,
        "ipts": ipts,
        "run_number": run_id.run_number,
        "data_file": file_path,
    }
    if is_complete is True:
        data_dict["is_complete"] = "true"
    if user is not None:
        data_dict["information"] = "Requested by %s" % user
    data = json.dumps(data_dict)
    reporting_view_util.send_activemq_message(destination, data)
    logging.info("Reduction requested: %s", str(data))
[docs]
def processing_request(request, instrument, run_id, destination):
    """
    Process a request for post-processing

    The request is only sent when the user is instrument staff. Failures while
    sending are logged and reported to the user with a Django message. In all
    cases the response redirects to the run's detail page.

    :param request: HTTP request object
    :param instrument: instrument name
    :param run_id: run number [string]
    :param destination: outgoing AMQ queue
    :return: redirect response to the run detail page
    """
    instrument_id = get_object_or_404(Instrument, name=instrument.lower())
    run_object = get_object_or_404(DataRun, instrument_id=instrument_id, run_number=run_id)

    not_ready_text = (
        "This run cannot be submitted for post-processing yet because the data file is not available "
        "in the catalog. Please wait for the run to complete and the file to be saved, or contact the "
        "data acquisition team if this error persists."
    )
    unexpected_text = (
        "An unexpected error occurred while submitting the post-processing request. "
        "Please try again or contact support if the problem persists."
    )

    # Only instrument staff may submit a processing request
    allowed = is_instrument_staff(request, instrument_id)
    if allowed:
        try:
            send_processing_request(instrument_id, run_object, request.user, destination=destination)
        except RuntimeError as err:
            # send_processing_request raises RuntimeError when it cannot
            # determine the data file / IPTS for the run
            logging.warning("Run not ready for post-processing: %s", str(err))
            messages.error(request, not_ready_text)
        except Exception:
            logging.exception("Could not send post-processing request: %s", destination)
            messages.error(request, unexpected_text)

    detail_url = reverse("report:detail", args=[instrument, run_id])
    return redirect(detail_url)
def __generate_empty_rates(n_hours):
    """
    Build a rate series with a zero count for each hourly bin.

    :param n_hours: number of hourly bins
    :return: list of [hour_offset, 0] pairs with offsets 0, -1, ..., -(n_hours - 1)
    """
    return [[-hour, 0] for hour in range(n_hours)]
[docs]
def retrieve_rates(instrument_id, last_run_id):
    """
    Retrieve the run rate and error rate for an instrument.
    Try to get it from the cache if possible.

    Cached rates are used only while the instrument's last run number is the
    same as when they were cached. Otherwise they are recomputed, or replaced
    by all-zero series when there was no activity in the last n_hours + 1
    hours. The results are stored in the cache with
    settings.RUN_RATE_CACHE_TIMEOUT.

    :param instrument_id: Instrument object
    :param last_run_id: DataRun object (or None if the instrument has no run)
    :return: (runs, errors), each a list of [hour_offset, count] pairs
    """
    n_hours = 24
    last_run = None
    if last_run_id is not None:
        last_run = last_run_id.run_number
    # Check whether we have a cached value and whether it is up to date
    last_cached_run = cache.get("%s_rate_last_run" % instrument_id.name)

    def _get_rate(id_name):
        """
        Returns the cached rate for runs or errors, or None if the cache is
        stale or holds no value
        :param id_name: 'run' or 'error'
        """
        if last_cached_run is not None and last_run == last_cached_run:
            return cache.get("%s_%s_rate" % (instrument_id.name, id_name))
        return None

    runs = _get_rate("run")
    errors = _get_rate("error")

    # If we didn't find useful information in the cache, recalculate them
    if runs is None or errors is None:
        # Activity window: n_hours + 1 hours back.
        # TODO: confirm whether the extra hour is needed
        time_oldest = timezone.now() - datetime.timedelta(hours=n_hours + 1)

        if runs is None:
            # exists() stops at the first match; the count itself is not needed
            have_runs = DataRun.objects.filter(instrument_id=instrument_id, created_on__gte=time_oldest).exists()
            if have_runs:
                runs = run_rate(instrument_id, n_hours=n_hours)
            else:
                runs = __generate_empty_rates(n_hours)
            # cache the run rate
            cache.set("%s_run_rate" % instrument_id.name, runs, settings.RUN_RATE_CACHE_TIMEOUT)

        if errors is None:
            have_errors = Error.objects.filter(
                run_status_id__run_id__instrument_id=instrument_id,
                run_status_id__created_on__gte=time_oldest,
            ).exists()
            if have_errors:
                errors = error_rate(instrument_id, n_hours=n_hours)
            else:
                errors = __generate_empty_rates(n_hours)
            # cache the error rate
            cache.set("%s_error_rate" % instrument_id.name, errors, settings.RUN_RATE_CACHE_TIMEOUT)

        # Record which last run the cached rates correspond to
        cache.set("%s_rate_last_run" % instrument_id.name, last_run, settings.RUN_RATE_CACHE_TIMEOUT)
    return runs, errors
[docs]
def run_rate(instrument_id, n_hours=24):
    """
    Returns the rate of new runs for the last n_hours hours.

    The ``run_rate`` stored procedure is tried first. If it fails, the rate is
    computed from the DataRun table with one count query per hour.
    The stored procedure is called with only the instrument id, so ``n_hours``
    only affects the Python fallback.

    :param instrument_id: Instrument model object
    :param n_hours: number of hours to track
    :return: list of [hour_offset, count] pairs (the fallback produces
        offsets 0, -1, ..., -(n_hours - 1))
    """
    msg = ""  # start with empty message to help with logging
    try:
        # Try calling the stored procedure (faster)
        with transaction.atomic():
            with connection.cursor() as cursor:
                cursor.callproc("run_rate", (instrument_id.id,))
                # The procedure returns the name of a cursor holding the rows
                # (presumably a PostgreSQL refcursor), which is then fetched
                msg = cursor.fetchone()[0]
                cursor.execute('FETCH ALL IN "%s"' % msg)
                rows = cursor.fetchall()
                return [[int(row[0]), int(row[1])] for row in rows]
    except Exception:
        logging.exception("Call to stored procedure run_rate(%s) failed", str(instrument_id))
        logging.error("Running query from python")
        # Do it by hand (slow): count runs newer than each hour boundary and
        # subtract the cumulative count of the more recent bins
        time = timezone.now()
        runs = []
        running_sum = 0
        for i in range(n_hours):
            t_i = time - datetime.timedelta(hours=i + 1)
            n = DataRun.objects.filter(instrument_id=instrument_id, created_on__gte=t_i).count()
            n -= running_sum
            running_sum += n
            runs.append([-i, n])
        return runs
    except SystemExit:
        if msg:  # skip empty string
            logging.error("message returned from fetchone: %s", str(msg))
        logging.exception("Call to stored procedure run_rate(%s) created system exit", str(instrument_id))
        raise
[docs]
def error_rate(instrument_id, n_hours=24):
    """
    Returns the rate of errors for the last n_hours hours.

    The ``error_rate`` stored procedure is tried first. If it fails, the rate
    is computed from the Error table with one count query per hour.
    The stored procedure is called with only the instrument id, so ``n_hours``
    only affects the Python fallback.

    :param instrument_id: Instrument model object
    :param n_hours: number of hours to track
    :return: list of [hour_offset, count] pairs (the fallback produces
        offsets 0, -1, ..., -(n_hours - 1))
    """
    msg = ""  # start with empty message to help with logging
    try:
        # Stored procedure (fast path): it hands back the name of a cursor,
        # presumably a PostgreSQL refcursor, whose rows are then fetched
        with transaction.atomic(), connection.cursor() as cursor:
            cursor.callproc("error_rate", (instrument_id.id,))
            msg = cursor.fetchone()[0]
            cursor.execute('FETCH ALL IN "%s"' % msg)
            return [[int(rec[0]), int(rec[1])] for rec in cursor.fetchall()]
    except Exception:
        logging.exception("Call to stored procedure error_rate(%s) failed", str(instrument_id))
        logging.error("Running query from python")
        # Slow path: the count since each hour boundary minus the count since
        # the previous boundary gives the errors in that hourly bin
        now = timezone.now()
        hourly = []
        previous_total = 0
        for hour in range(n_hours):
            since = now - datetime.timedelta(hours=hour + 1)
            total = Error.objects.filter(
                run_status_id__run_id__instrument_id=instrument_id,
                run_status_id__created_on__gte=since,
            ).count()
            hourly.append([-hour, total - previous_total])
            previous_total = total
        return hourly
    except SystemExit:
        if msg:  # skip empty string
            logging.error("message returned from fetchone: %s", str(msg))
        logging.exception("Call to stored procedure error_rate(%s) created system exit", str(instrument_id))
        raise
[docs]
def get_current_status(instrument_id):
    """
    Get current status information such as the last
    experiment/run for a given instrument.
    Used to populate AJAX response, so must not contain Model objects

    :param instrument_id: Instrument model object
    :return: dict with "run_rate" and "error_rate", plus "last_run_id",
        "last_run", "last_run_time" when a last run exists and
        "last_expt_id", "last_expt" when a last experiment exists
    """
    last_run = DataRun.objects.get_last_cached_run(instrument_id)
    # Experiment of the last run, or the instrument's last experiment if no run
    last_expt = IPTS.objects.get_last_ipts(instrument_id) if last_run is None else last_run.ipts_id

    r_rate, e_rate = retrieve_rates(instrument_id, last_run)
    status = {"run_rate": r_rate, "error_rate": e_rate}

    if last_run is not None:
        status.update(
            last_run_id=last_run.id,
            last_run=last_run.run_number,
            last_run_time=formats.localize(timezone.localtime(last_run.created_on)),
        )
    if last_expt is not None:
        status["last_expt_id"] = last_expt.id
        status["last_expt"] = last_expt.expt_name.upper()
    return status
[docs]
def is_acquisition_complete(run_id):
    """
    Determine whether the acquisition is complete and post-processing
    has started

    This is taken to be the case once a RunStatus entry for the run exists on
    the POSTPROCESS.DATA_READY queue.

    :param run_id: run object
    :return: True if such a RunStatus entry exists
    """
    # exists() lets the database stop at the first match instead of counting
    return RunStatus.objects.filter(run_id=run_id, queue_id__name="POSTPROCESS.DATA_READY").exists()
[docs]
def get_post_processing_status(red_timeout=0.25, yellow_timeout=120):
    """
    Get the health status of post-processing services

    Cataloging and reduction status reporting is being phased out, so both
    services are always reported with status 0 and the timeouts are ignored.

    :param red_timeout: number of hours before declaring a process dead (unused)
    :param yellow_timeout: number of seconds before declaring a process slow (unused)
    :return: {"catalog": 0, "reduction": 0}
    """
    return dict.fromkeys(("catalog", "reduction"), 0)
[docs]
def get_run_status_text_dict(run_list, use_element_id=False):
    """
    Return a dictionary of run status texts {run_id: status}

    Each status is an HTML <span>. It is:
    - "acquiring" if the run has no POSTPROCESS.DATA_READY status yet,
    - "complete" if its workflow summary is complete,
    - otherwise "error" if any error was recorded for the run,
    - or "incomplete".

    :param run_list: iterable of DataRun objects (usually a QuerySet)
    :param use_element_id: if True, give each span an id of the form run_id_<id>
    :return: dict mapping run id to an HTML status string
    """
    complete = dict(WorkflowSummary.objects.filter(run_id__in=run_list).values_list("run_id", "complete"))
    # Materialize the id lists as sets. A membership test on a QuerySet scans
    # its cached results linearly, which made the loop below quadratic.
    acquisition_complete = set(
        RunStatus.objects.filter(run_id__in=run_list, queue_id__name="POSTPROCESS.DATA_READY")
        .values_list("run_id", flat=True)
        .distinct()
    )
    errors = set(
        Error.objects.filter(run_status_id__run_id__in=run_list)
        .values_list("run_status_id__run_id", flat=True)
        .distinct()
    )

    run_statuses = {}
    for r in run_list:
        element_id = "id='run_id_%s'" % r.id if use_element_id else ""
        if r.id not in acquisition_complete:
            status = "<span %s>acquiring</span>" % element_id
        elif complete.get(r.id, False) is True:
            status = "<span %s class='green'>complete</span>" % element_id
        elif r.id in errors:
            status = "<span %s class='red'><b>error</b></span>" % element_id
        else:
            status = "<span %s class='red'>incomplete</span>" % element_id
        run_statuses[r.id] = status
    return run_statuses
[docs]
def get_run_list_dict(run_list):
    """
    Get a list of run object and transform it into a list of
    dictionaries that can be used to fill a table.

    Each dictionary holds pre-rendered HTML for the instrument, run and
    reduce links, the run status, an HTML-escaped run title and, when
    settings.CATALOG_URL is set, a "View" link to the run in the catalog.
    If an exception occurs while building rows, it is logged and the rows
    built so far are returned.

    :param run_list: list of run object (usually a QuerySet)
    :return: list of dictionaries, one per run
    """
    rows = []
    # return early if provided an empty list
    if len(run_list) == 0:
        return rows

    status_by_id = get_run_status_text_dict(run_list, use_element_id=True)
    try:
        for run in run_list:
            if run.id not in status_by_id:
                continue
            instr = str(run.instrument_id)
            localtime = timezone.localtime(run.created_on)
            run_url = reverse("report:detail", args=[instr, run.run_number])
            reduce_url = reverse("report:submit_for_reduction", args=[instr, run.run_number])
            instr_url = reverse("dasmon:live_runs", args=[instr])

            # Run title: long titles are truncated with the full text in a
            # tooltip; all text is HTML-escaped to prevent XSS
            title_html = ""
            if run.run_title:
                # Same pruning function as the DASMON views
                pruned = dasmon_view_util._prune_title_string(run.run_title)
                if len(run.run_title) > 50:
                    short = run.run_title[:47] + "..."
                    title_html = f'<span title="{escape(run.run_title)}">{escape(short)}</span>'
                else:
                    title_html = escape(pruned)

            # Catalog (ONCat) link, web interface format:
            # /runs/SNS/ARCS/IPTS-36590/343513
            oncat_html = ""
            if hasattr(settings, "CATALOG_URL") and settings.CATALOG_URL:
                # FACILITY_INFO is keyed by lowercase instrument names
                facility = settings.FACILITY_INFO.get(instr.lower(), "SNS")
                oncat_url = (
                    f"{settings.CATALOG_URL}/runs/{facility}/{instr.upper()}/"
                    f"{str(run.ipts_id).upper()}/{run.run_number}"
                )
                oncat_html = f'<a href="{oncat_url}" target="_blank" rel="noopener noreferrer">View</a>'

            reduce_link = (
                "<a id='reduce_%s' href='javascript:void(0);' onClick='$.ajax({ url: \"%s\", cache: false }); $(\"#reduce_%s\").remove();'>reduce</a>"  # noqa: E501
                % (run.run_number, reduce_url, run.run_number)
            )
            rows.append(
                {
                    "instrument_id": "<a href='%s'>%s</a>" % (instr_url, instr),
                    "run": "<a href='%s'>%s</a>" % (run_url, run.run_number),
                    "reduce_url": reduce_link,
                    "run_id": run.id,
                    "timestamp": formats.localize(localtime),
                    "status": status_by_id.get(run.id, "unknown"),
                    "run_title": title_html,
                    "oncat_link": oncat_html,
                }
            )
    except Exception:
        logging.exception("report.view_util.get_run_list_dict: %s", str(run_list))
    except SystemExit:
        logging.exception("report.view_util.get_run_list_dict SystemExit: %s", str(run_list))
        raise
    return rows
[docs]
def get_plot_template_dict(run_object=None, instrument=None, run_id=None):
    """
    Get template dictionary for plots

    :param run_object: DataRun object (unused)
    :param instrument: instrument name
    :param run_id: run_number
    :return: dict with "html_data", "update_url" and "key"; when the live data
        server returns plot HTML, "html_data" and "update_url" are filled, and
        "data_url" is added if ASCII data can be extracted from it
    """
    plot_dict = {"html_data": None, "update_url": None, "key": generate_key(instrument, run_id)}

    # settings.LIVE_DATA_SERVER is a string.Template path with
    # $instrument / $run_number placeholders
    path = string.Template(settings.LIVE_DATA_SERVER).substitute(instrument=instrument, run_number=run_id)
    live_data_url = f"https://{settings.LIVE_DATA_SERVER_DOMAIN}:{settings.LIVE_DATA_SERVER_PORT}{path}"

    html_data = get_plot_data_from_server(instrument, run_id)
    if html_data is None:
        return plot_dict

    plot_dict["html_data"] = html_data
    plot_dict["update_url"] = append_key(live_data_url + "/html/", instrument, run_id)
    if extract_ascii_from_div(html_data) is not None:
        plot_dict["data_url"] = reverse("report:download_reduced_data", args=[instrument, run_id])
    return plot_dict
[docs]
def get_plot_data_from_server(instrument, run_id):
    """
    Get html data from the live data server

    Any error while building the URL or performing the request is logged and
    results in None, so callers can treat the plot as unavailable.

    :param instrument: instrument name
    :param run_id: run number
    :return: response body text on HTTP 200, otherwise None
    """
    html_data = None
    try:
        url_template = string.Template(settings.LIVE_DATA_SERVER)
        live_data_url = url_template.substitute(instrument=instrument, run_number=run_id)
        live_data_url = (
            f"https://{settings.LIVE_DATA_SERVER_DOMAIN}:{settings.LIVE_DATA_SERVER_PORT}{live_data_url}/html/"
        )
        live_data_url = append_key(live_data_url, instrument, run_id)
        data_request = requests.get(live_data_url, timeout=1.5)
        if data_request.status_code == 200:
            html_data = data_request.text
    except Exception:
        # Best effort: the page renders without a plot. Unlike the previous
        # bare "except:", KeyboardInterrupt/SystemExit are no longer swallowed.
        logging.exception("Could not pull data from live data server: %s", str(instrument))
    return html_data
[docs]
def find_skipped_runs(instrument_id, start_run_number=0):
    """
    Find run numbers that were skipped for a given instrument

    Run numbers in [start_run_number, last_run_number) that have no DataRun
    entry are reported. The last run itself is never reported.

    :param instrument_id: Instrument object
    :param start_run_number: run number to start from; 0 means 1000 runs
        before the last run (but not below 0)
    :return: ascending list of missing run numbers (empty on error or when
        no last run is found)
    """
    missing_runs = []
    try:
        last_run_id = DataRun.objects.get_last_run(instrument_id)
        # NOTE(review): unclear whether get_last_run returns None or raises
        # when there are no runs; both cases yield an empty result here
        if last_run_id is None:
            return missing_runs
        last_run_number = last_run_id.run_number
        # Use a reasonable default for the start run number
        if start_run_number == 0:
            start_run_number = max(0, last_run_number - 1000)
        # Fetch all existing run numbers in the range with a single query,
        # instead of one query per candidate run number
        existing = set(
            DataRun.objects.filter(
                instrument_id=instrument_id,
                run_number__gte=start_run_number,
                run_number__lt=last_run_number,
            ).values_list("run_number", flat=True)
        )
        missing_runs = [i for i in range(start_run_number, last_run_number) if i not in existing]
    except Exception:
        logging.exception("Error finding missing runs: %s", str(instrument_id))
    return missing_runs
[docs]
def reduction_queue_sizes():
    """
    Get the size of the message queues

    :return: list with, for each queue that has message-count records, the
        ``to_dict()`` of its most recent StatusQueueMessageCount entry
    """
    queue_ids = StatusQueueMessageCount.objects.values_list("queue_id").distinct()
    return [
        StatusQueueMessageCount.objects.filter(queue_id=queue_id).latest("created_on").to_dict()
        for (queue_id,) in queue_ids
    ]
[docs]
def get_experiment_list(instrument_id, offset, limit, order_column, reverse_dir):
    """
    Get a list of experiments for a given instrument

    :param instrument_id: Instrument object
    :param offset: index of the first experiment to return
    :param limit: maximum number of experiments to return
    :param order_column: ordering passed to QuerySet.order_by, or
        "number_of_runs" to sort by the number of associated runs
    :param reverse_dir: if True, reverse the sort order
    :return: (list of row dictionaries for the requested page,
        total number of experiments for the instrument)
    """
    experiments = IPTS.objects.filter(instruments=instrument_id)
    instrument = str(instrument_id)
    count = experiments.count()

    # "number_of_runs" is not a column, so sort on an annotated run count
    if order_column == "number_of_runs":
        experiments = experiments.annotate(num_runs=models.Count("datarun")).order_by("num_runs")
    else:
        experiments = experiments.order_by(order_column)
    if reverse_dir:
        experiments = experiments.reverse()

    rows = []
    for expt in experiments[offset : offset + limit]:  # noqa E203
        localtime = timezone.localtime(expt.created_on)
        summary_url = reverse("report:ipts_summary", args=[instrument, expt.expt_name])
        rows.append(
            {
                "experiment": "<a href='%s'>%s</a>" % (summary_url, expt.expt_name),
                "total": expt.number_of_runs(),
                "created_on": formats.localize(localtime),
            }
        )
    return rows, count
[docs]
def get_error_list(instrument_id, offset, limit):
    """
    Get a list of errors for a given instrument in the last 30 days

    Errors are ordered newest first (by the creation time of their run status).

    :param instrument_id: Instrument object
    :param offset: index of the first error to return
    :param limit: maximum number of errors to return
    :return: (list of row dictionaries for the requested page,
        total number of errors in the period)
    """
    instrument = str(instrument_id)
    time_period = 30
    oldest_time = timezone.now() - datetime.timedelta(days=time_period)
    error_query = (
        Error.objects.filter(
            run_status_id__created_on__gte=oldest_time,
            run_status_id__run_id__instrument_id=instrument_id,
        )
        # Each row below dereferences the run status, its run and the run's
        # IPTS; fetch them in the same query instead of one lookup per error
        .select_related("run_status_id__run_id__ipts_id")
        .order_by("-run_status_id__created_on")
    )
    count = error_query.count()

    error_list = []
    for err in error_query[offset : offset + limit]:  # noqa E203
        run_status = err.run_status_id
        run = run_status.run_id
        expt_name = run.ipts_id.expt_name
        localtime = timezone.localtime(run_status.created_on)
        summary_url = reverse("report:ipts_summary", args=[instrument, expt_name])
        run_url = reverse("report:detail", args=[instrument, run.run_number])
        error_list.append(
            {
                "experiment": "<a href='%s'>%s</a>" % (summary_url, expt_name),
                "run": "<a href='%s'>%s</a>" % (run_url, run.run_number),
                # NOTE(review): the description is not HTML-escaped while the
                # other columns are HTML; confirm how the table renders it
                "info": str(err.description),
                "created_on": formats.localize(localtime),
            }
        )
    return error_list, count