Terarea  2
The automation project
Loading...
Searching...
No Matches
sql_query_boilerplates.py
Go to the documentation of this file.
1"""
2 File in charge of storing the functions that will interact directly with the database.
3"""
4
5from typing import List, Dict, Union, Any
6
7import mysql
8import mysql.connector
9from display_tty import Disp, TOML_CONF, SAVE_TO_FILE, FILE_NAME
10
11
12from . import sql_constants as SCONST
13from .sql_injection import SQLInjection
14from .sql_connections import SQLManageConnections
15from .sql_sanitisation_functions import SQLSanitiseFunctions
16
17
19 """_summary_
20 """
21
def __init__(self, sql_pool: SQLManageConnections, success: int = 0, error: int = 84, debug: bool = False) -> None:
    """Store the shared settings and build the helper objects used by the query methods.

    Args:
        sql_pool (SQLManageConnections): Connection manager kept on ``self.sql_pool``.
        success (int, optional): Status code kept on ``self.success``. Defaults to 0.
        error (int, optional): Status code kept on ``self.error``. Defaults to 84.
        debug (bool, optional): Debug flag forwarded to the logger and to both helper classes. Defaults to False.
    """
    # Values handed over by the caller.
    self.success: int = success
    self.error: int = error
    self.debug: bool = debug
    self.sql_pool: SQLManageConnections = sql_pool

    # Logger, registered under the name of the concrete class.
    logger_name = self.__class__.__name__
    self.disp: Disp = Disp(
        TOML_CONF, SAVE_TO_FILE, FILE_NAME,
        debug=self.debug, logger=logger_name
    )

    # Injection checker; it takes (error, success, debug) positionally.
    self.sql_injection: SQLInjection = SQLInjection(
        self.error, self.success, self.debug
    )

    # Keyword / column-name sanitising helpers.
    self.sanitize_functions: SQLSanitiseFunctions = SQLSanitiseFunctions(
        success=self.success,
        error=self.error,
        debug=self.debug
    )
55
def get_table_column_names(self, table_name: str) -> Union[List[str], int]:
    """Get the names of the columns in a table.

    The names are the first field of every row returned by
    ``self.describe_table``.

    Args:
        table_name (str): Name of the table to inspect.

    Returns:
        Union[List[str], int]: The column names, or ``self.error`` when the
        table could not be described (error code returned or ``RuntimeError``
        raised by ``describe_table``).
    """
    title = "get_table_column_names"
    # Only the describe call can raise RuntimeError, so only it is guarded.
    try:
        columns = self.describe_table(table_name)
    except RuntimeError as e:
        msg = "Error: Failed to get column names of the tables.\n"
        msg += f"\"{str(e)}\""
        self.disp.log_error(msg, title)
        return self.error
    if isinstance(columns, int):
        self.disp.log_error(
            f"Failed to describe table {table_name}.",
            title
        )
        return self.error
    return [row[0] for row in columns]
84
def get_table_names(self) -> Union[int, List[str]]:
    """Get the names of the tables in the database.

    Runs ``SHOW TABLES`` through ``self.sql_pool`` and keeps the first field
    of every returned row.

    Returns:
        Union[int, List[str]]: The table names, or ``self.error`` when the
        query returned an integer instead of rows.
    """
    title = "get_table_names"
    self.disp.log_debug("Getting table names.", title)
    resp = self.sql_pool.run_and_fetch_all(query="SHOW TABLES")
    # An integer response is a status code, not a result set.
    if isinstance(resp, int):
        self.disp.log_error(
            "Failed to fetch the table names.",
            title
        )
        return self.error
    names = [row[0] for row in resp]
    self.disp.log_debug("Tables fetched", title)
    return names
