Source code for samlab.dashboard.service.notify

# Copyright 2018, National Technology & Engineering Solutions of Sandia, LLC
# (NTESS).  Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
# Government retains certain rights in this software.

import queue

import flask

from samlab.dashboard.server import application, socketio, require_auth, require_permissions


[docs]@application.route("/notify", methods=["POST"]) @require_auth def post_notify(): require_permissions(["read"]) def delayed_emit(delay, params): socketio.sleep(delay) socketio.emit("notify", params) if "delay" in flask.request.json: socketio.start_background_task(delayed_emit, flask.request.json.pop("delay"), flask.request.json) return flask.jsonify() socketio.emit("notify", flask.request.json) return flask.jsonify()
[docs]def emit(message, data=None): emit.queue.put((message, data))
emit.queue = queue.Queue()
[docs]def emit_loop(): while True: try: message, data = emit.queue.get_nowait() socketio.emit(message, data) except: socketio.sleep(0.1)
socketio.start_background_task(emit_loop)