Terarea  2
The automation project
Loading...
Searching...
No Matches
trigger_management.py
Go to the documentation of this file.
1"""_summary_
2 File in charge of checking the trigger rules.
3"""
4
5import os
6import json
7from typing import Any, Dict, List
8
9from requests import Response
10from display_tty import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
11
12from .secrets import Secrets
13from .variables import Variables
14from .logger import ActionLogger
15from . import constants as ACONST
16from .api_querier import APIQuerier
17from .query_boilerplate import QueryEndpoint
18
19from ..components import constants as CONST
20from ..components.runtime_data import RuntimeData
21
22
24 """_summary_
25 """
26
def __init__(self, variable: Variables, logger: ActionLogger, runtime_data: RuntimeData, scope: Any = "default_scope", action_id: int = 0, error: int = 84, success: int = 0, debug: bool = False, delay: int = 10):
    """Store the injected collaborators and build the helper classes used to check a trigger.

    Args:
        variable (Variables): The class in charge of tracking the variables for the runtime.
        logger (ActionLogger): The class in charge of logging the actions.
        runtime_data (RuntimeData): The class containing important connections.
        scope (Any, optional): The scope of the trigger. Defaults to "default_scope".
        action_id (int, optional): The action ID to log; kept on the instance as a string. Defaults to 0.
        error (int, optional): The error value. Defaults to 84.
        success (int, optional): The success value. Defaults to 0.
        debug (bool, optional): Set to True if you wish to activate debug mode. Defaults to False.
        delay (int, optional): Kept on the instance as self.delay. Defaults to 10.
    """
    # ----------------------- Injected collaborators -----------------------
    self.variable: Variables = variable
    self.logger: ActionLogger = logger
    self.runtime_data: RuntimeData = runtime_data
    # --------------------------- Plain settings ---------------------------
    self.scope: Any = scope
    self.action_id: str = str(action_id)
    self.success: int = success
    self.error: int = error
    self.debug: bool = debug
    self.delay: int = delay
    # ---------------------- The visual logger class ----------------------
    self.disp: Disp = Disp(
        TOML_CONF,
        SAVE_TO_FILE,
        FILE_NAME,
        FILE_DESCRIPTOR,
        debug=self.debug,
        logger=self.__class__.__name__
    )
    # ------------------ The class containing the secrets ------------------
    self.secrets: Secrets = Secrets(
        success=self.success,
        error=self.error,
        debug=self.debug
    )
    # ------------- The class in charge of managing endpoints -------------
    # Built with an empty host, no port and no delay.
    self.query_endpoint: QueryEndpoint = QueryEndpoint(
        host="",
        port=None,
        delay=0,
        debug=self.debug
    )
    # -------- Placeholders, both None until something assigns them --------
    self.api_querier_initialised: APIQuerier = None
    self.api_responseapi_response: Dict[str, Any] = None
76
def _log_fatal(self, title: str, msg, action_id: int, raise_item: bool = False, raise_func: object = ValueError) -> int:
    """Log a fatal error, then either raise it or return the error code.

    The message is sent to the visual logger (as an error) and to the
    action logger (as an unresolved fatal entry of type
    ACONST.TYPE_SERVICE_TRIGGER).

    Args:
        title (str): The name of the calling function, used as the log title.
        msg (str): The message to log.
        action_id (int): The id of the action the log entry belongs to.
        raise_item (bool, optional): True to raise after logging, False to return the error code. Defaults to False.
        raise_func (object, optional): The exception class to raise when raise_item is True. Defaults to ValueError.

    Raises:
        ValueError: (or the class passed as raise_func) when raise_item is True.

    Returns:
        int: self.error when raise_item is not True.
    """
    self.disp.log_error(msg, title)
    self.logger.log_fatal(
        ACONST.TYPE_SERVICE_TRIGGER,
        action_id=action_id,
        message=msg,
        resolved=False
    )
    if raise_item is True:
        # The exception has to be raised, not merely instantiated:
        # a bare `raise_func(msg)` builds the object and throws it away.
        raise raise_func(msg)
    return self.error
