Terarea  2
The automation project
Loading...
Searching...
No Matches
action_management.py
Go to the documentation of this file.
1"""_summary_
2 File in charge of managing the actions
3"""
4
5import os
6import json
7from typing import Any, Dict
8
9from requests import Response
10from display_tty import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
11
12from .secrets import Secrets
13from .variables import Variables
14from . import constants as ACONST
15from .logger import ActionLogger
16from .api_querier import APIQuerier
17from .query_boilerplate import QueryEndpoint
18from ..components import constants as CONST
19from ..components.runtime_data import RuntimeData
20
21
23 """_summary_
24 """
25
def __init__(
    self,
    variable: Variables,
    logger: ActionLogger,
    runtime_data: RuntimeData,
    action_id: int = 0,
    scope: Any = "default_scope",
    error: int = 84,
    success: int = 0,
    debug: bool = False,
    delay: int = 10
):
    """Store the shared runtime objects and build the helpers used by the action checks.

    Args:
        variable (Variables): Variable tracker used for the runtime.
        logger (ActionLogger): Logger used to record action events.
        runtime_data (RuntimeData): Container of the runtime connections.
        action_id (int, optional): Action identifier used when logging; stored as a string. Defaults to 0.
        scope (Any, optional): Scope under which variables are read and written. Defaults to "default_scope".
        error (int, optional): Value used to signal an error. Defaults to 84.
        success (int, optional): Value used to signal a success. Defaults to 0.
        debug (bool, optional): Enables debug mode on the helpers created here. Defaults to False.
        delay (int, optional): Stored as self.delay; not read by the code visible here. Defaults to 10.
    """
    # ------------------------- Status and settings -------------------------
    self.success: int = success
    self.error: int = error
    self.debug: bool = debug
    self.delay: int = delay
    self.scope: Any = scope
    # The identifier is kept as text, whatever type the caller provided.
    self.action_id: str = str(action_id)
    # ------------------------ Shared runtime objects ------------------------
    self.variable: Variables = variable
    self.logger: ActionLogger = logger
    self.runtime_data: RuntimeData = runtime_data
    # ------------------------- Console (tty) logger -------------------------
    owner_name = self.__class__.__name__
    self.disp: Disp = Disp(
        TOML_CONF,
        SAVE_TO_FILE,
        FILE_NAME,
        FILE_DESCRIPTOR,
        debug=debug,
        logger=owner_name
    )
    # ---------------------------- Secrets helper ----------------------------
    self.secrets: Secrets = Secrets(
        success=success,
        error=error,
        debug=debug
    )
    # --------------------------- Endpoint helper ---------------------------
    # Built with an empty host, no port and no delay.
    self.query_endpoint: QueryEndpoint = QueryEndpoint(
        host="",
        port=None,
        delay=0,
        debug=debug
    )
    # -------------------- Filled in later, empty for now --------------------
    self.api_querier_initialised: APIQuerier = None
    self.api_response: Dict[str, Any] = None
75
def _log_fatal(self, title: str, msg, action_id: int, raise_item: bool = False, raise_func: object = ValueError) -> int:
    """Log a fatal error on the console and in the action log, then raise or return.

    Args:
        title (str): Name of the calling function, used as the console log title.
        msg (str): The message to log.
        action_id (int): Identifier of the action, forwarded to the action logger.
        raise_item (bool, optional): When True, raise after logging instead of returning. Defaults to False.
        raise_func (object, optional): Exception class (or callable building an exception) called with msg. Defaults to ValueError.

    Raises:
        Exception: The object built by ``raise_func(msg)`` when raise_item is True.

    Returns:
        int: self.error when raise_item is not True.
    """
    self.disp.log_error(msg, title)
    self.logger.log_fatal(
        ACONST.TYPE_SERVICE_ACTION,
        action_id=action_id,
        message=msg,
        resolved=False
    )
    if raise_item is True:
        # The exception has to be raised, not merely instantiated:
        # a bare ``raise_func(msg)`` builds the object and discards it.
        raise raise_func(msg)
    return self.error
