Terarea  2
The automation project
Loading...
Searching...
No Matches
non_web.py
Go to the documentation of this file.
1"""_summary_
2 This is the class in charge of containing the non-http boilerplates.
3"""
4
5import re
6import uuid
7import json
8from random import randint
9from datetime import datetime, timedelta
10from typing import Union, List, Dict, Any
11
12from fastapi import Response
13from display_tty import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
14
15from ..components import RuntimeData, CONST
16from ..sql.sql_manager import SQL
17
18
20 """_summary_
21 """
22
def __init__(self, runtime_data_initialised: RuntimeData, success: int = 0, error: int = 84, debug: bool = False) -> None:
    """_summary_
        Store the shared runtime data and the status codes, then build the logger.

    Args:
        runtime_data_initialised (RuntimeData): _description_: The shared runtime data container
        success (int, optional): _description_: The value stored as the success status. Defaults to 0.
        error (int, optional): _description_: The value stored as the error status. Defaults to 84.
        debug (bool, optional): _description_: Stored and forwarded to the logger. Defaults to False.
    """
    self.runtime_data_initialised: RuntimeData = runtime_data_initialised
    self.success: int = success
    self.error: int = error
    self.debug: bool = debug
    # ------------------------ The logging function ------------------------
    # The logger is named after the concrete class so subclasses log under their own name.
    logger_name = self.__class__.__name__
    self.disp: Disp = Disp(
        TOML_CONF,
        FILE_DESCRIPTOR,
        SAVE_TO_FILE,
        FILE_NAME,
        debug=self.debug,
        logger=logger_name
    )
39
def pause(self) -> str:
    """_summary_
        Block until the user presses the enter key, in the same way as the batch pause command.

    Returns:
        str: _description_: Whatever the user typed before pressing enter
    """
    user_input = input("Press enter to continue...")
    return user_input
49
def set_lifespan(self, seconds: int) -> datetime:
    """
        Compute the moment at which a user token expires.

    Args:
        seconds (int): The number of seconds to add to the current local time

    Returns:
        datetime: The current local time shifted by the provided number of seconds
    """
    return datetime.now() + timedelta(seconds=seconds)
62
def is_token_correct(self, token: str) -> bool:
    """_summary_
        Check if the token is correct.

        A token is considered correct when exactly one row of the connections table
        matches it and the last column of that row (read as the expiration date) is
        not in the past. On success the expiration date is pushed forward by
        CONST.UA_TOKEN_LIFESPAN seconds.

    Args:
        token (str): _description_: The token to check

    Returns:
        bool: _description_: True if the token is correct, False otherwise
    """
    title = "is_token_correct"
    self.disp.log_debug("Checking if the token is correct.", title)
    if isinstance(token, str) is False:
        return False
    # The token is a string value: it must be quoted in the clause, as every
    # other token lookup in this class does.
    # NOTE(review): the value is still interpolated into the clause; switch to a
    # parameterised query if the SQL layer offers one.
    token_filter = f"token='{token}'"
    login_table = self.runtime_data_initialised.database_link.get_data_from_table(
        CONST.TAB_CONNECTIONS,
        "*",
        where=token_filter,
        beautify=False
    )
    # An int is a status code rather than a result set.
    if isinstance(login_table, int):
        return False
    if len(login_table) != 1:
        return False
    self.disp.log_debug(f"login_table = {login_table}", title)
    if datetime.now() > login_table[0][-1]:
        return False
    new_date = self.runtime_data_initialised.boilerplate_non_http_initialised.set_lifespan(
        CONST.UA_TOKEN_LIFESPAN
    )
    new_date_str = self.runtime_data_initialised.database_link.datetime_to_string(
        datetime_instance=new_date,
        date_only=False,
        sql_mode=True
    )
    self.disp.log_debug(f"string date: {new_date_str}", title)
    status = self.runtime_data_initialised.database_link.update_data_in_table(
        table=CONST.TAB_CONNECTIONS,
        data=new_date_str,
        column="expiration_date",
        where=token_filter
    )
    # A failed refresh is only reported: the token itself was valid.
    if status != self.success:
        self.disp.log_warning(
            f"Failed to update expiration_date for {token}.",
            title
        )
    return True