104
def get_verification_operator(self, operator: str) -> Any:
    """Look up the comparison entry matching a trigger's operator description.

    The operator description is scanned for entries prefixed with
    "selected:" and "default:". The selected entry is looked up first in
    ACONST.OPERATOR_EXCHANGE, the default entry is the fallback.

    Args:
        operator (str): The operator description. An iterable of prefixed
            strings is walked entry by entry; a bare string is treated as
            a single entry.

    Raises:
        TypeError: If the description is missing or empty, holds no
            "selected:"/"default:" entry, or neither entry is a key of
            ACONST.OPERATOR_EXCHANGE.

    Returns:
        Any: The value stored in ACONST.OPERATOR_EXCHANGE for the entry found.
    """
    title = "get_verification_operator"
    self.disp.log_debug(f"Operator: {operator}", title)

    def fail(msg: str) -> None:
        """Log the fatal error and make sure the lookup is aborted."""
        self._log_fatal(
            title=title,
            msg=msg,
            action_id=self.action_id,
            raise_item=True,
            raise_func=TypeError
        )
        # Safety net: never continue with an invalid operator, even if
        # the logging helper returned instead of raising.
        raise TypeError(msg)

    if operator is None or operator == "":
        fail("No operator found.")

    # Iterating over a bare string would test single characters against
    # the prefixes, which can never match.
    entries = [operator] if isinstance(operator, str) else operator
    selected = None
    default = None
    for entry in entries:
        if not isinstance(entry, str):
            # Only strings can carry a "selected:"/"default:" prefix.
            continue
        if entry.startswith("selected:"):
            selected = self.api_querier_initialised.strip_descriptor(entry)
        elif entry.startswith("default:"):
            default = self.api_querier_initialised.strip_descriptor(entry)
    self.disp.log_debug(f"Node1: {selected}, Node2: {default}", title)

    if selected is None and default is None:
        fail("No node found in operator.")

    # The selected entry has priority over the default one.
    for label, candidate in (("Node1", selected), ("Node2", default)):
        if candidate in ACONST.OPERATOR_EXCHANGE:
            response = ACONST.OPERATOR_EXCHANGE[candidate]
            self.disp.log_debug(f"Response: {response}", title)
            return response
        self.disp.log_debug(
            f"{label}: {candidate} not in ACONST.OPERATOR_EXCHANGE", title
        )
    fail("No node found in operator.")
165
def get_response_verification(self, response_node: Dict[str, Any]) -> Any:
    """Return the value stored under the "response_content" key of a response node.

    Keys are compared as-is when they contain no ":"; otherwise the key is
    first passed through the API querier's strip_descriptor. The first
    matching key wins.

    Args:
        response_node (Dict[str, Any]): The response section of a trigger.

    Raises:
        TypeError: If response_node is None or an empty string.

    Returns:
        Any: The value found, or None when no key matches.
    """
    title = "get_response_verification"
    self.disp.log_debug(f"response_node: {response_node}", title)
    if response_node is None or response_node == "":
        msg = "No response node found."
        self._log_fatal(
            title=title,
            msg=msg,
            action_id=self.action_id,
            raise_item=True,
            raise_func=TypeError
        )
        # Safety net: never fall through to `.items()` on an invalid node.
        raise TypeError(msg)

    # Initialised up-front so that a node without any matching key yields
    # None instead of an UnboundLocalError on the final log/return.
    node = None
    for key, values in response_node.items():
        self.disp.log_debug(f"Key: {key}, Values: {values}", title)
        if ":" in key:
            bare_key = self.api_querier_initialised.strip_descriptor(key)
        else:
            bare_key = key
        if bare_key == "response_content":
            self.disp.log_debug(f"Values: {values}", title)
            node = values
            break
        self.disp.log_debug(f"Skipping key: {key}", title)

    self.disp.log_debug(f"Node: {node}", title)
    return node