103
def get_response_verification(self, response_node: Dict[str, Any]) -> Any:
    """Return the value stored under the "response_content" key of a response node.

    Keys without a ":" are compared as-is. Keys containing a ":" are first
    passed through ``self.api_querier_initialised.strip_descriptor``.
    The first matching key wins.

    Args:
        response_node (Dict[str, Any]): The node to search.

    Returns:
        Any: The matching value, or None when no key matches.

    Note:
        A None or empty-string node is reported through ``_log_fatal`` with
        raise_item=True and raise_func=TypeError.
    """
    title = "get_response_verification"
    self.disp.log_debug(f"response_node: {response_node}", title)
    if response_node is None or response_node == "":
        self._log_fatal(
            title=title,
            msg="No response node found.",
            action_id=self.action_id,
            raise_item=True,
            raise_func=TypeError
        )

    # Initialise the result so a node without a matching key yields None
    # instead of an UnboundLocalError (same contract as get_verification_value).
    node = None
    for key, values in response_node.items():
        self.disp.log_debug(f"Key: {key}, Values: {values}", title)
        if ":" not in key:
            if key == "response_content":
                self.disp.log_debug(f"Values: {values}", title)
                node = values
                break
        elif self.api_querier_initialised.strip_descriptor(key) == "response_content":
            self.disp.log_debug(f"Values: {values}", title)
            node = values
            break
        else:
            self.disp.log_debug(f"Skipping key: {key}", title)

    self.disp.log_debug(f"Node: {node}", title)
    return node
141
def get_verification_value(self, response_node: Dict[str, Any]) -> Any:
    """Return the value stored under the "verification_value" key of a response node.

    Keys without a ":" are compared as-is. Keys containing a ":" are first
    passed through ``self.api_querier_initialised.strip_descriptor``.
    The first matching key wins.

    Args:
        response_node (Dict[str, Any]): The node to search.

    Returns:
        Any: The matching value, or None when no key matches.

    Note:
        A None or empty-string node is reported through ``_log_fatal`` with
        raise_item=True and raise_func=TypeError.
    """
    title = "get_verification_value"
    wanted = "verification_value"
    self.disp.log_debug(f"response_node: {response_node}", title)
    if response_node is None or response_node == "":
        self._log_fatal(
            title=title,
            msg="No response node found.",
            action_id=self.action_id,
            raise_item=True,
            raise_func=TypeError
        )

    found = None
    for name, content in response_node.items():
        self.disp.log_debug(f"Key: {name}, Values: {content}", title)
        if ":" in name:
            # Described key: compare on the name without its descriptor.
            stripped = self.api_querier_initialised.strip_descriptor(name)
            if stripped != wanted:
                self.disp.log_debug(f"Skipping key: {name}", title)
                continue
        elif name != wanted:
            # Plain key that is not the one we are after: move on silently.
            continue
        self.disp.log_debug(f"Values: {content}", title)
        found = content
        break

    self.disp.log_debug(f"Node: {found}", title)
    return found
