2 File in charge of cleaning and sanitising sql queries before they are submitted to the database.
5from typing
import List, Dict, Any, Union
7from display_tty
import Disp, TOML_CONF, SAVE_TO_FILE, FILE_NAME
9from .
import sql_constants
as SCONST
10from .sql_time_manipulation
import SQLTimeManipulation
17 def __init__(self, success: int = 0, error: int = 84, debug: bool =
False) ->
None:
19 This is the class that contains functions in charge of sanitising sql queries.
22 debug (bool, optional): Enable debug mode. Defaults to False.
33 logger=self.__class__.__name__
45 This is a function in charge of cleaning by nullifying (escaping) characters that could cause the sql command to break.
48 cell (str): The cell to be checked.
51 str: A (hopefully) clean string.
55 if char
in (
"'",
'"',
"\\",
'\0',
"\r"):
57 f
"Escaped character '{char}' in '{cell}'.",
67 Escape the risky column names.
70 columns (List[str]): _description_
73 List[str]: _description_
75 title =
"_escape_risky_column_names"
76 self.
disp.log_debug(
"Escaping risky column names.", title)
77 if isinstance(columns, str):
81 for index, item
in enumerate(data):
83 key, value = item.split(
"=", maxsplit=1)
84 self.
disp.log_debug(f
"key = {key}, value = {value}", title)
86 self.
disp.log_warning(
87 f
"Escaping risky column name '{key}'.",
88 "_escape_risky_column_names"
90 data[index] = f
"`{key}`={value}"
92 self.
disp.log_warning(
93 f
"Escaping risky column name '{item}'.",
94 "_escape_risky_column_names"
96 data[index] = f
"`{item}`"
99 self.
disp.log_debug(
"Escaped risky column names.", title)
100 if isinstance(columns, str):
105 """Ensures the value is safely passed as a string in an SQL query.
106 It wraps the value in single quotes and escapes any single quotes within the value.
109 value (str): The value that needs to be protected.
112 str: The value with protection applied, safe for SQL queries.
114 title =
"_protect_value"
115 self.
disp.log_debug(f
"protecting value: {value}", title)
117 self.
disp.log_debug(
"Value is none, thus returning NULL", title)
120 if isinstance(value, str)
is False:
121 self.
disp.log_debug(
"Value is not a string, converting", title)
125 self.
disp.log_debug(
"Value is empty, returning ''", title)
128 if value[0] ==
'`' and value[-1] ==
'`':
130 "string has special backtics, skipping.", title
136 "Value already has a single quote at the start, removing", title
141 "Value already has a single quote at the end, removing", title
146 f
"Value before quote escaping: {value}", title
148 protected_value = value.replace(
"'",
"''")
150 f
"Value after quote escaping: {protected_value}", title
153 protected_value = f
"'{protected_value}'"
155 f
"Value after being converted to a string: {protected_value}.",
158 return protected_value
162 Escape the risky column names in where mode, except for those in keyword_logic_gates.
165 columns (Union[str, List[str]]): Column names to be processed.
168 Union[List[str], str]: Processed column names with risky ones escaped.
170 title =
"_escape_risky_column_names_where_mode"
172 "Escaping risky column names in where mode.", title
175 if isinstance(columns, str):
180 for index, item
in enumerate(data):
182 key, value = item.split(
"=", maxsplit=1)
183 self.
disp.log_debug(f
"key = {key}, value = {value}", title)
187 self.
disp.log_warning(
188 f
"Escaping risky column name '{key}'.", title
190 data[index] = f
"`{key}`={protected_value}"
192 data[index] = f
"{key}={protected_value}"
195 self.
disp.log_warning(
196 f
"Escaping risky column name '{item}'.",
200 data[index] = protected_value
202 self.
disp.log_debug(
"Escaped risky column names in where mode.", title)
204 if isinstance(columns, str):
210 Check if the cell is a string or a number.
213 cell (str): _description_
218 if isinstance(cell, (str, float))
is True:
220 if isinstance(cell, str)
is False:
221 msg =
"The expected type of the input is a string,"
222 msg += f
"but got {type(cell)}"
223 self.
disp.log_error(msg,
"_check_sql_cell")
227 if tmp
in (
"now",
"now()"):
229 elif tmp
in (
"current_date",
"current_date()"):
233 if ";base" not in tmp:
234 self.
disp.log_debug(f
"result = {tmp}",
"_check_sql_cell")
235 return f
"\"{str(tmp)}\""
237 def beautify_table(self, column_names: List[str], table_content: List[List[Any]]) -> Union[List[Dict[str, Any]], int]:
239 Convert the table to an easier version for navigating.
242 column_names (List[str]): _description_
243 table_content (List[List[Any]]): _description_
246 Union[List[Dict[str, Any]], int]: The formatted content, or self.error if an error occurred.
248 data: List[Dict[str, Any]] = []
250 if len(column_names) == 0:
252 "There are not provided table column names.",
256 if len(table_content) == 0:
258 "There is no table content.",
262 column_length = len(column_names)
263 for i
in table_content:
265 if cell_length != column_length:
266 self.
disp.log_warning(
267 "Table content and column lengths do not correspond.",
271 for index, items
in enumerate(column_names):
272 if index == cell_length:
273 self.
disp.log_warning(
274 "Skipping the rest of the tuple because it is shorter than the column names.",
278 data[v_index][items[0]] = i[index]
280 self.
disp.log_debug(f
"beautified_table = {data}",
"_beautify_table")
285 Compile the line required for an sql update to work.
288 line (List): _description_
289 column (List): _description_
290 column_length (int): _description_
295 title =
"compile_update_line"
297 self.
disp.log_debug(
"Compiling update line.", title)
298 for i
in range(0, column_length):
300 final_line += f
"{column[i]} = {cell_content}"
301 if i < column_length - 1:
303 if i == column_length:
305 self.
disp.log_debug(f
"line = {final_line}", title)
308 def process_sql_line(self, line: List[str], column: List[str], column_length: int = (-1)) -> str:
310 Convert a List of strings to an sql line so that it can be inserted into a table.
313 line (List[str]): _description_
318 if column_length == -1:
319 column_length = len(column)
320 line_length = len(line)
323 if self.
debug is True and ";base" not in str(line):
324 msg = f
"line = {line}"
325 self.
disp.log_debug(msg,
"_process_sql_line")
326 for i
in range(0, column_length):
328 if i < column_length - 1:
330 if i == column_length:
332 msg =
"The line is longer than the number of columns, truncating."
333 self.
disp.log_warning(msg,
"_process_sql_line")
336 if ";base" not in str(line_final):
337 msg = f
"line_final = '{line_final}'"
338 msg += f
", type(line_final) = '{type(line_final)}'"
339 self.
disp.log_debug(msg,
"_process_sql_line")
None __init__(self, int success=0, int error=84, bool debug=False)
str compile_update_line(self, List line, List column, int column_length)
Union[List[Dict[str, Any]], int] beautify_table(self, List[str] column_names, List[List[Any]] table_content)
str process_sql_line(self, List[str] line, List[str] column, int column_length=(-1))
str _protect_value(self, str value)
Union[List[str], str] escape_risky_column_names_where_mode(self, Union[List[str], str] columns)
List[str] keyword_logic_gates
str protect_sql_cell(self, str cell)
SQLTimeManipulation sql_time_manipulation
Union[List[str], str] escape_risky_column_names(self, Union[List[str], str] columns)
str check_sql_cell(self, str cell)