203
def get_verification_value(self, response_node: Dict[str, Any]) -> Any:
    """Return the verification value described by a verification node.

    A key without ":" must be exactly "verification_value" and its value
    is returned untouched. A key containing ":" is first passed through
    the API querier's strip_descriptor; when its value is a list, the
    first "selected:" entry wins, otherwise the last "default:" entry
    seen is used (both stripped of their descriptor).

    Args:
        response_node (Dict[str, Any]): The verification details of a trigger.

    Raises:
        TypeError: If response_node is None or an empty string.

    Returns:
        Any: The verification value, or None when none could be found.
    """
    title = "get_verification_value"
    self.disp.log_debug(f"response_node: {response_node}", title)
    if response_node is None or response_node == "":
        msg = "No response node found."
        self._log_fatal(
            title=title,
            msg=msg,
            action_id=self.action_id,
            raise_item=True,
            raise_func=TypeError
        )
        # Safety net: never fall through to `.items()` on an invalid node.
        raise TypeError(msg)

    node = None
    for key, values in response_node.items():
        self.disp.log_debug(f"Key: {key}, Values: {values}", title)
        if ":" not in key:
            if key == "verification_value":
                self.disp.log_debug(f"Values: {values}", title)
                node = values
                break
            continue
        if self.api_querier_initialised.strip_descriptor(key) != "verification_value":
            self.disp.log_debug(f"Skipping key: {key}", title)
            continue
        self.disp.log_debug(f"Values: {values}", title)
        if isinstance(values, list):
            for entry in values:
                if not isinstance(entry, str):
                    # Only strings can carry a "selected:"/"default:" prefix.
                    continue
                if entry.startswith("selected:"):
                    node = self.api_querier_initialised.strip_descriptor(entry)
                    break
                if entry.startswith("default:"):
                    # Kept as a fallback; a later "selected:" entry overrides it.
                    node = self.api_querier_initialised.strip_descriptor(entry)
        else:
            node = values
        break

    self.disp.log_debug(f"Node: {node}", title)
    return node
250
def get_response_content(self, variable_name: str) -> Any:
    """Fetch a value from the stored API response using a dotted path.

    The path is split on "."; the first segment is translated through
    ACONST.RESPONSE_NODE_KEY_EQUIVALENCE when it is listed there, and
    every following segment is used as a dictionary key of the node
    reached so far.

    Args:
        variable_name (str): The dotted path of the value, e.g. "body.<key>".

    Returns:
        Any: The value found, or an empty string when there is no stored
        response, when the body data type is unknown, when a nested body
        lookup is requested on a data type that is not listed in
        ACONST.CONTENT_TYPES_JSON, or when a path segment cannot be
        resolved.
    """
    title = "get_response_content"
    self.disp.log_debug(f"Variable name: {variable_name}", title)
    api_response = self.api_responseapi_response
    if api_response is None:
        msg = "No response found"
        # Uses the attribute that was just tested, so building the
        # message cannot fail on an undefined attribute.
        msg += f" in {title} for {variable_name} in {api_response}."
        self.disp.log_critical(msg, title)
        self._log_fatal(
            title=title,
            msg="No response found.",
            action_id=self.action_id,
            raise_item=False,
            raise_func=TypeError
        )
        return ""
    variable_name_list = variable_name.split(".")
    list_length = len(variable_name_list)
    self.disp.log_debug(f"List length: {list_length}", title)
    if variable_name_list[0] == "body":
        data_type: str = api_response.get(
            ACONST.RESPONSE_NODE_BODY_TYPE_KEY
        )
        msg = f"Data type: {data_type}, list_length: {list_length}"
        msg += f" Variable name list: {variable_name_list}"
        self.disp.log_debug(msg, title)
        if data_type is None:
            self._log_fatal(
                title=title,
                msg="No data type found.",
                action_id=self.action_id,
                raise_item=False,
                raise_func=TypeError
            )
            return ""
        # Anything after ";" in the data type is ignored for the check.
        if data_type.split(";")[0] not in ACONST.CONTENT_TYPES_JSON and list_length > 1:
            msg = "Search depth is not possible for"
            msg += f" this data type {data_type}."
            self._log_fatal(
                title=title,
                msg=msg,
                action_id=self.action_id,
                raise_item=False,
                raise_func=TypeError
            )
            return ""
    else:
        msg = "Variable name list: "
        msg += f"{variable_name_list}"
        self.disp.log_debug(msg, title)

    node = api_response
    for index, item in enumerate(variable_name_list):
        if index == 0 and item in ACONST.RESPONSE_NODE_KEY_EQUIVALENCE:
            node = node.get(ACONST.RESPONSE_NODE_KEY_EQUIVALENCE[item])
            self.disp.log_debug(f"Node[{index}]: {node}", title)
            continue
        # A None or scalar node cannot be searched any deeper; report it
        # the same way as a missing key instead of crashing.
        if not isinstance(node, dict) or item not in node:
            self.disp.log_error(f"Item: {item} not in node: {node}", title)
            return ""
        node = node[item]
        self.disp.log_debug(f"Node[{index}]: {node}", title)
    self.disp.log_debug(f"Node: {node}", title)
    return node