106
def describe_table(self, table: str) -> Union[int, List[Any]]:
    """Fetch the headers (description) of a table from the database.

    The table name is first passed to the injection checker, then
    ``DESCRIBE <table>`` is run through ``self.sql_pool``.

    Args:
        table (str): The name of the table to describe.

    Raises:
        RuntimeError: When the query raises a MySQL connector error
            (programming, integrity, operational or any other
            ``mysql.connector.Error``) or a ``RuntimeError``. The original
            exception is chained as the cause.

    Returns:
        Union[int, List[Any]]: The rows returned by ``DESCRIBE``, or
        ``self.error`` when an injection is detected or the query returns
        an integer.
    """
    title = "describe_table"
    self.disp.log_debug(f"Describing table {table}", title)
    if self.sql_injection.check_if_sql_injection(table) is True:
        self.disp.log_error("Injection detected.", "sql")
        return self.error
    try:
        resp = self.sql_pool.run_and_fetch_all(query=f"DESCRIBE {table}")
        # An integer response is a status code, not a result set.
        if isinstance(resp, int):
            self.disp.log_error(
                f"Failed to describe table {table}", title
            )
            return self.error
        return resp
    except mysql.connector.errors.ProgrammingError as pe:
        # The two halves need a separating space, otherwise the message
        # reads "...'users'does not exist...".
        msg = f"ProgrammingError: The table '{table}' "
        msg += "does not exist or the query failed."
        self.disp.log_critical(msg, title)
        raise RuntimeError(msg) from pe
    except mysql.connector.errors.IntegrityError as ie:
        msg = "IntegrityError: There was an integrity constraint "
        msg += f"issue while describing the table '{table}'."
        self.disp.log_critical(msg, title)
        raise RuntimeError(msg) from ie
    except mysql.connector.errors.OperationalError as oe:
        msg = "OperationalError: There was an operational error "
        msg += f"while describing the table '{table}'."
        self.disp.log_critical(msg, title)
        raise RuntimeError(msg) from oe
    except mysql.connector.Error as e:
        # Catch-all for the remaining connector errors; must stay after the
        # more specific clauses above.
        msg = "MySQL Error: An unexpected error occurred while "
        msg += f"describing the table '{table}'."
        self.disp.log_critical(msg, title)
        raise RuntimeError(msg) from e
    except RuntimeError as e:
        msg = "A runtime error occurred during the table description process."
        self.disp.log_critical(msg, title)
        raise RuntimeError(msg) from e
157
def insert_data_into_table(self, table: str, data: Union[List[List[str]], List[str]], column: Union[List[str], None] = None) -> int:
    """Insert one row or several rows into a table.

    Args:
        table (str): Name of the table to insert into.
        data (Union[List[List[str]], List[str]]): A single row (flat list) or
            several rows (list of lists). The shape is decided by looking at
            the first element.
        column (Union[List[str], None], optional): Column names matching the
            row values. When ``None`` (or ``""``) the names are looked up with
            ``self.get_table_column_names``. Defaults to None.

    Returns:
        int: The value returned by ``self.sql_pool.run_editing_command``, or
        ``self.error`` when an injection is detected, the column names cannot
        be retrieved, or ``data`` is not a list.
    """
    title = "insert_data_into_table"
    self.disp.log_debug("Inserting data into the table.", title)
    # The injection checker receives "" in place of a missing column list.
    checked_column = "" if column is None else column
    if self.sql_injection.check_if_injections_in_strings([table, data, checked_column]) is True:
        self.disp.log_error("Injection detected.", "sql")
        return self.error

    if column is None or column == "":
        column = self.get_table_column_names(table)
        # The lookup reports failure with an integer code; joining it below
        # would raise a TypeError, so stop here instead.
        if isinstance(column, int):
            self.disp.log_error(
                f"Failed to get the column names of table {table}.", title
            )
            return self.error

    column = self.sanitize_functions.escape_risky_column_names(column)

    column_str = ", ".join(column)
    column_length = len(column)

    if not isinstance(data, list):
        self.disp.log_error(
            "data is expected to be, either of type: List[str] or List[List[str]]",
            title
        )
        return self.error

    if len(data) > 0 and isinstance(data[0], list):
        self.disp.log_debug("processing double array", title)
        values = ", ".join(
            self.sanitize_functions.process_sql_line(
                line, column, column_length
            )
            for line in data
        )
    else:
        self.disp.log_debug("processing single array", title)
        values = self.sanitize_functions.process_sql_line(
            data, column, column_length
        )

    sql_query = f"INSERT INTO {table} ({column_str}) VALUES {values}"
    self.disp.log_debug(f"sql_query = '{sql_query}'", title)
    return self.sql_pool.run_editing_command(sql_query, table, "insert")
