2 File in charge of checking the trigger rules.
7from typing
import Any, Dict, List
9from requests
import Response
10from display_tty
import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
12from .secrets
import Secrets
13from .variables
import Variables
14from .logger
import ActionLogger
15from .
import constants
as ACONST
16from .api_querier
import APIQuerier
17from .query_boilerplate
import QueryEndpoint
19from ..components
import constants
as CONST
20from ..components.runtime_data
import RuntimeData
27 def __init__(self, variable: Variables, logger: ActionLogger, runtime_data: RuntimeData, scope: Any =
"default_scope", action_id: int = 0, error: int = 84, success: int = 0, debug: bool =
False, delay: int = 10):
29 This is the class in charge of checking the triggers and storing variables if required.
32 variable (Variables): The Variables instance in charge of tracking the variables for the runtime.
33 logger (ActionLogger): The ActionLogger instance in charge of logging the actions.
34 runtime_data (RuntimeData): The RuntimeData instance in charge of containing important connections.
35 action_id (int, optional): The action ID to log. Defaults to 0.
36 scope (Any, optional): The scope of the trigger. Defaults to "default_scope".
37 error (int, optional): The error value. Defaults to 84.
38 success (int, optional): The success value. Defaults to 0.
39 debug (bool, optional): Set to True if you wish to activate debug mode. Defaults to False.
58 logger=self.__class__.__name__
77 def _log_fatal(self, title: str, msg, action_id: int, raise_item: bool =
False, raise_func: object = ValueError) -> int:
79 A function that will log a provided fatal error.
82 title (str): The title of the function.
83 msg (str): The message to log.
84 raise_item (bool, optional): Inform if the logger should raise or just return an error. Defaults to False.
85 raise_func (object, optional): The function to raise if required. Defaults to ValueError.
88 ValueError: One of the possible errors to raise.
91 int: Will return self.error if raise_item is False.
93 self.
disp.log_error(msg, title)
95 ACONST.TYPE_SERVICE_TRIGGER,
100 if raise_item
is True:
107 Get the verification operator.
110 operator (str): The operator to resolve; None or an empty string is reported as "No operator found."
115 title =
"get_verification_operator"
116 self.
disp.log_debug(f
"Operator: {operator}", title)
117 if operator
is None or operator ==
"":
120 msg=
"No operator found.",
128 if i.startswith(
"selected:"):
130 if i.startswith(
"default:"):
132 self.
disp.log_debug(f
"Node1: {node1}, Node2: {node2}", title)
133 if node1
is None and node2
is None:
136 msg=
"No node found in operator.",
141 self.
disp.log_debug(f
"Node1: {node1}, Node2: {node2}", title)
142 if node1
not in ACONST.OPERATOR_EXCHANGE:
144 f
"Node1: {node1} not in ACONST.OPERATOR_EXCHANGE", title
146 if node2
not in ACONST.OPERATOR_EXCHANGE:
148 f
"Node2: {node2} not in ACONST.OPERATOR_EXCHANGE", title
152 msg=
"No node found in operator.",
158 response = ACONST.OPERATOR_EXCHANGE[node2]
159 self.
disp.log_debug(f
"Response: {response}", title)
162 response = ACONST.OPERATOR_EXCHANGE[node1]
163 self.
disp.log_debug(f
"Response: {response}", title)
168 Get the response verification.
171 response_node (Dict[str, Any]): The node whose key/value pairs are inspected; the "response_content" key is the one looked for.
176 title =
"get_response_verification"
177 self.
disp.log_debug(f
"response_node: {response_node}", title)
178 if response_node
is None or response_node ==
"":
181 msg=
"No response node found.",
187 for key, values
in response_node.items():
188 self.
disp.log_debug(f
"Key: {key}, Values: {values}", title)
190 if key ==
"response_content":
191 self.
disp.log_debug(f
"Values: {values}", title)
195 self.
disp.log_debug(f
"Values: {values}", title)
199 self.
disp.log_debug(f
"Skipping key: {key}", title)
201 self.
disp.log_debug(f
"Node: {node}", title)
206 Get the verification value.
209 response_node (Dict[str, Any]): The node whose key/value pairs are inspected; the "verification_value" key is the one looked for.
214 title =
"get_verification_value"
215 self.
disp.log_debug(f
"response_node: {response_node}", title)
216 if response_node
is None or response_node ==
"":
219 msg=
"No response node found.",
226 for key, values
in response_node.items():
227 self.
disp.log_debug(f
"Key: {key}, Values: {values}", title)
229 if key ==
"verification_value":
230 self.
disp.log_debug(f
"Values: {values}", title)
234 self.
disp.log_debug(f
"Values: {values}", title)
235 if isinstance(values, List):
237 if i.startswith(
"selected:"):
240 if i.startswith(
"default:"):
246 self.
disp.log_debug(f
"Skipping key: {key}", title)
248 self.
disp.log_debug(f
"Node: {node}", title)
253 Get the response content.
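256 variable_name (str): The dot-separated name of the content to look up; it is split on "." and a first segment equal to "body" is handled specifically.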
261 title =
"get_response_content"
262 self.
disp.log_debug(f
"Variable name: {variable_name}", title)
264 msg =
"No response found"
265 msg += f
" in {title} for {variable_name} in {self.api_response}."
266 self.
disp.log_critical(msg, title)
269 msg=
"No response found.",
275 variable_name_list = variable_name.split(
".")
276 list_length = len(variable_name_list)
277 self.
disp.log_debug(f
"List length: {list_length}", title)
278 if variable_name_list[0] ==
"body":
280 ACONST.RESPONSE_NODE_BODY_TYPE_KEY
282 msg = f
"Data type: {data_type}, list_length: {list_length}"
283 msg += f
" Variable name list: {variable_name_list}"
284 self.
disp.log_debug(msg, title)
285 if data_type
is None:
288 msg=
"No data type found.",
294 if data_type.split(
";")[0]
not in ACONST.CONTENT_TYPES_JSON
and list_length > 1:
295 msg =
"Search depth is not possible for"
296 msg += f
" this data type {data_type}."
306 msg =
"Variable name list: "
307 msg += f
"{variable_name_list}"
308 self.
disp.log_debug(msg, title)
311 for index, item
in enumerate(variable_name_list):
313 if item
in ACONST.RESPONSE_NODE_KEY_EQUIVALENCE:
314 node: Dict[str, Any] = node.get(
315 ACONST.RESPONSE_NODE_KEY_EQUIVALENCE[item]
317 self.
disp.log_debug(f
"Node[{index}]: {node}", title)
320 self.
disp.log_error(f
"Item: {item} not in node: {node}", title)
322 node: Dict[str, Any] = node.get(item)
323 self.
disp.log_debug(f
"Node[{index}]: {node}", title)
324 self.
disp.log_debug(f
"Node: {node}", title)
329 Get the variable data if required.
332 node (str): The text to process; it is split on the "$ref" and "${" markers.
337 title =
"get_variable_data_if_required"
338 node_list = node.split(
"$ref")
339 if len(node_list) > 1:
340 for index, item
in enumerate(node_list):
350 self.
disp.log_debug(f
"var_content: {var_content}", title)
351 if var_content ==
"":
354 f
"var_content: {var_content}", title
356 item_new = f
"{var_content}{item[len(var_name) + 2:]}"
357 self.
disp.log_debug(f
"item_new: {item_new}", title)
358 node_list[index] = item_new
359 node =
"".join(node_list)
360 self.
disp.log_debug(f
"Processed node: {node}", title)
361 node_list = node.split(
"${")
362 self.
disp.log_debug(f
"Node list: {node_list}", title)
363 if len(node_list) > 1:
364 self.
disp.log_debug(f
"node_list: {node_list}", title)
365 for index, item
in enumerate(node_list):
375 self.
disp.log_debug(f
"var_content: {var_content}", title)
376 item_new = f
"{var_content}{item[len(var_name) + 3:]}"
377 self.
disp.log_debug(f
"item_new: {item_new}", title)
378 node_list[index] = item_new
379 node =
"".join(node_list)
380 self.
disp.log_debug(f
"Node: {node}", title)
381 if attempt_bruteforce
is True:
382 node = ACONST.detect_and_convert(node)
383 self.
disp.log_warning(f
"Node: {node}, type = {type(node)}", title)
388 Check the data comparison.
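391 data (Any): The first argument passed to the operator.
392 operator (Any): The callable used for the comparison; it is invoked as operator(data, verification_value).
393 verification_value (Any): The second argument passed to the operator.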
398 title =
"check_data_comparison"
399 msg = f
"data: {data}, operator: {operator}, "
400 msg += f
"verification_value: {verification_value}"
401 self.
disp.log_debug(msg, title)
403 operation_result = operator(data, verification_value)
404 self.
disp.log_debug(f
"Operation result: {operation_result}", title)
405 except Exception
as e:
408 msg=f
"Error while comparing data: {e}",
411 raise_func=ValueError
413 return operation_result
417 Set the runtime variables.
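420 data (Any): The data whose key/value pairs are processed; anything that is not a dictionary is reported as "Data is not a dictionary."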
422 title =
"set_runtime_variables"
423 self.
disp.log_debug(f
"Data: {data}", title)
427 msg=
"No data found.",
432 if isinstance(data, Dict)
is False:
435 msg=
"Data is not a dictionary.",
440 for key, value
in data.items():
441 self.
disp.log_debug(f
"Key: {key}, Value: {value}", title)
444 key, node, type(node), self.
scope
447 def run(self, key: str) -> int:
449 Run the trigger checking.
452 int: Returns self.success if the program succeeded, self.error otherwise.
455 self.
disp.log_debug(
"Running trigger management.", title)
460 f
"Scope: {self.scope}, scope_content = {data}", title
463 msg = f
"No applet data found for scope {self.scope}"
464 msg += f
" in pid {os.getpid()}."
466 title, msg, self.
action_id, raise_item=
True,
467 raise_func=ValueError
470 action_node = self.
variable.get_variable(name=key, scope=self.
scope)
471 if "trigger" not in action_node:
474 msg=
"No trigger data found in applet data.",
477 raise_func=ValueError
479 trigger_node = action_node[
"trigger"]
480 self.
disp.log_debug(f
"Trigger node: {trigger_node}", title)
483 if isinstance(trigger_node, Dict)
is False:
485 trigger = json.loads(trigger_node)
486 except json.JSONDecodeError
as e:
487 msg = f
"Error while decoding trigger data: {e}"
489 title, msg, self.
action_id, raise_item=
True,
490 raise_func=ValueError
493 trigger: Dict[str, Any] = trigger_node
494 self.
disp.log_debug(f
"Trigger data: {trigger}", title)
497 node_of_interest =
"service"
498 if node_of_interest
not in trigger:
501 msg=
"No service data found in trigger data.",
504 raise_func=ValueError
506 node: Dict[str, Any] = trigger.get(node_of_interest)
520 self.
disp.log_debug(
"self.api_querier_initialised initialised", title)
525 msg=
"No response found from API query.",
528 raise_func=ValueError
530 self.
disp.log_debug(f
"Response: {response}", title)
534 self.
disp.log_debug(f
"Data: {data}, type = {type(data)}", title)
536 "trigger_data", data, type(data), self.
scope
539 self.
disp.log_debug(
"Variable added.", title)
542 self.
disp.log_info(
"Getting content for response_data", title)
543 expected_trigger_response = (node.get(
"response")
or node.get(
"ignore:response"))
544 response_data: Dict = {}
546 for response_key, value
in response_data[
"response"].items():
548 msg = f
"response_data = {response_data}, "
549 self.
disp.log_debug(msg, title)
552 self.
disp.log_info(
"Getting content for verification_value", title)
553 response_verification: Dict[str, Any] = (node.get(
"verification")
or node.get(
"ignore:verification"))
554 for data_key, verification_details
in response_verification.items():
555 verification_operator = (verification_details.get(
"drop:verification_operator")
or verification_details.get(
"ignore:verification_operator"))
557 verification_operator
562 self.
disp.log_debug(
"raw Verification info gathered", title)
563 msg = f
"Verification operator: {verification_operator}, "
564 msg += f
"verification_value = {verification_value}"
570 self.
disp.log_debug(
"Content gathered.", title)
571 self.
disp.log_debug(f
"response_data: {response_data}", title)
572 self.
disp.log_debug(f
"verification_value: {verification_value}", title)
576 data=response_data[
"response"][data_key],
577 operator=verification_operator,
578 verification_value=verification_value
580 if response
is False:
582 self.
disp.log_debug(f
"Data comparison successful for {data_key} verification.", title)
585 var1 = node.get(
"variables")
586 var2 = node.get(
"vars")
__init__(self, Variables variable, ActionLogger logger, RuntimeData runtime_data, Any scope="default_scope", int action_id=0, int error=84, int success=0, bool debug=False, int delay=10)
bool check_data_comparison(self, Any data, ACONST.operator operator, Any verification_value)
APIQuerier api_querier_initialised
Any get_response_content(self, str variable_name)
Any get_verification_value(self, Dict[str, Any] response_node)
None set_runtime_variables(self, Dict[str, Any] data)
Dict[str, Any] api_response
int _log_fatal(self, str title, msg, int action_id, bool raise_item=False, object raise_func=ValueError)
Any get_response_verification(self, Dict[str, Any] response_node)
Any get_variable_data_if_required(self, str node, bool attempt_bruteforce=True)
Any get_verification_operator(self, str operator)
QueryEndpoint query_endpoint