110
def generate_token(self) -> str:
    """_summary_
        This is a function that will generate a token for the user.

        A new uuid4 is drawn until the connections table holds no row with the
        same token. If the lookup returns a status code (an int) instead of rows,
        uniqueness cannot be checked and the freshly drawn token is returned as is.

    Returns:
        str: _description_: The token generated
    """
    title = "generate_token"
    while True:
        token = str(uuid.uuid4())
        user_token = self.runtime_data_initialised.database_link.get_data_from_table(
            table=CONST.TAB_CONNECTIONS,
            column="token",
            where=f"token='{token}'",
            beautify=False
        )
        self.disp.log_debug(f"user_token = {user_token}", title)
        # The type is checked before len(): a status code has no length.
        if isinstance(user_token, int):
            return token
        if len(user_token) == 0:
            return token
140
def server_show_item_content(self, function_name: str = "show_item_content", item_name: str = "", item: object = None, show: bool = True) -> None:
    """_summary_
        Log every attribute of an item at debug level.
        The purpose of this function is more for debugging purposes.

    Args:
        function_name (str, optional): _description_: The name of the caller, used as a prefix in the logs. Defaults to "show_item_content".
        item_name (str, optional): _description_: The name under which the item is displayed. Defaults to "".
        item (object, optional): _description_: The item to inspect. Defaults to None.
        show (bool, optional): _description_: Nothing is logged when this is exactly False. Defaults to True.
    """
    if show is False:
        return
    attribute_names = dir(item)
    self.disp.log_debug(
        f"({function_name}) dir({item_name}) = {attribute_names}",
        "pet_server_show_item_content"
    )
    # The values of these attributes are never read, only their names are reported.
    skipped = {"auth", "session", "user"}
    for attribute in attribute_names:
        if attribute not in skipped:
            self.disp.log_debug(
                f"({function_name}) {item_name}.{attribute} = {getattr(item, attribute)}"
            )
        else:
            self.disp.log_debug(
                f"({function_name}) skipping {item_name}.{attribute}"
            )
164
def check_date(self, date: str = "DD/MM/YYYY") -> bool:
    """_summary_
        This is a function that will check if the date is correct or not.

        The date must be written as two digits for the day, two for the month and
        four for the year, separated by slashes, and it must exist in the calendar
        (31/02/2024 is rejected).

    Args:
        date (str, optional): _description_: The date to check. Defaults to "DD/MM/YYYY".

    Returns:
        bool: _description_: True if the date is correct, False otherwise
    """
    if not isinstance(date, str):
        return False
    # fullmatch, unlike match with a trailing "$", refuses a trailing newline.
    match = re.fullmatch(
        r"(0[1-9]|[12][0-9]|3[01])/(0[1-9]|1[0-2])/(\d{4})",
        date
    )
    if match is None:
        return False
    day, month, year = (int(part) for part in match.groups())
    # The pattern only validates the shape; building the date validates the
    # day against the month (and leap years).
    try:
        datetime(year, month, day)
    except ValueError:
        return False
    return True
179
def check_database_health(self) -> None:
    """_summary_
        This function will reconnect to the database in case it has been disconnected.

        A new SQL link is created when none exists yet. When a link exists but
        reports that it is not connected, a reconnection is attempted first and a
        new SQL link is only created if that attempt reports False.

    Raises:
        RuntimeError: _description_: If creating the SQL link raised a RuntimeError
    """
    title = "check_database_health"

    def _open_link(reason: str, failure_message: str):
        # Build a fresh SQL link, logging why it is needed beforehand.
        try:
            self.disp.log_debug(reason, title)
            return SQL(
                url=CONST.DB_HOST,
                port=CONST.DB_PORT,
                username=CONST.DB_USER,
                password=CONST.DB_PASSWORD,
                db_name=CONST.DB_DATABASE,
                success=self.success,
                error=self.error,
                debug=self.debug
            )
        except RuntimeError as e:
            raise RuntimeError(failure_message) from e

    if self.runtime_data_initialised.database_link is None:
        self.runtime_data_initialised.database_link = _open_link(
            "database_link is none, initialising sql.",
            "Could not connect to the database."
        )

    if self.runtime_data_initialised.database_link.is_connected() is False:
        if self.runtime_data_initialised.database_link.connect_to_db() is False:
            # The link exists here, so the reason logged differs from the branch above.
            self.runtime_data_initialised.database_link = _open_link(
                "database_link could not reconnect, re-initialising sql.",
                "(check_database_health) Could not connect to the database."
            )