213
def get_data_from_table(self, table: str, column: Union[str, List[str]], where: Union[str, List[str]] = "", beautify: bool = True) -> Union[int, List[Dict[str, Any]]]:
    """Run a ``SELECT`` on a table and return the rows.

    Args:
        table (str): Name of the table to query.
        column (Union[str, List[str]]): Column expression, or a list of column
            names (escaped and joined with ``", "``).
        where (Union[str, List[str]]): Condition, or a list of conditions that
            are joined with ``" AND "``. An empty string means no ``WHERE``
            clause. Defaults to "".
        beautify (bool, optional): When ``False`` the raw rows are returned;
            otherwise the rows are combined with the table description by
            ``self.sanitize_functions.beautify_table``. Defaults to True.

    Raises:
        RuntimeError: Propagated from ``self.describe_table`` when
            ``beautify`` is requested and describing the table fails critically.

    Returns:
        Union[int, List[Dict[str, Any]]]: The requested data, ``self.error``
        otherwise.
    """
    title = "get_data_from_table"
    self.disp.log_debug(f"fetching data from the table {table}", title)
    if self.sql_injection.check_if_injections_in_strings([table, column]) is True or self.sql_injection.check_if_symbol_and_command_injection(where) is True:
        self.disp.log_error("Injection detected.", "sql")
        return self.error

    if isinstance(column, list):
        column = self.sanitize_functions.escape_risky_column_names(column)
        column = ", ".join(column)
    sql_command = f"SELECT {column} FROM {table}"

    if isinstance(where, (str, list)):
        where = self.sanitize_functions.escape_risky_column_names_where_mode(
            where
        )
    if isinstance(where, list):
        where = " AND ".join(where)
    if where != "":
        sql_command += f" WHERE {where}"

    self.disp.log_debug(f"sql_query = '{sql_command}'", title)
    resp = self.sql_pool.run_and_fetch_all(query=sql_command)
    if isinstance(resp, int) and resp != self.success:
        self.disp.log_error(
            "Failed to fetch the data from the table.", title
        )
        return self.error
    self.disp.log_debug(f"Queried data: {resp}", title)
    if beautify is False:
        return resp

    description = self.describe_table(table)
    # describe_table signals failure with an integer code; do not hand that
    # to the beautifier as if it were a list of column descriptions.
    if isinstance(description, int):
        self.disp.log_error(
            f"Failed to describe table {table}.", title
        )
        return self.error
    return self.sanitize_functions.beautify_table(description, resp)
254
def get_table_size(self, table: str, column: Union[str, List[str]], where: Union[str, List[str]] = "") -> int:
    """Count the rows of a table with ``SELECT COUNT(<column>)``.

    Args:
        table (str): Name of the table to count.
        column (Union[str, List[str]]): Expression placed inside ``COUNT()``;
            a list is joined with ``", "``.
        where (Union[str, List[str]]): Condition, or a list of conditions that
            are joined with ``" AND "``. An empty string means no ``WHERE``
            clause. Defaults to "".

    Returns:
        int: The first field of the first returned row, or
        ``SCONST.GET_TABLE_SIZE_ERROR`` when an injection is detected or the
        query does not return a usable row.
    """
    title = "get_table_size"
    self.disp.log_debug(f"fetching data from the table {table}", title)
    if self.sql_injection.check_if_injections_in_strings([table, column]) is True or self.sql_injection.check_if_symbol_and_command_injection(where) is True:
        self.disp.log_error("Injection detected.", "sql")
        return SCONST.GET_TABLE_SIZE_ERROR

    if isinstance(column, list):
        column = ", ".join(column)
    sql_command = f"SELECT COUNT({column}) FROM {table}"

    if isinstance(where, (str, list)):
        where = self.sanitize_functions.escape_risky_column_names_where_mode(
            where
        )
    if isinstance(where, list):
        where = " AND ".join(where)
    if where != "":
        sql_command += f" WHERE {where}"

    self.disp.log_debug(f"sql_query = '{sql_command}'", title)
    resp = self.sql_pool.run_and_fetch_all(query=sql_command)
    # Any integer is a status code, never a result set. Letting one through
    # (even the success code) would crash on len() below.
    if isinstance(resp, int):
        self.disp.log_error(
            "Failed to fetch the data from the table.", title
        )
        return SCONST.GET_TABLE_SIZE_ERROR
    if len(resp) == 0:
        self.disp.log_error(
            "There was no data returned by the query.", title
        )
        return SCONST.GET_TABLE_SIZE_ERROR
    if not isinstance(resp[0], tuple):
        self.disp.log_error("The data returned is not a tuple.", title)
        return SCONST.GET_TABLE_SIZE_ERROR
    return resp[0][0]