180
def get_response_content(self, variable_name: str) -> Any:
    """Resolve a dotted variable name against the stored API response.

    The name is split on "." and each part is used to walk down
    ``self.api_response``. The first part may be translated through
    ``ACONST.RESPONSE_NODE_KEY_EQUIVALENCE``. When the first part is "body",
    the body content type is checked first: searching below the body
    (more than one part) is refused unless the type, without its
    ";" parameters, is listed in ``ACONST.CONTENT_TYPES_JSON``.

    Args:
        variable_name (str): Dotted path of the value to fetch.

    Returns:
        Any: The value found at the end of the path, or "" when there is no
        stored response, no body type, a refused deep search, or a path part
        that cannot be resolved.
    """
    # Local import: keeps this method self-contained.
    from collections.abc import Mapping

    title = "get_response_content"
    self.disp.log_debug(f"Variable name: {variable_name}", title)
    if self.api_response is None:
        msg = "No response found"
        msg += f" in {title} for {variable_name} in {self.api_response}."
        self.disp.log_critical(msg, title)
        self._log_fatal(
            title=title,
            msg="No response found.",
            action_id=self.action_id,
            raise_item=False,
            raise_func=TypeError
        )
        return ""
    variable_name_list = variable_name.split(".")
    list_length = len(variable_name_list)
    self.disp.log_debug(f"List length: {list_length}", title)
    if variable_name_list[0] == "body":
        data_type: str = self.api_response.get(
            ACONST.RESPONSE_NODE_BODY_TYPE_KEY
        )
        msg = f"Data type: {data_type}, list_length: {list_length}"
        msg += f" Variable name list: {variable_name_list}"
        self.disp.log_debug(msg, title)
        if data_type is None:
            self._log_fatal(
                title=title,
                msg="No data type found.",
                action_id=self.action_id,
                raise_item=False,
                raise_func=TypeError
            )
            return ""
        if data_type.split(";")[0] not in ACONST.CONTENT_TYPES_JSON and list_length > 1:
            msg = "Search depth is not possible for"
            msg += f" this data type {data_type}."
            self._log_fatal(
                title=title,
                msg=msg,
                action_id=self.action_id,
                raise_item=False,
                raise_func=TypeError
            )
            return ""
    else:
        msg = "Variable name list: "
        msg += f"{variable_name_list}"
        self.disp.log_debug(msg, title)

    node = self.api_response.copy()
    for index, item in enumerate(variable_name_list):
        if index == 0 and item in ACONST.RESPONSE_NODE_KEY_EQUIVALENCE:
            node = node.get(
                ACONST.RESPONSE_NODE_KEY_EQUIVALENCE[item]
            )
            self.disp.log_debug(f"Node[{index}]: {node}", title)
            continue
        # Only mappings can be searched by key. A None, scalar, list or string
        # reached mid-path is treated like a missing item instead of crashing
        # on the membership test or on .get().
        if not isinstance(node, Mapping) or item not in node:
            self.disp.log_error(f"Item: {item} not in node: {node}", title)
            return ""
        node = node.get(item)
        self.disp.log_debug(f"Node[{index}]: {node}", title)
    self.disp.log_debug(f"Node: {node}", title)
    return node
256
def get_variable_data_if_required(self, node: str, attempt_bruteforce: bool = True) -> Any:
    """Substitute the variable placeholders found in a string.

    Two passes are made over the text:
        1. It is split on "$ref"; every piece starting with "{" has its
           variable name extracted by ``get_variable_name`` and replaced by
           ``get_special_content``, falling back on
           ``self.get_response_content`` when that returns "".
        2. The result is split on "${"; every piece starting with "{" is
           replaced by ``get_normal_content``.
    The pieces are then joined back together with an empty separator.

    Args:
        node (str): The text to process.
        attempt_bruteforce (bool, optional): When True, the final text is passed to ``ACONST.detect_and_convert``. Defaults to True.

    Returns:
        Any: The processed text, or whatever ``ACONST.detect_and_convert`` returns for it.
    """
    title = "get_variable_data_if_required"
    querier = self.api_querier_initialised

    # NOTE(review): pieces that do not start with "{" are joined back without
    # the "$ref" / "${" delimiter that split them - confirm this is intended.

    # ---- Pass 1: "$ref{name}" placeholders ----
    ref_parts = node.split("$ref")
    if len(ref_parts) >= 2:
        for position, piece in enumerate(ref_parts):
            # An empty piece cannot start with "{" and is skipped as well.
            if not piece.startswith("{"):
                continue
            name = querier.get_variable_name(piece[1:])
            value = querier.get_special_content(name)
            self.disp.log_debug(f"var_content: {value}", title)
            if value == "":
                value = self.get_response_content(name)
                self.disp.log_debug(f"var_content: {value}", title)
            # Skip the opening "{", the name and one closing character.
            tail = piece[len(name) + 2:]
            replacement = f"{value}{tail}"
            self.disp.log_debug(f"item_new: {replacement}", title)
            ref_parts[position] = replacement
    node = "".join(ref_parts)
    self.disp.log_debug(f"Processed node: {node}", title)

    # ---- Pass 2: "${{name}}" placeholders ----
    var_parts = node.split("${")
    self.disp.log_debug(f"Node list: {var_parts}", title)
    if len(var_parts) >= 2:
        self.disp.log_debug(f"node_list: {var_parts}", title)
        for position, piece in enumerate(var_parts):
            if not piece.startswith("{"):
                continue
            name = querier.get_variable_name(piece[1:])
            value = querier.get_normal_content(name)
            self.disp.log_debug(f"var_content: {value}", title)
            # Skip the opening "{", the name and two closing characters.
            tail = piece[len(name) + 3:]
            replacement = f"{value}{tail}"
            self.disp.log_debug(f"item_new: {replacement}", title)
            var_parts[position] = replacement
    node = "".join(var_parts)
    self.disp.log_debug(f"Node: {node}", title)

    if attempt_bruteforce is True:
        node = ACONST.detect_and_convert(node)
    self.disp.log_warning(f"Node: {node}, type = {type(node)}", title)
    return node