224
def is_token_admin(self, token: str) -> bool:
    """_summary_
        Check if the user's token has admin privileges.

        The user id bound to the token is read from the connections table, then
        the admin column of that user is read from the accounts table.

    Args:
        token (str): _description_: The token of the user to check

    Returns:
        bool: _description_: True if the admin column of the account equals 1, False otherwise (including when the token or the account cannot be found)
    """
    title = "is_token_admin"
    user_id = self.runtime_data_initialised.database_link.get_data_from_table(
        table=CONST.TAB_CONNECTIONS,
        column="user_id",
        where=f"token='{token}'",
        beautify=False
    )
    # An int is a status code, not rows: it cannot be indexed below.
    if isinstance(user_id, int) or not user_id:
        self.disp.log_error(
            f"Failed to find token {token} in the database.", title
        )
        return False
    self.disp.log_debug(f"usr_id = {user_id}", title)
    user_info = self.runtime_data_initialised.database_link.get_data_from_table(
        table=CONST.TAB_ACCOUNTS,
        column="admin",
        where=f"id={user_id[0][0]}",
        beautify=False
    )
    # An empty result (connection row pointing at a missing account) is treated
    # like a failed lookup instead of raising an IndexError.
    if isinstance(user_info, int) or not user_info:
        self.disp.log_error(
            f"Failed to find user {user_id[0][0]} in the database.", title
        )
        return False
    self.disp.log_debug(f"usr_info = {user_info}", title)
    return user_info[0][0] == 1
260
def generate_check_token(self, token_size: int = 4) -> str:
    """_summary_
        Create a token that can be used for e-mail verification.

        The token is made of token_size + 1 integers, each drawn between
        CONST.RANDOM_MIN and CONST.RANDOM_MAX (both included), joined by dashes.

    Args:
        token_size (int, optional): _description_: The number of groups appended after the first one. Values that are not numbers fall back to 4 and negative values are treated as 0. Defaults to 4.

    Returns:
        str: _description_: The generated token
    """
    # Imported here so the block does not depend on the file's import section.
    from secrets import SystemRandom

    if isinstance(token_size, (int, float)) is False:
        token_size = 4
    try:
        token_size = max(int(token_size), 0)
    except (ValueError, OverflowError):
        # int() refuses NaN and infinity: fall back to the default size.
        token_size = 4
    # The code guards an account verification step, so it is drawn from the
    # operating system's secure source rather than the default generator.
    generator = SystemRandom()
    return "-".join(
        str(generator.randint(CONST.RANDOM_MIN, CONST.RANDOM_MAX))
        for _ in range(token_size + 1)
    )