326
def get_variable_data_if_required(self, node: str, attempt_bruteforce: bool = True) -> Any:
    """Replace the variable references found in a node by their content.

    Two reference forms are expanded, in this order:
      * "$ref{name}": resolved with the API querier's get_special_content,
        falling back to get_response_content when that yields "".
      * "${{name}}": resolved with the API querier's get_normal_content.
    The variable name is extracted by the API querier's get_variable_name.

    Args:
        node (str): The text to process. Values that are not strings are
            returned untouched.
        attempt_bruteforce (bool, optional): When True, the expanded text is
            passed to ACONST.detect_and_convert. Defaults to True.

    Returns:
        Any: The processed node.
    """
    title = "get_variable_data_if_required"
    if not isinstance(node, str):
        # Already-typed values (numbers, booleans, None, ...) cannot
        # contain a reference and have no `.split`.
        self.disp.log_debug(f"Node: {node}", title)
        return node

    def resolve_special(var_name: str) -> Any:
        """Look a "$ref" variable up, falling back on the API response."""
        content = self.api_querier_initialised.get_special_content(var_name)
        self.disp.log_debug(f"var_content: {content}", title)
        if content == "":
            content = self.get_response_content(var_name)
        return content

    def resolve_normal(var_name: str) -> Any:
        """Look a "${" variable up."""
        return self.api_querier_initialised.get_normal_content(var_name)

    def expand(text: str, delimiter: str, wrapper_length: int, resolve: Any) -> str:
        """Expand every `delimiter` + "{name..." reference found in text."""
        pieces = text.split(delimiter)
        self.disp.log_debug(f"Node list: {pieces}", title)
        if len(pieces) < 2:
            return text
        for index, piece in enumerate(pieces):
            if index == 0:
                # Text located before the first delimiter is never a
                # reference, even when it starts with "{".
                continue
            if not piece.startswith("{"):
                # Not a reference: put the delimiter removed by split()
                # back so that literal text is not silently altered.
                pieces[index] = f"{delimiter}{piece}"
                continue
            var_name = self.api_querier_initialised.get_variable_name(
                piece[1:]
            )
            var_content = resolve(var_name)
            self.disp.log_debug(f"var_content: {var_content}", title)
            # wrapper_length counts the brace characters around the name.
            item_new = f"{var_content}{piece[len(var_name) + wrapper_length:]}"
            self.disp.log_debug(f"item_new: {item_new}", title)
            pieces[index] = item_new
        return "".join(pieces)

    node = expand(node, "$ref", 2, resolve_special)
    self.disp.log_debug(f"Processed node: {node}", title)
    node = expand(node, "${", 3, resolve_normal)
    self.disp.log_debug(f"Node: {node}", title)
    if attempt_bruteforce is True:
        node = ACONST.detect_and_convert(node)
    self.disp.log_warning(f"Node: {node}, type = {type(node)}", title)
    return node