315
def check_data_comparison(self, data: Any, operator: ACONST.operator, verification_value: Any) -> bool:
    """Apply a comparison operator to a value and its verification value.

    Args:
        data (Any): Left-hand operand.
        operator (ACONST.operator): Callable invoked as ``operator(data, verification_value)``.
        verification_value (Any): Right-hand operand.

    Raises:
        ValueError: When the operator call raises; the original exception is chained as the cause.

    Returns:
        bool: Whatever the operator call returned.
    """
    title = "check_data_comparison"
    msg = f"data: {data}, operator: {operator}, "
    msg += f"verification_value: {verification_value}"
    self.disp.log_debug(msg, title)
    try:
        operation_result = operator(data, verification_value)
    except Exception as e:
        error_msg = f"Error while comparing data: {e}"
        self._log_fatal(
            title=title,
            msg=error_msg,
            action_id=self.action_id,
            raise_item=True,
            raise_func=ValueError
        )
        # Guarantee the failure propagates even if _log_fatal returns instead
        # of raising; otherwise the return below would hit an unbound local.
        raise ValueError(error_msg) from e
    self.disp.log_debug(f"Operation result: {operation_result}", title)
    return operation_result
344
def set_runtime_variables(self, data: Dict[str, Any]) -> None:
    """Resolve every value of a mapping and store it as a runtime variable.

    Each value goes through ``self.get_variable_data_if_required`` and is
    then registered with ``self.variable.add_variable`` under its key, with
    the type of the resolved value, in ``self.scope``.

    Args:
        data (Dict[str, Any]): Variable names mapped to their raw values.

    Note:
        A None value and a non-dictionary value are each reported through
        ``_log_fatal`` with raise_item=True and raise_func=TypeError.
    """
    title = "set_runtime_variables"
    self.disp.log_debug(f"Data: {data}", title)

    # Arguments shared by both fatal reports below.
    fatal_common = {
        "title": title,
        "action_id": self.action_id,
        "raise_item": True,
        "raise_func": TypeError,
    }
    if data is None:
        self._log_fatal(msg="No data found.", **fatal_common)
    if not isinstance(data, Dict):
        self._log_fatal(msg="Data is not a dictionary.", **fatal_common)

    for name, raw_value in data.items():
        self.disp.log_debug(f"Key: {name}, Value: {raw_value}", title)
        resolved = self.get_variable_data_if_required(raw_value)
        self.variable.add_variable(name, resolved, type(resolved), self.scope)
376
def process_node(self, consequences: Dict[str, Any]) -> int:
    """Query the service described in a consequences node.

    The "service" entry of the node is handed to a new APIQuerier, which is
    stored in ``self.api_querier_initialised`` and then queried.

    Args:
        consequences (Dict[str, Any]): The consequences node; must hold a "service" entry.

    Returns:
        int: self.success when the query result equals self.success.
        Otherwise the value returned by
        ``self.query_endpoint.compile_response_data`` for the response
        (this is returned as-is despite the ``int`` annotation).

    Note:
        A missing "service" entry and a None query result are each reported
        through ``_log_fatal`` with raise_item=True and raise_func=ValueError.
    """
    title = "process_node"
    self.disp.log_debug(f"consequences data: {consequences}", title)
    node_of_interest = "service"
    if node_of_interest not in consequences:
        self._log_fatal(
            title=title,
            msg="No service data found in consequences data.",
            action_id=self.action_id,
            raise_item=True,
            raise_func=ValueError
        )
    node: Dict[str, Any] = consequences.get(node_of_interest)
    # NOTE(review): the opening line of this call was missing from the
    # listing; it is restored from the attribute name used just below and
    # from the imported APIQuerier class - confirm against the repository.
    self.api_querier_initialised = APIQuerier(
        service=node,
        variable=self.variable,
        scope=self.scope,
        runtime_data=self.runtime_data,
        logger=self.logger,
        action_id=self.action_id,
        error=self.error,
        success=self.success,
        debug=self.debug
    )
    self.disp.log_debug("self.api_querier_initialised initialised", title)
    response: Response = self.api_querier_initialised.query()
    if response == self.success:
        self.disp.log_debug("Response is a success", title)
        return self.success
    if response is None:
        self._log_fatal(
            title=title,
            msg="No response found from API query.",
            action_id=self.action_id,
            raise_item=True,
            raise_func=ValueError
        )
    self.disp.log_debug(f"Response: {response}", title)
    data = self.query_endpoint.compile_response_data(response)
    return data