276
def get_user_id_from_token(self, title: str, token: str) -> Union[str, Response]:
    """_summary_
        The function in charge of getting the user id based of the provided content.

    Args:
        title (str): _description_: The title of the endpoint calling it
        token (str): _description_: The token of the user account

    Returns:
        Union[str, Response]: _description_: Returns as string id if success, otherwise, a pre-made response for the endpoint.
    """
    function_title = "get_user_id_from_token"
    usr_id_node: str = "user_id"
    self.disp.log_debug(
        f"Getting user id based on {token}", function_title
    )
    current_user: Union[int, List[Dict[str, Any]]] = self.runtime_data_initialised.database_link.get_data_from_table(
        table=CONST.TAB_CONNECTIONS,
        column="*",
        where=f"token='{token}'",
        beautify=True
    )
    self.disp.log_debug(f"current_user = {current_user}", function_title)
    # An int is a status code, not rows: len() below would fail on it.
    if isinstance(current_user, int):
        return self.runtime_data_initialised.boilerplate_responses_initialised.user_not_found(title, token)
    self.disp.log_debug(
        f"user_length = {len(current_user)}", function_title
    )
    # The token must identify exactly one connection.
    if len(current_user) != 1:
        return self.runtime_data_initialised.boilerplate_responses_initialised.user_not_found(title, token)
    self.disp.log_debug(
        f"current_user[0] = {current_user[0]}", function_title
    )
    if usr_id_node not in current_user[0]:
        return self.runtime_data_initialised.boilerplate_responses_initialised.user_not_found(title, token)
    user_id = str(current_user[0][usr_id_node])
    msg = "str(current_user[0]["
    msg += f"{usr_id_node}]) = {user_id}"
    self.disp.log_debug(msg, function_title)
    return user_id
316
def update_user_data(self, title: str, usr_id: str, line_content: List[str]) -> Union[int, Response]:
    """_summary_
        Update the account information based on the provided line.

        The values of line_content are written to every column of the accounts
        table except the first one, for the row whose id is usr_id.

    Args:
        title (str): _description_: This is the title of the endpoint
        usr_id (str): _description_: This is the id of the user that needs to be updated
        line_content (List[str]): _description_: The content of the line to be edited.

    Returns:
        Union[int, Response]: _description_: The success status if the row was updated, otherwise a pre-made internal server error response
    """
    self.disp.log_debug(f"Compile line_content: {line_content}.", title)
    columns = self.runtime_data_initialised.database_link.get_table_column_names(
        CONST.TAB_ACCOUNTS
    )
    # Anything but a non-empty list means the column names could not be
    # gathered. NOTE(review): presumably a status code is returned on failure, as
    # the other database calls do; confirm against the SQL class.
    if isinstance(columns, list) is False or len(columns) == 0:
        self.disp.log_error(
            f"Failed to gather the columns of the table: {columns}", title
        )
        return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, usr_id)
    self.disp.log_debug(f"Removing id from columns: {columns}.", title)
    # A slice is used instead of pop(0) so the list handed back by the database
    # layer is left untouched.
    editable_columns: List[str] = columns[1:]
    status = self.runtime_data_initialised.database_link.update_data_in_table(
        table=CONST.TAB_ACCOUNTS,
        data=line_content,
        column=editable_columns,
        where=f"id='{usr_id}'"
    )
    if status == self.error:
        return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, usr_id)
    return self.success
344
def remove_user_from_tables(self, where: str, tables: List[str]) -> Union[int, Dict[str, int]]:
    """_summary_
        Remove the rows matching a condition from each of the provided tables.

    Args:
        where (str): _description_: The condition forwarded to each deletion
        tables (List[str]): _description_: The tables to remove the rows from; a single str is treated as a one item list

    Returns:
        Union[int, Dict[str, int]]: _description_: The error status if tables is not a list, a tuple or a str, otherwise the deletion status of each table, indexed by table name
    """
    title = "remove_user_from_tables"
    if not isinstance(tables, (list, tuple, str)):
        self.disp.log_error(
            f"Expected tables to be of type list but got {type(tables)}",
            title
        )
        return self.error
    if isinstance(tables, str):
        self.disp.log_warning(
            "Tables is of type str, converting to list[str].", title
        )
        tables = [tables]
    outcome = {}
    for table_name in tables:
        result = self.runtime_data_initialised.database_link.remove_data_from_table(
            table=table_name,
            where=where
        )
        outcome[str(table_name)] = result
        # A failure is only reported, the remaining tables are still processed.
        if result == self.error:
            self.disp.log_warning(
                f"Failed to remove data from table: {table_name}",
                title
            )
    return outcome