299
def update_data_in_table(self, table: str, data: List[str], column: List, where: Union[str, List[str]] = "") -> int:
    """Update the data contained in a table.

    Args:
        table (str): Name of the table to update.
        data (List[str]): New values, in the same order as ``column``.
        column (List): Column names to update. When ``None`` (or ``""``) the
            names are looked up with ``self.get_table_column_names``.
        where (Union[str, List[str]]): Condition, or a list of conditions that
            are joined with ``" AND "``. An empty string means no ``WHERE``
            clause. Defaults to "".

    Returns:
        int: The value returned by ``self.sql_pool.run_editing_command``, or
        ``self.error`` when an injection is detected or the column names
        cannot be retrieved.
    """
    title = "update_data_in_table"
    msg = f"Updating the data contained in the table: {table}"
    self.disp.log_debug(msg, title)

    # The injection checker receives "" in place of a missing column list.
    checked_column = "" if column is None else column
    if self.sql_injection.check_if_injections_in_strings([table, checked_column, data]) is True or self.sql_injection.check_if_symbol_and_command_injection(where) is True:
        self.disp.log_error("Injection detected.", "sql")
        return self.error

    if column is None or column == "":
        column = self.get_table_column_names(table)
        # The lookup reports failure with an integer code; len() on it
        # further down would raise a TypeError.
        if isinstance(column, int):
            self.disp.log_error(
                f"Failed to get the column names of table {table}.", title
            )
            return self.error

    column = self.sanitize_functions.escape_risky_column_names(column)

    # A single column/value pair given as plain strings is wrapped in lists.
    if isinstance(column, str) and isinstance(data, str):
        data = [data]
        column = [column]

    column_length = len(column)
    self.disp.log_debug(
        f"data = {data}, column = {column}, length = {column_length}",
        title
    )

    where = self.sanitize_functions.escape_risky_column_names_where_mode(
        where
    )
    if isinstance(where, list):
        where = " AND ".join(where)

    update_line = self.sanitize_functions.compile_update_line(
        data, column, column_length
    )

    sql_query = f"UPDATE {table} SET {update_line}"
    if where != "":
        sql_query += f" WHERE {where}"

    self.disp.log_debug(f"sql_query = '{sql_query}'", title)

    return self.sql_pool.run_editing_command(sql_query, table, "update")
