2 File in charge of managing the actions
7from typing
import Any, Dict
9from requests
import Response
10from display_tty
import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
12from .secrets
import Secrets
13from .variables
import Variables
14from .
import constants
as ACONST
15from .logger
import ActionLogger
16from .api_querier
import APIQuerier
17from .query_boilerplate
import QueryEndpoint
18from ..components
import constants
as CONST
19from ..components.runtime_data
import RuntimeData
26 def __init__(self, variable: Variables, logger: ActionLogger, runtime_data: RuntimeData, action_id: int = 0, scope: Any =
"default_scope", error: int = 84, success: int = 0, debug: bool =
False, delay: int = 10):
28 This is the class in charge of checking the actions to be run and storing variables if required.
30 variable (Variables): _description_: The class variable in charge of tracking the variables for the runtime.
31 logger (ActionLogger): _description_: The class logger in charge of logging the actions.
32 runtime_data (RuntimeData): _description_: The class runtime data in charge of containing important connections.
33 action_id (int): _description_: The action ID to log.
34 scope (Any, optional): _description_: The scope of the consequences. Defaults to "default_scope".
35 error (int, optional): _description_. Defaults to 84.: The error value
36 success (int, optional): _description_. Defaults to 0.: The success value
37 debug (bool, optional): _description_. Defaults to False.: Set to True if you wish to activate debug mode.
38 delay (int, optional): _description_. Defaults to 10.: The delay to wait before running the action.
57 logger=self.__class__.__name__
76 def _log_fatal(self, title: str, msg, action_id: int, raise_item: bool =
False, raise_func: object = ValueError) -> int:
78 A function that will log a provided fatal error.
81 title (str): _description_: the title of the function
82 msg (str): _description_: The message to log
83 raise_item (bool, optional): _description_. Inform if the logger should raise or just return an error. Defaults to False.
84 raise_func (object, optional): _description_. The function to raise if required. Defaults to ValueError.
87 ValueError: _description_: One of the possible errors to raise.
90 int: _description_: Will return self.error if raise_item is False
92 self.
disp.log_error(msg, title)
94 ACONST.TYPE_SERVICE_ACTION,
99 if raise_item
is True:
106 Get the response verification.
109 response_node (Dict[str, Any]): _description_
114 title =
"get_response_verification"
115 self.
disp.log_debug(f
"response_node: {response_node}", title)
116 if response_node
is None or response_node ==
"":
119 msg=
"No response node found.",
125 for key, values
in response_node.items():
126 self.
disp.log_debug(f
"Key: {key}, Values: {values}", title)
128 if key ==
"response_content":
129 self.
disp.log_debug(f
"Values: {values}", title)
133 self.
disp.log_debug(f
"Values: {values}", title)
137 self.
disp.log_debug(f
"Skipping key: {key}", title)
139 self.
disp.log_debug(f
"Node: {node}", title)
144 Get the verification value.
147 response_node (Dict[str, Any]): _description_
152 title =
"get_verification_value"
153 self.
disp.log_debug(f
"response_node: {response_node}", title)
154 if response_node
is None or response_node ==
"":
157 msg=
"No response node found.",
164 for key, values
in response_node.items():
165 self.
disp.log_debug(f
"Key: {key}, Values: {values}", title)
167 if key ==
"verification_value":
168 self.
disp.log_debug(f
"Values: {values}", title)
172 self.
disp.log_debug(f
"Values: {values}", title)
176 self.
disp.log_debug(f
"Skipping key: {key}", title)
178 self.
disp.log_debug(f
"Node: {node}", title)
183 Get the response content.
186 variable_name (str): _description_
191 title =
"get_response_content"
192 self.
disp.log_debug(f
"Variable name: {variable_name}", title)
194 msg =
"No response found"
195 msg += f
" in {title} for {variable_name} in {self.api_response}."
196 self.
disp.log_critical(msg, title)
199 msg=
"No response found.",
205 variable_name_list = variable_name.split(
".")
206 list_length = len(variable_name_list)
207 self.
disp.log_debug(f
"List length: {list_length}", title)
208 if variable_name_list[0] ==
"body":
210 ACONST.RESPONSE_NODE_BODY_TYPE_KEY
212 msg = f
"Data type: {data_type}, list_length: {list_length}"
213 msg += f
" Variable name list: {variable_name_list}"
214 self.
disp.log_debug(msg, title)
215 if data_type
is None:
218 msg=
"No data type found.",
224 if data_type.split(
";")[0]
not in ACONST.CONTENT_TYPES_JSON
and list_length > 1:
225 msg =
"Search depth is not possible for"
226 msg += f
" this data type {data_type}."
236 msg =
"Variable name list: "
237 msg += f
"{variable_name_list}"
238 self.
disp.log_debug(msg, title)
241 for index, item
in enumerate(variable_name_list):
243 if item
in ACONST.RESPONSE_NODE_KEY_EQUIVALENCE:
244 node: Dict[str, Any] = node.get(
245 ACONST.RESPONSE_NODE_KEY_EQUIVALENCE[item]
247 self.
disp.log_debug(f
"Node[{index}]: {node}", title)
250 self.
disp.log_error(f
"Item: {item} not in node: {node}", title)
252 node: Dict[str, Any] = node.get(item)
253 self.
disp.log_debug(f
"Node[{index}]: {node}", title)
254 self.
disp.log_debug(f
"Node: {node}", title)
259 Get the variable data if required.
262 node (Dict[str, Any]): _description_
267 title =
"get_variable_data_if_required"
268 node_list = node.split(
"$ref")
269 if len(node_list) > 1:
270 for index, item
in enumerate(node_list):
280 self.
disp.log_debug(f
"var_content: {var_content}", title)
281 if var_content ==
"":
284 f
"var_content: {var_content}", title
286 item_new = f
"{var_content}{item[len(var_name) + 2:]}"
287 self.
disp.log_debug(f
"item_new: {item_new}", title)
288 node_list[index] = item_new
289 node =
"".join(node_list)
290 self.
disp.log_debug(f
"Processed node: {node}", title)
291 node_list = node.split(
"${")
292 self.
disp.log_debug(f
"Node list: {node_list}", title)
293 if len(node_list) > 1:
294 self.
disp.log_debug(f
"node_list: {node_list}", title)
295 for index, item
in enumerate(node_list):
305 self.
disp.log_debug(f
"var_content: {var_content}", title)
306 item_new = f
"{var_content}{item[len(var_name) + 3:]}"
307 self.
disp.log_debug(f
"item_new: {item_new}", title)
308 node_list[index] = item_new
309 node =
"".join(node_list)
310 self.
disp.log_debug(f
"Node: {node}", title)
311 if attempt_bruteforce
is True:
312 node = ACONST.detect_and_convert(node)
313 self.
disp.log_warning(f
"Node: {node}, type = {type(node)}", title)
318 Check the data comparison.
321 data (Any): _description_
322 operator (Any): _description_
323 verification_value (Any): _description_
328 title =
"check_data_comparison"
329 msg = f
"data: {data}, operator: {operator}, "
330 msg += f
"verification_value: {verification_value}"
331 self.
disp.log_debug(msg, title)
333 operation_result = operator(data, verification_value)
334 self.
disp.log_debug(f
"Operation result: {operation_result}", title)
335 except Exception
as e:
338 msg=f
"Error while comparing data: {e}",
341 raise_func=ValueError
343 return operation_result
347 Set the runtime variables.
350 data (Any): _description_
352 title =
"set_runtime_variables"
353 self.
disp.log_debug(f
"Data: {data}", title)
357 msg=
"No data found.",
362 if isinstance(data, Dict)
is False:
365 msg=
"Data is not a dictionary.",
370 for key, value
in data.items():
371 self.
disp.log_debug(f
"Key: {key}, Value: {value}", title)
374 key, node, type(node), self.
scope
379 Process the nodes contained in the action json.
382 node (Dict[str, Any]): _description_
387 title =
"process_node"
388 self.
disp.log_debug(f
"consequences data: {consequences}", title)
389 node_of_interest =
"service"
390 if node_of_interest
not in consequences:
393 msg=
"No service data found in consequences data.",
396 raise_func=ValueError
398 node: Dict[str, Any] = consequences.get(node_of_interest)
410 self.
disp.log_debug(
"self.api_querier_initialised initialised", title)
413 self.
disp.log_debug(
"Response is a success", title)
418 msg=
"No response found from API query.",
421 raise_func=ValueError
423 self.
disp.log_debug(f
"Response: {response}", title)
427 def run(self, key: str) -> int:
429 Run the consequences checking.
432 int: _description_: Returns self.success if the program succeeded, self.error otherwise.
435 self.
disp.log_debug(
"Running consequences management.", title)
438 f
"Scope: {self.scope}, scope_content = {data}", title
441 msg = f
"No applet data found for scope {self.scope}"
442 msg += f
" in pid {os.getpid()}."
444 title, msg, self.
action_id, raise_item=
True,
445 raise_func=ValueError
447 action_node = self.
variable.get_variable(
448 name=key, scope=self.
scope
450 self.
disp.log_debug(f
"Action node: {action_node}", title)
451 if "consequences" not in action_node:
454 msg=
"No consequences data found in applet data.",
457 raise_func=ValueError
459 consequences_node = action_node[
"consequences"]
460 self.
disp.log_debug(f
"consequences node: {consequences_node}", title)
461 if isinstance(consequences_node, Dict)
is False:
463 consequences = json.loads(consequences_node)
464 except json.JSONDecodeError
as e:
465 msg = f
"Error while decoding consequences data: {e}"
467 title, msg, self.
action_id, raise_item=
True,
468 raise_func=ValueError
471 consequences: Dict[str, Any] = consequences_node
473 if "1" in consequences
and isinstance(consequences, Dict)
is True:
474 for item_key, item_value
in consequences.items():
478 self.
disp.log_debug(f
"Run info: {run_info}", title)
Any get_response_content(self, str variable_name)
Any get_variable_data_if_required(self, str node, bool attempt_bruteforce=True)
int process_node(self, Dict[str, Any] consequences)
__init__(self, Variables variable, ActionLogger logger, RuntimeData runtime_data, int action_id=0, Any scope="default_scope", int error=84, int success=0, bool debug=False, int delay=10)
APIQuerier api_querier_initialised
QueryEndpoint query_endpoint
Dict[str, Any] api_response
bool check_data_comparison(self, Any data, ACONST.operator operator, Any verification_value)
None set_runtime_variables(self, Dict[str, Any] data)
Any get_response_verification(self, Dict[str, Any] response_node)
int _log_fatal(self, str title, msg, int action_id, bool raise_item=False, object raise_func=ValueError)
Any get_verification_value(self, Dict[str, Any] response_node)