381
def update_single_data(self, table: str, column_finder: str, column_to_update: str, data_finder: str, request_body: dict) -> int:
    """
        Update a single column of the rows matching a condition.

    Args:
        table (str): The table to update
        column_finder (str): The column used to locate the rows
        column_to_update (str): The column to overwrite, also the key read from request_body
        data_finder (str): The value column_finder must have
        request_body (dict): The mapping holding the new value under column_to_update

    Returns:
        int: The error status if the update reported it, the success status otherwise
    """
    update = self.runtime_data_initialised.database_link.update_data_in_table
    new_value = request_body[column_to_update]
    condition = f"{column_finder}='{data_finder}'"
    status = update(table, [new_value], [column_to_update], condition)
    return self.error if status == self.error else self.success
394
def get_actions(self, service_id: str) -> List[Dict[str, Any]]:
    """_summary_
        Get the actions that are available for the service

        Actions are the rows of the action template table whose type is
        'trigger'. Rows that cannot be parsed are logged and skipped.

    Args:
        service_id (str): _description_: The id of the service to get the actions of

    Returns:
        List[Dict[str, Any]]: _description_: The name and the description of each action, an empty list if the lookup failed
    """
    title = "get_actions"
    result = []
    self.disp.log_debug("Gathering actions", title)
    actions = self.runtime_data_initialised.database_link.get_data_from_table(
        table=CONST.TAB_ACTION_TEMPLATE,
        column="*",
        where=f"service_id='{service_id}' AND type='trigger'",
        beautify=True
    )
    self.disp.log_debug(f"actions = {actions}", title)
    # An int is a status code, not rows: it cannot be iterated over.
    if isinstance(actions, int):
        self.disp.log_error("Failed to get actions", title)
        return result
    for index, item in enumerate(actions):
        # Best effort: a malformed row must not prevent the others from being listed.
        try:
            if "json" not in item:
                self.disp.log_error(f"json not in item {index}", title)
                self.disp.log_warning("Skipping action", title)
                continue
            if item["type"].lower() != "trigger":
                self.disp.log_debug(
                    f"item {index} is not a trigger", title
                )
                self.disp.log_debug("Skipping item", title)
                continue
            self.disp.log_debug(f"item = {item}", title)
            self.disp.log_debug("Getting json node.", title)
            node = json.loads(item["json"])
            self.disp.log_debug("data has been converted to json", title)
            self.disp.log_debug(f"node = {node}", title)
            self.disp.log_debug(f"node keys = {list(node)}", title)
            self.disp.log_debug("Adding action to result", title)
            result.append(
                {
                    "name": node["name"],
                    "description": node["description"],
                }
            )
            self.disp.log_debug("Action gathered.", title)
        except Exception as e:
            self.disp.log_error(f"Failed to parse action {index}", title)
            self.disp.log_error(f"Error: {e}", title)
            self.disp.log_info("Skipping action", title)
    self.disp.log_debug(f"result = {result}", title)
    return result
451
def get_reactions(self, service_id: str) -> List[Dict[str, Any]]:
    """_summary_
        Get the reactions that are available for the service

        Reactions are the rows of the action template table whose type is
        'action'. Rows that cannot be parsed are logged and skipped.

    Args:
        service_id (str): _description_: The id of the service to get the reactions of

    Returns:
        List[Dict[str, Any]]: _description_: The name and the description of each reaction, an empty list if the lookup failed
    """
    title = "get_reactions"
    self.disp.log_debug("Gathering reactions", title)
    result = []
    reactions = self.runtime_data_initialised.database_link.get_data_from_table(
        table=CONST.TAB_ACTION_TEMPLATE,
        column="*",
        where=f"service_id='{service_id}' AND type='action'",
        beautify=True
    )
    self.disp.log_debug(f"reactions = {reactions}", title)
    # An int is a status code, not rows: it cannot be iterated over.
    if isinstance(reactions, int):
        self.disp.log_error("Failed to get reactions", title)
        return result
    for index, item in enumerate(reactions):
        # Best effort: a malformed row must not prevent the others from being listed.
        try:
            if "json" not in item:
                self.disp.log_error(f"json not in item {index}", title)
                self.disp.log_warning("Skipping reaction", title)
                continue
            # Reactions are stored under the type 'action'.
            if item["type"].lower() != "action":
                self.disp.log_debug(
                    f"item {index} is not a reaction", title
                )
                self.disp.log_debug("Skipping item", title)
                continue
            self.disp.log_debug(f"item = {item}", title)
            self.disp.log_debug("Getting json node.", title)
            node = json.loads(item["json"])
            self.disp.log_debug("data has been converted to json", title)
            self.disp.log_debug(f"node = {node}", title)
            self.disp.log_debug(f"node keys = {list(node)}", title)
            self.disp.log_debug("Adding reaction to result", title)
            result.append(
                {
                    "name": node["name"],
                    "description": node["description"],
                }
            )
            self.disp.log_debug("Reaction gathered.", title)
        except Exception as e:
            self.disp.log_error(f"Failed to parse reaction {index}", title)
            self.disp.log_error(f"Error: {e}", title)
            self.disp.log_info("Skipping reaction", title)
    self.disp.log_debug(f"result = {result}", title)
    return result
