Source code for lib.heartbeat

from threading import Thread, Timer

from requests import post
from requests.exceptions import ConnectionError, ConnectTimeout

from document.exec_env import Exec_Env_Document
from lib.http import HTTP_Status
from lib.token import create_token
from reader.arg import Arg_Reader
from utils.log import Log


[docs]def heartbeat(): """Heartbeat procedure with the LCPs.""" s = Exec_Env_Document.search() res = s[0:s.count()].execute() threads = [] for exec_env in res: if exec_env.lcp: t = Thread(target=heartbeat_exec_env, args=(exec_env,)) threads.append(t) t.start() for t in threads: t.join() t = Timer(Arg_Reader.db.hb_period, heartbeat) t.daemon = True t.start()
def heartbeat_exec_env(exec_env): log = Log.get('heartbeat') try: id = exec_env.meta.id lcp = exec_env.lcp lbl = f'{id} (LCP at {exec_env.hostname}:{lcp.port})' if exec_env.enabled: schema = 'https' if lcp.https else 'http' endpoint_lcp = '/' + exec_env.lcp.endpoint if exec_env.lcp.endpoint else '' resp = post(f'{schema}://{exec_env.hostname}:{lcp.port}{endpoint_lcp}/status', timeout=Arg_Reader.db.hb_timeout, headers={'Authorization': create_token()}, json={'id': id}) if resp.status_code == HTTP_Status.OK: data = resp.json() id = data.pop('id', None) lcp.started = data.get('started', None) lcp.last_heartbeat = data.get('last_heartbeat', None) log.success(f'Connection established with exec-env {lbl}') else: lcp.last_heartbeat = None log.warning(f'Connection reset with exec-env {lbl}') log.notice(f'Response: {resp.content}') if not lcp.https: lcp.https = False exec_env.save() else: log.notice(f'Exec-env {lbl} not enabled') except ConnectTimeout: log.error(f'Connection timeout with exec-env {lbl}') except ConnectionError: log.error(f'Connection refused with exec-env {lbl}') except Exception as exception: log.exception(f'Exception during connection with exec-env {lbl}', exception)