Terarea  2
The automation project
Loading...
Searching...
No Matches
sql_connections.py
Go to the documentation of this file.
1"""
2 File in charge of containing the class that will manage the sql connections.
3"""
4
5from typing import Union, Any
6
7import mysql
8import mysql.connector
9import mysql.connector.cursor
10from display_tty import Disp, TOML_CONF, SAVE_TO_FILE, FILE_NAME
11
12from . import sql_constants as SCONST
13from ..components import constants as CONST
14
15
17 """_summary_
18 """
19
20 def __init__(self, url: str, port: int, username: str, password: str, db_name: str, success: int = 0, error: int = 84, debug: bool = False) -> None:
21 """_summary_
22 This class is in charge of managing the connections to the sql database
23
24 Args:
25 url (str): _description_
26 port (int): _description_
27 username (str): _description_
28 password (str): _description_
29 db_name (str): _description_
30 succes (int, optional): _description_. Defaults to 0.
31 error (int, optional): _description_. Defaults to 84.
32 debug (bool, optional): _description_. Defaults to False.
33 """
34 # -------------------------- Inherited values --------------------------
35 self.error: int = error
36 self.debug: bool = debug
37 self.success: int = success
38 self.url: str = url
39 self.port: int = port
40 self.username: str = username
41 self.password: str = password
42 self.db_name: str = db_name
43 # --------------------------- logger section ---------------------------
44 self.disp: Disp = Disp(
45 TOML_CONF,
46 SAVE_TO_FILE,
47 FILE_NAME,
48 debug=self.debug,
49 logger=self.__class__.__name__
50 )
51 # -------------------------- Pool parameters --------------------------
53 "pool_name": CONST.DATABASE_POOL_NAME,
54 "pool_size": CONST.DATABASE_MAX_POOL_CONNECTIONS,
55 "pool_reset_session": CONST.DATABASE_RESET_POOL_NODE_CONNECTION,
56 "user": self.username,
57 "password": self.password,
58 "host": self.url,
59 "port": self.port,
60 "database": self.db_name,
61 "collation": CONST.DATABASE_COLLATION,
62 "connection_timeout": CONST.DATABASE_CONNECTION_TIMEOUT,
63 "allow_local_infile": CONST.DATABASE_LOCAL_INFILE,
64 "init_command": CONST.DATABASE_INIT_COMMAND,
65 "option_files": CONST.DATABASE_DEFAULT_FILE, # type error
66 "autocommit": CONST.DATABASE_AUTOCOMMIT,
67 "ssl_disabled": not CONST.DATABASE_SSL,
68 "ssl_key": CONST.DATABASE_SSL_KEY,
69 "ssl_cert": CONST.DATABASE_SSL_CERT,
70 "ssl_ca": CONST.DATABASE_SSL_CA,
71 "ssl_cipher": CONST.DATABASE_SSL_CIPHER,
72 "ssl_verify_cert": CONST.DATABASE_SSL_VERIFY_CERT
73 }
74 # ---------------- variables containing the connection ----------------
75 self.poolpool: Union[
76 None,
77 mysql.connector.pooling.MySQLConnectionPool
78 ] = None
79
80 def show_connection_info(self, func_name: str = "show_connection_info") -> None:
81 """
82 Show the connection information
83 """
84 msg = "\n"
85 for key, value in self.pool_parameters.items():
86 msg += f"{key} = '{value}': Type: {type(value)}\n"
87 self.disp.log_debug(msg, func_name)
88
89 def initialise_pool(self) -> int:
90 """_summary_
91 Initialise a connection to the database (but within a pool)
92
93 Raises:
94 RuntimeError: _description_: A runtime error is raised if it fails.
95
96 Returns:
97 int: _description_: Returns self.success if the function succeeds.
98 """
99 title = "initialise_pool"
100 self.disp.log_debug("Initialising the connection pool.", title)
101 try:
102 self.pool_parameters = {
103 "pool_name": CONST.DATABASE_POOL_NAME,
104 "pool_size": CONST.DATABASE_MAX_POOL_CONNECTIONS,
105 "pool_reset_session": CONST.DATABASE_RESET_POOL_NODE_CONNECTION,
106 "user": self.username,
107 "password": self.password,
108 "host": self.url,
109 "port": self.port,
110 "database": self.db_name,
111 "collation": CONST.DATABASE_COLLATION,
112 "connection_timeout": CONST.DATABASE_CONNECTION_TIMEOUT,
113 "allow_local_infile": CONST.DATABASE_LOCAL_INFILE,
114 "init_command": CONST.DATABASE_INIT_COMMAND,
115 "option_files": CONST.DATABASE_DEFAULT_FILE, # type error
116 "autocommit": CONST.DATABASE_AUTOCOMMIT,
117 "ssl_disabled": not CONST.DATABASE_SSL,
118 "ssl_key": CONST.DATABASE_SSL_KEY,
119 "ssl_cert": CONST.DATABASE_SSL_CERT,
120 "ssl_ca": CONST.DATABASE_SSL_CA,
121 "ssl_cipher": CONST.DATABASE_SSL_CIPHER,
122 "ssl_verify_cert": CONST.DATABASE_SSL_VERIFY_CERT
123 }
124 for i in SCONST.UNWANTED_ARGUMENTS:
125 if i in self.pool_parameters and self.pool_parameters[i] is None:
126 self.disp.log_debug(
127 f"Removed '{i}' from the pool parameters.", title
128 )
129 self.pool_parameters.pop(i)
130 self.show_connection_info(title)
131 self.poolpool = mysql.connector.pooling.MySQLConnectionPool(
132 **self.pool_parameters
133 )
134 return self.success
135 except mysql.connector.errors.ProgrammingError as pe:
136 msg = "ProgrammingError: The pool could not be initialized."
137 msg += f"Original error: {str(pe)}"
138 self.disp.log_critical(msg, title)
139 raise RuntimeError(msg) from pe
140 except mysql.connector.errors.IntegrityError as ie:
141 msg = "IntegrityError: Integrity issue while initializing the pool."
142 msg += f" Original error: {str(ie)}"
143 self.disp.log_critical(msg, title)
144 raise RuntimeError(msg) from ie
145 except mysql.connector.errors.OperationalError as oe:
146 msg = "OperationalError: Operational error occurred during pool initialization."
147 msg += f" Original error: {str(oe)}"
148 self.disp.log_critical(msg, title)
149 raise RuntimeError(msg) from oe
150 except mysql.connector.Error as e:
151 msg = "MySQL Error: An unexpected error occurred during pool initialization."
152 msg += f"Original error: {str(e)}"
153 self.disp.log_critical(msg, title)
154 raise RuntimeError(msg) from e
155
156 def get_connection(self) -> mysql.connector.pooling.PooledMySQLConnection:
157 """_summary_
158 Retrieves a connection from the pool.
159
160 Returns:
161 mysql.connector.pooling.PooledMySQLConnection: _description_: A pooled connection
162 """
163 title = "get_connection"
164 if self.poolpool is None:
165 raise RuntimeError("Connection pool is not initialized.")
166 try:
167 self.disp.log_debug("Getting an sql connection", title)
168 return self.poolpool.get_connection()
169 except mysql.connector.errors.OperationalError as oe:
170 msg = "OperationalError: Could not retrieve a connection from the pool."
171 msg += f" Original error: {str(oe)}"
172 self.disp.log_critical(msg, title)
173 raise RuntimeError(msg) from oe
174 except mysql.connector.Error as e:
175 msg = "MySQL Error: An unexpected error occurred while getting the connection."
176 msg += f" Original error: {str(e)}"
177 self.disp.log_critical(msg, title)
178 raise RuntimeError(msg) from e
179
180 def get_cursor(self, connection: mysql.connector.pooling.PooledMySQLConnection) -> mysql.connector.cursor.MySQLCursor:
181 """
182 Retrieves a cursor from the given connection.
183
184 Args:
185 connection (mysql.connector.pooling.PooledMySQLConnection): The active connection.
186
187 Returns:
188 mysql.connector.cursor.MySQLCursor: The cursor object.
189 """
190 if not self.is_connection_active(connection):
191 raise RuntimeError("Cannot get cursor, connection is not active.")
192 return connection.cursor()
193
194 def close_cursor(self, cursor: mysql.connector.cursor.MySQLCursor) -> int:
195 """
196 Closes the given cursor.
197
198 Args:
199 cursor (mysql.connector.cursor.MySQLCursor): The cursor to close.
200 """
201 title = "close_cursor"
202 self.disp.log_debug("Closing cursor, if it is open.", title)
203 if self.is_cursor_active(cursor):
204 self.disp.log_debug("Closing cursor", title)
205 cursor.close()
206 return self.success
207 self.disp.log_error(
208 "The cursor did not have an active connection.", title
209 )
210 return self.error
211
212 def return_connection(self, connection: mysql.connector.pooling.PooledMySQLConnection) -> int:
213 """
214 Returns a connection to the pool by closing it.
215
216 Args:
217 connection (mysql.connector.pooling.PooledMySQLConnection): The connection to close.
218 """
219 title = "return_connection"
220 self.disp.log_debug("Closing a database connection.", title)
221 if self.is_connection_active(connection):
222 self.disp.log_debug("Connection has been closed.", title)
223 connection.close()
224 return self.success
225 self.disp.log_error(
226 "Connection was not open in the first place.", title
227 )
228 return self.error
229
230 def destroy_pool(self) -> int:
231 """_summary_
232 Destroy the connection pool.
233
234 Returns:
235 int: _description_
236 """
237 title = "destroy_pool"
238 self.disp.log_debug("Destroying pool, if it exists.", title)
239 if self.poolpool is not None:
240 self.disp.log_debug("Destroying pool.", title)
241 del self.poolpool
242 self.poolpool = None
243 self.disp.log_warning("There was no pool to be destroyed.", title)
244 return self.success
245
246 def release_connection_and_cursor(self, connection: Union[mysql.connector.pooling.PooledMySQLConnection, None], cursor: Union[mysql.connector.pooling.PooledMySQLConnection, None] = None) -> None:
247 """_summary_
248
249 Args:
250 connection (Union[None]): _description_
251 cursor (Union[None]): _description_
252 """
253 title = "release_connection_and_cursor"
254 msg = "Connections have ended with status: "
255 self.disp.log_debug("Closing cursor.", title)
256 status = self.close_cursor(cursor)
257 msg += f"cursor = {status}, "
258 self.disp.log_debug("Closing connection.", title)
259 status = self.return_connection(connection)
260 msg += f"connection = {status}"
261 self.disp.log_debug(msg, title)
262
263 def run_and_commit(self, query: str, cursor: Union[mysql.connector.cursor.MySQLCursor, None] = None) -> int:
264 """
265 Executes a query and commits changes.
266
267 Args:
268 cursor (mysql.connector.cursor.MySQLCursor): The active cursor.
269 query (str): The query to execute.
270 """
271 title = "run_and_commit"
272 self.disp.log_debug("Running and committing sql query.", title)
273 if cursor is None:
274 self.disp.log_debug("No cursor found, generating one.", title)
275 connection = self.get_connection()
276 if connection is None:
277 self.disp.log_critical(SCONST.CONNECTION_FAILED, title)
278 return self.error
279 internal_cursor = self.get_cursor(connection)
280 if internal_cursor is None:
281 self.disp.log_critical(SCONST.CURSOR_FAILED, title)
282 return self.error
283 else:
284 self.disp.log_debug("Cursor found, using it.", title)
285 internal_cursor = cursor
286 try:
287 self.disp.log_debug(f"Executing query: {query}.", title)
288 internal_cursor.execute(query)
289 self.disp.log_debug("Committing content.", title)
290 internal_cursor._connection.commit()
291 if cursor is None:
292 self.disp.log_debug(
293 "The cursor was generated by us, releasing.", title
294 )
295 self.release_connection_and_cursor(connection, internal_cursor)
296 else:
297 self.disp.log_debug(
298 "The cursor was provided, not releasing.", title
299 )
300 return self.success
301 except mysql.connector.errors.ProgrammingError as pe:
302 msg = "ProgrammingError: Failed to execute the query."
303 msg += f" Original error: {str(pe)}"
304 self.disp.log_error(msg, title)
305 if cursor is None:
306 self.disp.log_debug(
307 "The cursor was generated by us, releasing.", title
308 )
309 self.release_connection_and_cursor(connection, internal_cursor)
310 else:
311 self.disp.log_debug(
312 "The cursor was provided, not releasing.", title
313 )
314 raise RuntimeError(msg) from pe
315 except mysql.connector.errors.IntegrityError as ie:
316 msg = "IntegrityError: Integrity constraint issue occurred during query execution."
317 msg += f" Original error: {str(ie)}"
318 self.disp.log_error(msg, title)
319 if cursor is None:
320 self.disp.log_debug(
321 "The cursor was generated by us, releasing.", title
322 )
323 self.release_connection_and_cursor(connection, internal_cursor)
324 else:
325 self.disp.log_debug(
326 "The cursor was provided, not releasing.", title
327 )
328 raise RuntimeError(msg) from ie
329 except mysql.connector.errors.OperationalError as oe:
330 msg = "OperationalError: Operational error occurred during query execution."
331 msg += f" Original error: {str(oe)}"
332 self.disp.log_error(msg, title)
333 if cursor is None:
334 self.disp.log_debug(
335 "The cursor was generated by us, releasing.", title
336 )
337 self.release_connection_and_cursor(connection, internal_cursor)
338 else:
339 self.disp.log_debug(
340 "The cursor was provided, not releasing.", title
341 )
342 raise RuntimeError(msg) from oe
343 except mysql.connector.Error as e:
344 msg = "MySQL Error: An unexpected error occurred during query execution."
345 msg += f" Original error: {str(e)}"
346 self.disp.log_error(msg, title)
347 if cursor is None:
348 self.disp.log_debug(
349 "The cursor was generated by us, releasing.", title
350 )
351 self.release_connection_and_cursor(connection, internal_cursor)
352 else:
353 self.disp.log_debug(
354 "The cursor was provided, not releasing.", title
355 )
356 raise RuntimeError(msg) from e
357
358 def run_and_fetch_all(self, query: str, cursor: Union[mysql.connector.cursor.MySQLCursor, None] = None) -> Union[int, Any]:
359 """
360 Executes a query and fetches all results.
361
362 Args:
363 cursor (mysql.connector.cursor.MySQLCursor): The active cursor.
364 query (str): The query to execute.
365 """
366 title = "run_and_fetchall"
367 if cursor is None:
368 connection = self.get_connection()
369 if connection is None:
370 self.disp.log_critical(SCONST.CONNECTION_FAILED, title)
371 return self.error
372 internal_cursor = self.get_cursor(connection)
373 if internal_cursor is None:
374 self.disp.log_critical(SCONST.CURSOR_FAILED, title)
375 return self.error
376 else:
377 internal_cursor = cursor
378 try:
379 self.disp.log_debug(f"Executing query: {query}.", title)
380 internal_cursor.execute(query)
381 if internal_cursor is None or internal_cursor.description is None:
382 self.disp.log_error(
383 "Failed to gather data from the table, cursor is invalid.", title
384 )
385 if cursor is None:
386 self.disp.log_debug(
387 "The cursor was generated by us, releasing.", title
388 )
390 connection, internal_cursor
391 )
392 else:
393 self.disp.log_debug(
394 "The cursor was provided, not releasing.", title
395 )
396 return self.error
397 self.disp.log_debug(
398 "Storing a copy of the content of the cursor.", title
399 )
400 raw_data = internal_cursor.fetchall()
401 self.disp.log_debug(f"Raw gathered data {raw_data}", title)
402 data = raw_data.copy()
403 self.disp.log_debug(f"Data gathered: {data}.", title)
404 if cursor is None:
405 self.disp.log_debug(
406 "The cursor was generated by us, releasing.", title
407 )
408 self.release_connection_and_cursor(connection, internal_cursor)
409 else:
410 self.disp.log_debug(
411 "The cursor was provided, not releasing.", title
412 )
413 return data
414 except mysql.connector.errors.ProgrammingError as pe:
415 msg = "ProgrammingError: Failed to execute the query."
416 msg += f" Original error: {str(pe)}"
417 self.disp.log_error(msg, title)
418 if cursor is None:
419 self.disp.log_debug(
420 "The cursor was generated by us, releasing.", title
421 )
422 self.release_connection_and_cursor(connection, internal_cursor)
423 else:
424 self.disp.log_debug(
425 "The cursor was provided, not releasing.", title
426 )
427 raise RuntimeError(msg) from pe
428 except mysql.connector.errors.IntegrityError as ie:
429 msg = "IntegrityError: Integrity constraint issue occurred during query execution."
430 msg += f" Original error: {str(ie)}"
431 self.disp.log_error(msg, title)
432 if cursor is None:
433 self.disp.log_debug(
434 "The cursor was generated by us, releasing.", title
435 )
436 self.release_connection_and_cursor(connection, internal_cursor)
437 else:
438 self.disp.log_debug(
439 "The cursor was provided, not releasing.", title
440 )
441 raise RuntimeError(msg) from ie
442 except mysql.connector.errors.OperationalError as oe:
443 msg = "OperationalError: Operational error occurred during query execution."
444 msg += f" Original error: {str(oe)}"
445 self.disp.log_error(msg, title)
446 if cursor is None:
447 self.disp.log_debug(
448 "The cursor was generated by us, releasing.", title
449 )
450 self.release_connection_and_cursor(connection, internal_cursor)
451 else:
452 self.disp.log_debug(
453 "The cursor was provided, not releasing.", title
454 )
455 raise RuntimeError(msg) from oe
456 except mysql.connector.Error as e:
457 msg = "MySQL Error: An unexpected error occurred during query execution."
458 msg += f" Original error: {str(e)}"
459 self.disp.log_error(msg, title)
460 if cursor is None:
461 self.disp.log_debug(
462 "The cursor was generated by us, releasing.", title
463 )
464 self.release_connection_and_cursor(connection, internal_cursor)
465 else:
466 self.disp.log_debug(
467 "The cursor was provided, not releasing.", title
468 )
469 raise RuntimeError(msg) from e
470
471 def run_editing_command(self, sql_query: str, table: str, action_type: str = "update") -> int:
472 """_summary_
473 Function in charge of running the execute and making sure that the connection to the database is still valid.
474
475 Args:
476 command (str): _description_
477
478 Returns:
479 int: _description_
480 """
481 title = "_run_editing_command"
482 try:
483 resp = self.run_and_commit(query=sql_query)
484 if resp != self.success:
485 self.disp.log_error(
486 f"Failed to {action_type} data in '{table}'.", title
487 )
488 return self.error
489 self.disp.log_debug("command ran successfully.", title)
490 return self.success
491 except mysql.connector.Error as e:
492 self.disp.log_error(
493 f"Failed to {action_type} data in '{table}': {str(e)}", title
494 )
495 return self.error
496
497 def __del__(self) -> None:
498 """_summary_
499 Destructor
500 """
501 self.destroy_pool()
502
503 def is_pool_active(self) -> bool:
504 """_summary_
505 Check if the connection pool is active.
506
507 Returns:
508 bool: _description_
509 """
510 title = "is_pool_active"
511 self.disp.log_debug("Checking if the connection is active.", title)
512 resp = self.poolpool is not None
513 if resp:
514 self.disp.log_debug("The connection is active.", title)
515 return True
516 self.disp.log_error("The connection is not active.", title)
517 return False
518
519 def is_connection_active(self, connection: mysql.connector.pooling.PooledMySQLConnection) -> bool:
520 """
521 Checks if the connection is active.
522
523 Args:
524 connection (mysql.connector.pooling.PooledMySQLConnection): The connection to check.
525
526 Returns:
527 bool: True if the connection is active, False otherwise.
528 """
529 title = "is_connection_active"
530 self.disp.log_debug(
531 "Checking if the connection is active.", title
532 )
533 try:
534 if connection:
535 connection.ping(reconnect=False)
536 self.disp.log_debug("The connection is active.", title)
537 return True
538 except (mysql.connector.Error, mysql.connector.errors.Error):
539 self.disp.log_error("The connection is not active.", title)
540 return False
541 self.disp.log_error("The connection is not active.", title)
542 return False
543
544 def is_cursor_active(self, cursor: mysql.connector.cursor.MySQLCursor) -> bool:
545 """
546 Checks if the cursor is active.
547
548 Args:
549 cursor (mysql.connector.cursor.MySQLCursor): The cursor to check.
550
551 Returns:
552 bool: True if the cursor is active, False otherwise.
553 """
554 title = "is_cursor_active"
555 self.disp.log_debug(
556 "Checking if the provided cursor is active.", title
557 )
558 self.disp.log_debug(f"Content of the cursor: {dir(cursor)}.", title)
559 resp = cursor is not None and cursor._connection is not None
560 if resp:
561 self.disp.log_debug("The cursor is active.", title)
562 return True
563 self.disp.log_error("The cursor is not active.", title)
564 return False
int close_cursor(self, mysql.connector.cursor.MySQLCursor cursor)
bool is_connection_active(self, mysql.connector.pooling.PooledMySQLConnection connection)
None release_connection_and_cursor(self, Union[mysql.connector.pooling.PooledMySQLConnection, None] connection, Union[mysql.connector.pooling.PooledMySQLConnection, None] cursor=None)
Union[int, Any] run_and_fetch_all(self, str query, Union[mysql.connector.cursor.MySQLCursor, None] cursor=None)
None show_connection_info(self, str func_name="show_connection_info")
bool is_cursor_active(self, mysql.connector.cursor.MySQLCursor cursor)
mysql.connector.pooling.PooledMySQLConnection get_connection(self)
None __init__(self, str url, int port, str username, str password, str db_name, int success=0, int error=84, bool debug=False)
int run_and_commit(self, str query, Union[mysql.connector.cursor.MySQLCursor, None] cursor=None)
mysql.connector.cursor.MySQLCursor get_cursor(self, mysql.connector.pooling.PooledMySQLConnection connection)
int return_connection(self, mysql.connector.pooling.PooledMySQLConnection connection)
int run_editing_command(self, str sql_query, str table, str action_type="update")