Terarea  2
The automation project
Loading...
Searching...
No Matches
main.py
Go to the documentation of this file.
1"""_summary_
2 The main file of this section, in charge of the action processing
3"""
4
5import os
6from typing import Dict, Union, List, Any
7from time import sleep
8from random import uniform, randint
9from string import ascii_letters, digits
10from display_tty import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
11
12from ..components.runtime_data import RuntimeData
13from ..components import CONST
14
15from .variables import Variables
16from .logger import ActionLogger
17from .action_management import ActionManagement
18from .trigger_management import TriggerManagement
19from . import constants as ACONST
20
21
23 """_summary_
24 """
25
26 def __init__(self, runtime_data: RuntimeData, error: int = 84, success: int = 0, debug: bool = False):
27 """_summary_
28 Class in charge of processing the actions.
29
30 Args:
31 runtime_data (RuntimeData): _description_
32 error (int, optional): _description_. Defaults to 84.
33 success (int, optional): _description_. Defaults to 0.
34 debug (bool, optional): _description_. Defaults to False.
35 """
36 # -------------------------- Inherited values --------------------------
37 self.error = error
38 self.debug = debug
39 self.success = success
40 self.runtime_data = runtime_data
41 # ---------------------- The visual logger class ----------------------
42 self.disp: Disp = Disp(
43 TOML_CONF,
44 SAVE_TO_FILE,
45 FILE_NAME,
46 FILE_DESCRIPTOR,
47 debug=self.debug,
48 logger=self.__class__.__name__
49 )
50 # -------------------------- Internal classes --------------------------
51 self.variables: Variables = Variables(
52 success=self.success,
53 error=self.error,
54 debug=self.debug
55 )
56 self.logger: ActionLogger = ActionLogger(
57 runtime_data=self.runtime_data,
58 success=self.success,
59 error=self.error,
60 debug=self.debug
61 )
62
63 def cache_busting(self, length: int) -> int:
64 """_summary_
65 Function in charge of generating a cache busting string.
66
67 Args:
68 length (int): _description_: The action id.
69
70 Returns:
71 int: _description_: The result of the cache busting.
72 """
73 node_text = "cache_busting"
74 data = str(ascii_letters + digits)
75 data_length = len(data)-1
76 cache_busting = f"{node_text}_"
77 for i in range(0, length):
78 cache_busting += data[randint(0, data_length)]
79 cache_busting += f"_{node_text}"
80 return cache_busting
81
82 def random_delay(self, max_value: float = 1) -> float:
83 """_summary_
84 Function in charge of generating a random delay.
85 """
86 delay = uniform(0, max_value)
87 delay = int(delay * 100) / 100
88 return delay
89
90 def get_action_ids(self) -> Union[int, List[str]]:
91 """_summary_
92 Function in charge of getting the action ids.
93 """
94 title = "get_action_ids"
95 self.disp.log_debug("Getting the action ids", title)
96 action_ids = self.runtime_data.database_link.get_data_from_table(
97 table=CONST.TAB_ACTIONS,
98 column=["id"],
99 beautify=False
100 )
101 self.disp.log_debug(f"action_ids = {action_ids}", title)
102 if action_ids == self.error:
103 return self.error
104 self.disp.log_debug("Processing ids.", title)
105 res = []
106 for i in action_ids:
107 res.append(i[0])
108 self.disp.log_debug(f"processed ids = {res}", title)
109 return res
110
111 def lock_action(self, node: int) -> int:
112 """_summary_
113 Function in charge of locking an action.
114
115 Args:
116 node (int): _description_: The action to lock.
117
118 Returns:
119 int: _description_: Returns self.success if it succeeded, self.error otherwise
120 """
121 title = "lock_action"
122 self.disp.log_debug(f"Locking action {node}.", title)
123 if self.is_action_locked(node) is True:
124 self.disp.log_debug(f"Action {node} already locked.", title)
125 return self.error
126 locked = self.runtime_data.database_link.update_data_in_table(
127 table=CONST.TAB_ACTIONS,
128 column=["running"],
129 data=["1"],
130 where=f"id={node}"
131 )
132 if locked == self.error:
133 msg = f"Failed to lock action {node} by process {os.getpid()}"
134 self.disp.log_debug(msg, title)
135 self.logger.log_success(
136 log_type=ACONST.TYPE_SERVICE,
137 action_id=node,
138 message=msg,
139 resolved=True
140 )
141 return self.error
142 msg = f"Action {node} locked by process {os.getpid()}."
143 self.disp.log_debug(msg, title)
144 self.logger.log_success(
145 log_type=ACONST.TYPE_SERVICE,
146 action_id=node,
147 message=msg,
148 resolved=True
149 )
150 return self.success
151
152 def unlock_action(self, node: int) -> int:
153 """_summary_
154 Function in charge of unlocking an action.
155
156 Args:
157 node (int): _description_: The action to unlock.
158
159 Returns:
160 int: _description_: Returns self.success if it succeeded, self.error otherwise
161 """
162 title = "unlock_action"
163 self.disp.log_debug(f"Unlocking action {node}.", title)
164 if self.is_action_locked(node) is False:
165 self.disp.log_debug(f"Action {node} already unlocked.", title)
166 return self.error
167 locked = self.runtime_data.database_link.update_data_in_table(
168 table=CONST.TAB_ACTIONS,
169 column=["running"],
170 data=["0"],
171 where=f"id='{node}'"
172 )
173 if locked == self.error:
174 msg = f"Failed to unlock action {node} by process {os.getpid()}"
175 self.disp.log_debug(msg, title)
176 self.logger.log_success(
177 log_type=ACONST.TYPE_SERVICE,
178 action_id=node,
179 message=msg,
180 resolved=True
181 )
182 return self.error
183 msg = f"Action {node} unlocked by process {os.getpid()}."
184 self.disp.log_debug(msg, title)
185 self.logger.log_success(
186 log_type=ACONST.TYPE_SERVICE,
187 action_id=node,
188 message=msg,
189 resolved=True
190 )
191 return self.success
192
193 def dump_scope(self, action_id: int, scope: Any, log_type: str = ACONST.TYPE_RUNTIME_ERROR) -> int:
194 """_summary_
195 Dump the content of the scope to the logging database before clearing the cache.
196
197 Args:
198 scope (Any): _description_: The concerned scope.
199
200 Returns:
201 int: _description_: The status of how the scope dumping went.
202 """
203 title = "dump_scope"
204 # data = self.variables.get_scope(scope=scope)
205 # data = self.variables.sanitize_for_json(data)
206 # try:
207 # data = json.dumps(
208 # data,
209 # skipkeys=False,
210 # ensure_ascii=True,
211 # check_circular=True,
212 # allow_nan=True,
213 # cls=None,
214 # indent=None,
215 # sort_keys=False,
216 # )
217 # except Exception:
218 # data = f"{data}"
219 # self.logger.log_info(
220 # log_type=log_type,
221 # action_id=action_id,
222 # message=data,
223 # resolved=False
224 # )
225 self.disp.log_warning(
226 "Scope dumping is disabled for time reasons.",
227 title
228 )
229 return self.success
230
231 def process_action_node(self, node: int) -> int:
232 """_summary_
233 Function in charge of processing a given action.
234
235 Args:
236 node (int): _description_
237
238 Returns:
239 int: _description_
240 """
241 title = "process_action_node"
242 variable_scope = f"action_{node}"
243 action_detail = self.runtime_data.database_link.get_data_from_table(
244 table=CONST.TAB_ACTIONS,
245 column="*",
246 where=f"id={node}",
247 beautify=True
248 )
249 if action_detail == self.error:
250 msg = f"Failed to get the action {node} details"
251 msg += f"for process {os.getpid()}."
252 self.disp.log_critical(msg, title)
253 self.logger.log_fatal(
254 log_type=ACONST.TYPE_SERVICE,
255 action_id=node,
256 message=msg,
257 resolved=False
258 )
259 return self.error
260 self.disp.log_debug(
261 f"action_detail = {action_detail}",
262 title
263 )
264 self.variables.create_scope(scope_name=variable_scope)
265 self.variables.clear_variables(scope=variable_scope)
266 cache_busting = self.cache_busting(10)
267 node_key = f"node_data_{node}_{cache_busting}"
268 self.variables.add_variable(
269 name=node_key,
270 variable_data=action_detail[0],
271 variable_type=type(action_detail[0]),
272 scope=variable_scope
273 )
274 trigger_node: TriggerManagement = TriggerManagement(
275 variable=self.variables,
276 logger=self.logger,
277 runtime_data=self.runtime_data,
278 action_id=node,
279 error=self.error,
280 success=self.success,
281 scope=variable_scope,
282 debug=self.debug,
283 delay=CONST.API_REQUEST_DELAY
284 )
285 action_node: ActionManagement = ActionManagement(
286 variable=self.variables,
287 logger=self.logger,
288 runtime_data=self.runtime_data,
289 action_id=node,
290 error=self.error,
291 success=self.success,
292 scope=variable_scope,
293 debug=self.debug,
294 delay=CONST.API_REQUEST_DELAY
295 )
296 try:
297 status = trigger_node.run(node_key)
298 except Exception as e:
299 self.logger.log_fatal(
300 log_type=ACONST.TYPE_SERVICE_TRIGGER,
301 action_id=node,
302 message=f"Failed to run the trigger node: {e}",
303 resolved=False
304 )
305 self.dump_scope(
306 action_id=node,
307 scope=variable_scope,
308 log_type=ACONST.TYPE_SERVICE_TRIGGER
309 )
310 self.variables.clear_variables(scope=variable_scope)
311 self.unlock_action(node)
312 return self.error
313 if status == self.error:
314 self.logger.log_fatal(
315 log_type=ACONST.TYPE_SERVICE_TRIGGER,
316 action_id=node,
317 message="Failed to run the trigger node.",
318 resolved=False
319 )
320 self.dump_scope(
321 action_id=node,
322 scope=variable_scope,
323 log_type=ACONST.TYPE_SERVICE_TRIGGER
324 )
325 self.variables.clear_variables(scope=variable_scope)
326 self.unlock_action(node)
327 return self.error
328 try:
329 status = action_node.run(node_key)
330 except Exception as e:
331 self.logger.log_fatal(
332 log_type=ACONST.TYPE_SERVICE_ACTION,
333 action_id=node,
334 message=f"Failed to run the action node: {e}",
335 resolved=False
336 )
337 self.dump_scope(
338 action_id=node,
339 scope=variable_scope,
340 log_type=ACONST.TYPE_SERVICE_ACTION
341 )
342 self.unlock_action(node)
343 return self.error
344 if status == self.error:
345 self.logger.log_fatal(
346 log_type=ACONST.TYPE_SERVICE_ACTION,
347 action_id=node,
348 message="Failed to run the action node.",
349 resolved=False
350 )
351 self.dump_scope(
352 action_id=node,
353 scope=variable_scope,
354 log_type=ACONST.TYPE_SERVICE_ACTION
355 )
356 self.unlock_action(node)
357 return self.error
358 self.unlock_action(node)
359 return self.success
360
361 def is_action_locked(self, node: int) -> bool:
362 """_summary_
363 Function in charge of checking if the action is locked.
364
365 Args:
366 node (int): _description_
367
368 Returns:
369 bool: _description_
370 """
371 title = "is_action_locked"
372 self.disp.log_debug(f"Checking if action {node} is locked.", title)
373 locked = self.runtime_data.database_link.get_data_from_table(
374 table=CONST.TAB_ACTIONS,
375 column=["running"],
376 where=f"id={node}",
377 beautify=False
378 )
379 self.disp.log_debug(f"action info = {locked}", title)
380 if locked == self.error or len(locked) == 0:
381 return True
382 if locked[0][0] == 1:
383 self.disp.log_debug(f"Action {node} is locked.", title)
384 return True
385 self.disp.log_debug(f"Action {node} is not locked.", title)
386 return False
387
388 def execute_actions(self) -> Dict[str, int]:
389 """_summary_
390 The function in charge of processing actions.
391
392 Returns:
393 int: _description_: The statuses of all the runs.
394 """
395 delay = self.random_delay(1)
396 sleep(delay)
397 execution_statuses = {}
398 actions = self.get_action_ids()
399 if actions == self.error:
400 return self.error
401 for action in actions:
402 if self.is_action_locked(action) is True:
403 continue
404 if self.lock_action(action) != self.success:
405 continue
406 execution_statuses[action] = self.process_action_node(action)
407 return execution_statuses
int dump_scope(self, int action_id, Any scope, str log_type=ACONST.TYPE_RUNTIME_ERROR)
Definition main.py:193
Union[int, List[str]] get_action_ids(self)
Definition main.py:90
float random_delay(self, float max_value=1)
Definition main.py:82
__init__(self, RuntimeData runtime_data, int error=84, int success=0, bool debug=False)
Definition main.py:26
int lock_action(self, int node)
Definition main.py:111
Dict[str, int] execute_actions(self)
Definition main.py:388
int unlock_action(self, int node)
Definition main.py:152
int cache_busting(self, int length)
Definition main.py:63
bool is_action_locked(self, int node)
Definition main.py:361
int process_action_node(self, int node)
Definition main.py:231