508
def get_services(self) -> List[Dict[str, Any]]:
    """_summary_
        List every service along with its actions and its reactions.

    Returns:
        List[Dict[str, Any]]: _description_: One entry per service holding its name, its actions and its reactions; an empty list if the lookup returned the error status
    """
    title = "get_services"
    self.disp.log_debug("Gathering services", title)
    result = []
    database = self.runtime_data_initialised.database_link
    services = database.get_data_from_table(
        table=CONST.TAB_SERVICES,
        column="*",
        where="",
        beautify=True
    )
    self.disp.log_debug(f"services = {services}", title)
    if services == self.error:
        self.disp.log_error("Failed to get services", title)
        return result
    for service in services:
        # The name is read first, then the id is used for both lookups.
        service_name = service["name"]
        service_actions = self.get_actions(service["id"])
        service_reactions = self.get_reactions(service["id"])
        result.append(
            {
                "name": service_name,
                "actions": service_actions,
                "reactions": service_reactions
            }
        )
    self.disp.log_debug(f"result = {result}", title)
    return result
539
def hide_api_key(self, api_key: str) -> str:
    """_summary_
        Hide the api key from the user.

        The key is replaced by a placeholder that only tells whether a key
        exists. The key itself is never written to the logs.

    Args:
        api_key (str): _description_: The api key to hide

    Returns:
        str: _description_: "No api key" if api_key is None, "Some api key" otherwise
    """
    title = "hide_api_key"
    # Only the presence of the key is logged: logging its value would leak the
    # very secret this function hides.
    self.disp.log_debug(f"api_key provided: {api_key is not None}", title)
    hidden_key = "No api key" if api_key is None else "Some api key"
    self.disp.log_debug(f"api_key after: {hidden_key}", title)
    return hidden_key
Union[int, Response] update_user_data(self, str title, str usr_id, List[str] line_content)
Definition non_web.py:317
None server_show_item_content(self, str function_name="show_item_content", str item_name="", object item=None, bool show=True)
Definition non_web.py:141
bool check_date(self, str date="DD/MM/YYYY")
Definition non_web.py:165
List[Dict[str, Any]] get_actions(self, str service_id)
Definition non_web.py:395
List[Dict[str, Any]] get_services(self)
Definition non_web.py:509
None __init__(self, RuntimeData runtime_data_initialised, int success=0, int error=84, bool debug=False)
Definition non_web.py:23
datetime set_lifespan(self, int seconds)
Definition non_web.py:50
Union[str, Response] get_user_id_from_token(self, str title, str token)
Definition non_web.py:277
int update_single_data(self, str table, str column_finder, str column_to_update, str data_finder, dict request_body)
Definition non_web.py:382
Union[int, Dict[str, int]] remove_user_from_tables(self, str where, List[str] tables)
Definition non_web.py:345
str generate_check_token(self, int token_size=4)
Definition non_web.py:261
List[Dict[str, Any]] get_reactions(self, str service_id)
Definition non_web.py:452