385
def check_data_comparison(self, data: Any, operator: ACONST.operator, verification_value: Any) -> bool:
    """Apply a comparison callable to the data and the verification value.

    Args:
        data (Any): The left-hand value of the comparison.
        operator (Any): The callable invoked as operator(data, verification_value).
        verification_value (Any): The right-hand value of the comparison.

    Raises:
        ValueError: If the callable raises while comparing.

    Returns:
        bool: The result returned by the callable.
    """
    title = "check_data_comparison"
    msg = f"data: {data}, operator: {operator}, "
    msg += f"verification_value: {verification_value}"
    self.disp.log_debug(msg, title)
    try:
        operation_result = operator(data, verification_value)
    except Exception as e:
        # Boundary around an arbitrary callable: any failure is reported
        # as a comparison error.
        error_msg = f"Error while comparing data: {e}"
        self._log_fatal(
            title=title,
            msg=error_msg,
            action_id=self.action_id,
            raise_item=True,
            raise_func=ValueError
        )
        # Safety net: without a result there is nothing to return, so
        # never fall through to the return statement.
        raise ValueError(error_msg) from e
    self.disp.log_debug(f"Operation result: {operation_result}", title)
    return operation_result
414
def set_runtime_variables(self, data: Dict[str, Any]) -> None:
    """Store every entry of a mapping as a variable of the current scope.

    Each value is first passed through get_variable_data_if_required, then
    stored under its key with the type of the processed value.

    Args:
        data (Dict[str, Any]): The variable names and their raw values.

    Raises:
        TypeError: If data is None or is not a dictionary.
    """
    title = "set_runtime_variables"
    self.disp.log_debug(f"Data: {data}", title)

    def fail(msg: str) -> None:
        """Log the fatal error and make sure nothing gets stored."""
        self._log_fatal(
            title=title,
            msg=msg,
            action_id=self.action_id,
            raise_item=True,
            raise_func=TypeError
        )
        # Safety net: never fall through to `.items()` on invalid data.
        raise TypeError(msg)

    if data is None:
        fail("No data found.")
    if not isinstance(data, dict):
        fail("Data is not a dictionary.")
    for key, value in data.items():
        self.disp.log_debug(f"Key: {key}, Value: {value}", title)
        node = self.get_variable_data_if_required(value)
        self.variable.add_variable(
            key, node, type(node), self.scope
        )
