#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Checkmk agent-based section and check plugin for the ``jobwatch`` section."""

import json
from operator import concat
from typing import Any, Dict, Generator, Iterable, Union

from cmk.base.api.agent_based.checking_classes import IgnoreResults, IgnoreResultsError
from cmk.base.api.agent_based.type_defs import Parameters, StringTable
from cmk.base.plugins.agent_based.agent_based_api.v1 import (
    HostLabel,
    Metric,
    Result,
    Service,
    ServiceLabel,
    State,
    check_levels,
    get_value_store,
    register,
    render,
)


#
# Section Plugin
#
def parse_jobwatch(string_table: StringTable) -> Dict:
    """Parse the ``jobwatch`` agent section into a nested dictionary.

    All cells of all lines are concatenated (without any separator) and the
    result is decoded as one JSON document. The document is iterated as a
    sequence of user objects; each user object is read for ``user_id`` and
    ``entries``, and each entry for a ``state`` mapping carrying ``job_id``
    and, optionally, ``job_inst_id``.

    Args:
        string_table: The raw section lines as handed over by Checkmk.

    Returns:
        A mapping ``user_id -> {job_full_id -> {"state": ..., "log_entries": ...}}``
        where ``job_full_id`` is ``job_id`` or ``job_id + "_" + job_inst_id``.
        An empty section yields an empty dict.

    Raises:
        json.JSONDecodeError: If the non-empty section is not valid JSON.
    """
    # NOTE(review): cells are joined without a separator. If this section is
    # whitespace-split by the agent parser, spaces inside JSON string values
    # are lost here -- confirm the separator used by the agent section.
    raw = "".join("".join(line) for line in string_table)
    if not raw.strip():
        # json.loads("") would raise; an empty section simply has no jobs.
        return {}

    data = json.loads(raw)
    parsed: Dict[Any, Dict[Any, Dict[str, Any]]] = {}
    for user in data:
        uid = user.get("user_id")
        if not uid:
            continue

        jobs: Dict[Any, Dict[str, Any]] = {}
        for entry in user.get("entries", []):
            state = entry.get("state")
            if not state:
                continue
            job_full_id = state.get("job_id")
            if not job_full_id:
                continue
            inst_id = state.get("job_inst_id")
            if inst_id:
                job_full_id = f"{job_full_id}_{inst_id}"
            jobs[job_full_id] = {
                "state": state,
                # NOTE(review): log entries are read from the state mapping,
                # not from the entry itself -- confirm against the agent output.
                "log_entries": state.get("log_entries", []),
            }
        parsed[uid] = jobs
    return parsed


register.agent_section(
    name="jobwatch",
    parse_function=parse_jobwatch,
)


#
# Check Plugin
#
def discover_jobwatch(section: Any) -> Generator[Service, None, None]:
    """Yield one service per job that has a non-empty ``state``.

    Args:
        section: The parsed section as returned by ``parse_jobwatch``.

    Yields:
        ``Service`` objects whose item is ``"<user_id>/<job_full_id>"``.
    """
    for uid, jobs in section.items():
        for job_full_id, job_data in jobs.items():
            if job_data.get("state"):
                yield Service(item=f"{uid}/{job_full_id}")


def check_jobwatch(item: str, section: Any) -> Generator[Union[Result, Metric], None, None]:
    """Report age and exit state of the last run of a single job.

    Args:
        item: Service item in the form ``"<user_id>/<job_full_id>"``.
        section: The parsed section as returned by ``parse_jobwatch``.

    Yields:
        A ``last_run`` metric (only when ``last_run_age_seconds`` is present),
        a result for the age of the last run and a result for the last exit
        code. Nothing is yielded when the job is not found in the section.
    """
    # Split on the FIRST slash only: the job id itself may contain "/".
    # partition() also never raises for an item without any slash.
    uid, _, job_full_id = item.partition("/")
    state = section.get(uid, {}).get(job_full_id, {}).get("state")
    if not state:
        return

    age = state.get("last_run_age_seconds")
    last_run_summary = f"last run at {state.get('last_run')}"
    if age is not None:
        # Metric and render.timespan both need a number; skip them when the
        # agent did not report an age instead of crashing the check.
        yield Metric(name="last_run", value=age)
        last_run_summary += f", {render.timespan(age)} ago"

    # A missing state value falls back to 3 (passed to State as-is).
    yield Result(
        state=State(state.get("last_run_age_state", 3)),
        summary=last_run_summary,
    )
    yield Result(
        state=State(state.get("last_state", 3)),
        summary=f"last exitcode {state.get('last_exit_code')}",
    )


register.check_plugin(
    name="jobwatch",
    service_name="Job %s",
    check_function=check_jobwatch,
    discovery_function=discover_jobwatch,
)