426
def run(self, key: str) -> int:
    """Run the consequences stored under a variable of the current scope.

    The variable named ``key`` is read from ``self.scope``. Its
    "consequences" entry is used directly when it is a dictionary and is
    decoded with ``json.loads`` otherwise. When the decoded data is a
    dictionary holding a "1" key, every value is passed to
    ``self.process_node``; otherwise the data is passed as a whole.

    Args:
        key (str): Name of the variable holding the action data.

    Returns:
        int: self.success once the processing has completed.

    Note:
        A missing variable, a missing "consequences" entry and a JSON
        decoding failure are each reported through ``_log_fatal`` with
        raise_item=True and raise_func=ValueError.
    """
    title = "run"
    self.disp.log_debug("Running consequences management.", title)
    scope_content = self.variable.get_scope(self.scope)
    self.disp.log_debug(
        f"Scope: {self.scope}, scope_content = {scope_content}", title
    )

    # ---- Locate the action data ----
    if self.variable.has_variable(key, self.scope) is False:
        msg = (
            f"No applet data found for scope {self.scope}"
            f" in pid {os.getpid()}."
        )
        self._log_fatal(
            title, msg, self.action_id, raise_item=True,
            raise_func=ValueError
        )
    action_node = self.variable.get_variable(name=key, scope=self.scope)
    self.disp.log_debug(f"Action node: {action_node}", title)

    # ---- Extract the consequences, decoding them when needed ----
    if "consequences" not in action_node:
        self._log_fatal(
            title=title,
            msg="No consequences data found in applet data.",
            action_id=self.action_id,
            raise_item=True,
            raise_func=ValueError
        )
    raw_consequences = action_node["consequences"]
    self.disp.log_debug(f"consequences node: {raw_consequences}", title)
    if isinstance(raw_consequences, Dict):
        consequences: Dict[str, Any] = raw_consequences
    else:
        try:
            consequences = json.loads(raw_consequences)
        except json.JSONDecodeError as e:
            msg = f"Error while decoding consequences data: {e}"
            self._log_fatal(
                title, msg, self.action_id, raise_item=True,
                raise_func=ValueError
            )

    # ---- Process one node, or every numbered node ----
    if "1" in consequences and isinstance(consequences, Dict) is True:
        run_info = {
            step: self.process_node(step_node)
            for step, step_node in consequences.items()
        }
    else:
        run_info = self.process_node(consequences)
    self.disp.log_debug(f"Run info: {run_info}", title)
    return self.success
Any get_variable_data_if_required(self, str node, bool attempt_bruteforce=True)
int process_node(self, Dict[str, Any] consequences)
__init__(self, Variables variable, ActionLogger logger, RuntimeData runtime_data, int action_id=0, Any scope="default_scope", int error=84, int success=0, bool debug=False, int delay=10)
bool check_data_comparison(self, Any data, ACONST.operator operator, Any verification_value)
None set_runtime_variables(self, Dict[str, Any] data)
Any get_response_verification(self, Dict[str, Any] response_node)
int _log_fatal(self, str title, msg, int action_id, bool raise_item=False, object raise_func=ValueError)
Any get_verification_value(self, Dict[str, Any] response_node)