446
def run(self, key: str) -> int:
    """Run the trigger checking for the action stored under a variable.

    Steps: read the action from the scope, decode its "trigger" entry,
    query the API described by the trigger's "service" node, store the
    compiled response, compare every expected response value against its
    verification rule and finally store the runtime variables declared
    under "variables" / "vars".

    Args:
        key (str): The name of the scope variable holding the action data.

    Raises:
        ValueError: If the action, trigger, service, response or
            verification data is missing or cannot be decoded, or if the
            API query returned nothing.

    Returns:
        int: self.success if every verification passed, self.error as soon
        as one comparison returns False.
    """
    title = "run"
    self.disp.log_debug("Running trigger management.", title)

    def fail(msg: str) -> None:
        """Log the fatal error and make sure the run is aborted."""
        self._log_fatal(
            title=title,
            msg=msg,
            action_id=self.action_id,
            raise_item=True,
            raise_func=ValueError
        )
        # Safety net: never continue a run in an invalid state, even if
        # the logging helper returned instead of raising.
        raise ValueError(msg)

    # Setup variables
    scope_content = self.variable.get_scope(self.scope)
    self.disp.log_debug(
        f"Scope: {self.scope}, scope_content = {scope_content}", title
    )
    if self.variable.has_variable(key, self.scope) is False:
        msg = f"No applet data found for scope {self.scope}"
        msg += f" in pid {os.getpid()}."
        fail(msg)

    action_node = self.variable.get_variable(name=key, scope=self.scope)
    if "trigger" not in action_node:
        fail("No trigger data found in applet data.")
    trigger_node = action_node["trigger"]
    self.disp.log_debug(f"Trigger node: {trigger_node}", title)

    # Get the json of the trigger
    if isinstance(trigger_node, dict):
        trigger: Dict[str, Any] = trigger_node
    else:
        try:
            trigger = json.loads(trigger_node)
        except (TypeError, json.JSONDecodeError) as e:
            # TypeError covers values json.loads cannot read at all.
            fail(f"Error while decoding trigger data: {e}")
    self.disp.log_debug(f"Trigger data: {trigger}", title)

    # Get the service part in the json
    node_of_interest = "service"
    if node_of_interest not in trigger:
        fail("No service data found in trigger data.")
    node: Dict[str, Any] = trigger.get(node_of_interest)

    # Call the API
    self.api_querier_initialised = APIQuerier(
        service=node,
        variable=self.variable,
        scope=self.scope,
        runtime_data=self.runtime_data,
        logger=self.logger,
        action_id=self.action_id,
        error=self.error,
        success=self.success,
        debug=self.debug
    )
    self.disp.log_debug("self.api_querier_initialised initialised", title)
    response: Response = self.api_querier_initialised.query()
    if response is None:
        fail("No response found from API query.")
    self.disp.log_debug(f"Response: {response}", title)

    # Compile the API response
    data = self.query_endpoint.compile_response_data(response)
    self.disp.log_debug(f"Data: {data}, type = {type(data)}", title)
    self.variable.add_variable(
        "trigger_data", data, type(data), self.scope
    )
    # Keep the compiled response where get_response_content reads it.
    # NOTE(review): assumes the compiled data is the mapping that
    # get_response_content expects - confirm against QueryEndpoint.
    self.api_responseapi_response = data
    self.disp.log_debug("Variable added.", title)

    # Get the expected response content
    self.disp.log_info("Getting content for response_data", title)
    expected_trigger_response = (node.get("response") or node.get("ignore:response"))
    expected_content = self.get_response_verification(expected_trigger_response)
    if not isinstance(expected_content, dict):
        fail("No response content found in trigger data.")
    # A new mapping is built so that the references written in the
    # trigger definition are not overwritten by their resolved values.
    response_data: Dict = {
        "response": {
            response_key: self.get_variable_data_if_required(value)
            for response_key, value in expected_content.items()
        }
    }
    msg = f"response_data = {response_data}, "
    self.disp.log_debug(msg, title)

    # Get the verification data and verify it against the expected response
    self.disp.log_info("Getting content for verification_value", title)
    response_verification: Dict[str, Any] = (node.get("verification") or node.get("ignore:verification"))
    if not isinstance(response_verification, dict):
        fail("No verification data found in trigger data.")
    for data_key, verification_details in response_verification.items():
        raw_operator = (verification_details.get("drop:verification_operator") or verification_details.get("ignore:verification_operator"))
        verification_operator = self.get_verification_operator(
            raw_operator
        )
        verification_value = self.get_verification_value(
            verification_details
        )
        self.disp.log_debug("raw Verification info gathered", title)
        msg = f"Verification operator: {verification_operator}, "
        msg += f"verification_value = {verification_value}"
        self.disp.log_debug(msg, title)

        # Apply variable from the API response if needed
        verification_value = self.get_variable_data_if_required(
            verification_value
        )
        self.disp.log_debug("Content gathered.", title)
        self.disp.log_debug(f"response_data: {response_data}", title)
        self.disp.log_debug(f"verification_value: {verification_value}", title)

        # Check if the comparison work
        if data_key not in response_data["response"]:
            fail(f"No response data found for {data_key} verification.")
        comparison_result = self.check_data_comparison(
            data=response_data["response"][data_key],
            operator=verification_operator,
            verification_value=verification_value
        )
        if comparison_result is False:
            return self.error
        self.disp.log_debug(f"Data comparison successful for {data_key} verification.", title)

    # Set the runtime data
    var1 = node.get("variables")
    var2 = node.get("vars")
    if var1 is not None:
        self.set_runtime_variables(var1)
    if var2 is not None:
        self.set_runtime_variables(var2)
    return self.success
__init__(self, Variables variable, ActionLogger logger, RuntimeData runtime_data, Any scope="default_scope", int action_id=0, int error=84, int success=0, bool debug=False, int delay=10)
bool check_data_comparison(self, Any data, ACONST.operator operator, Any verification_value)
Any get_verification_value(self, Dict[str, Any] response_node)
int _log_fatal(self, str title, msg, int action_id, bool raise_item=False, object raise_func=ValueError)
Any get_response_verification(self, Dict[str, Any] response_node)
Any get_variable_data_if_required(self, str node, bool attempt_bruteforce=True)