357
def insert_or_update_data_into_table(self, table: str, data: Union[List[List[str]], List[str]], columns: Union[List[str], None] = None) -> int:
    """Insert rows into a table, updating the ones that already exist.

    A row is considered to exist when the string form of its first value
    matches the string form of the first field of a row already in the table.
    Existing rows are updated with ``self.update_data_in_table``, the others
    are added with ``self.insert_data_into_table``.

    Args:
        table (str): Table name.
        data (Union[List[List[str]], List[str]]): A single row (flat list) or
            several rows (list of lists) to insert or update.
        columns (Union[List[str], None], optional): Column names. When
            ``None`` they are looked up with ``self.get_table_column_names``.
            Defaults to None.

    Returns:
        int: For a single row, the value returned by the update/insert call.
        For several rows, ``self.success`` when every call returned
        ``self.success`` and ``self.error`` otherwise. ``self.error`` is also
        returned when an injection is detected, the column names or the
        current table content cannot be retrieved, or ``data`` is not a list.
    """
    title = "insert_or_update_data_into_table"
    self.disp.log_debug(
        "Inserting or updating data into the table.", title)

    if self.sql_injection.check_if_injections_in_strings([table] + (columns or [])) is True:
        self.disp.log_error("SQL Injection detected.", "sql")
        return self.error

    if columns is None:
        columns = self.get_table_column_names(table)
        # The lookup reports failure with an integer code; columns[0] below
        # would raise a TypeError on it.
        if isinstance(columns, int):
            self.disp.log_error(
                f"Failed to get the column names of table {table}.", title
            )
            return self.error

    table_content = self.get_data_from_table(
        table=table, column=columns, where="", beautify=False
    )
    if isinstance(table_content, int) and table_content != self.success:
        self.disp.log_critical(
            f"Failed to retrieve data from table {table}", title
        )
        return self.error

    if not isinstance(data, list):
        self.disp.log_error(
            "Data must be of type List[str] or List[List[str]]", title
        )
        return self.error

    # NOTE(review): the key value is interpolated into the WHERE condition
    # without quoting; confirm the where-mode sanitiser handles string keys.
    if data and isinstance(data[0], list):
        self.disp.log_debug("Processing double data list", title)
        existing_keys = {str(line[0]) for line in table_content}

        # Every row is processed even if an earlier one failed; the overall
        # status reports whether any of them failed.
        status = self.success
        for line in data:
            if not line:
                self.disp.log_warning("Empty line, skipping.", title)
                continue
            node0 = str(line[0])
            if node0 in existing_keys:
                result = self.update_data_in_table(
                    table, line, columns,
                    f"{columns[0]} = {node0}"
                )
            else:
                result = self.insert_data_into_table(table, line, columns)
            if result != self.success:
                self.disp.log_error(
                    f"Failed to insert or update the line starting with {node0}.",
                    title
                )
                status = self.error
        return status

    self.disp.log_debug("Processing single data list", title)
    if not data:
        self.disp.log_warning("Empty data list, skipping.", title)
        return self.success

    node0 = str(data[0])
    if any(str(line[0]) == node0 for line in table_content):
        return self.update_data_in_table(table, data, columns, f"{columns[0]} = {node0}")
    return self.insert_data_into_table(table, data, columns)
424
def remove_data_from_table(self, table: str, where: Union[str, List[str]] = "") -> int:
    """Delete rows from a table.

    Args:
        table (str): Name of the table to delete from.
        where (Union[str, List[str]]): Condition, or a list of conditions that
            are joined with ``" AND "``. With an empty string no ``WHERE``
            clause is added to the ``DELETE`` statement. Defaults to "".

    Returns:
        int: The value returned by ``self.sql_pool.run_editing_command``, or
        ``self.error`` when an injection is detected.
    """
    title = "remove_data_from_table"
    self.disp.log_debug(f"Removing data from table {table}", title)

    # The where clause is only inspected when the table name itself is clean.
    table_flagged = self.sql_injection.check_if_sql_injection(table) is True
    if table_flagged or self.sql_injection.check_if_symbol_and_command_injection(where) is True:
        self.disp.log_error("Injection detected.", "sql")
        return self.error

    conditions = " AND ".join(where) if isinstance(where, list) else where

    sql_query = f"DELETE FROM {table}"
    if conditions != "":
        sql_query = f"{sql_query} WHERE {conditions}"

    self.disp.log_debug(f"sql_query = '{sql_query}'", title)
    return self.sql_pool.run_editing_command(sql_query, table, "delete")
int insert_data_into_table(self, str table, Union[List[List[str]], List[str]] data, Union[List[str], None] column=None)
Union[int] get_table_size(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="")
int update_data_in_table(self, str table, List[str] data, List column, Union[str, List[str]] where="")
Union[int, List[Dict[str, Any]]] get_data_from_table(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="", bool beautify=True)
int insert_or_update_data_into_table(self, str table, Union[List[List[str]], List[str]] data, Union[List[str], None] columns=None)
None __init__(self, SQLManageConnections sql_pool, int success=0, int error=84, bool debug=False)
Union[List[str], int] get_table_column_names(self, str table_name)
int remove_data_from_table(self, str table, Union[str, List[str]] where="")