2 File in charge of querying the data from the API
5from typing
import Dict, Any, List, Union
9from requests
import Response
10from display_tty
import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
12from .secrets
import Secrets
13from .variables
import Variables
14from .logger
import ActionLogger
15from .
import constants
as ACONST
16from .query_boilerplate
import QueryEndpoint
18from ..components
import constants
as CONST
19from ..components.runtime_data
import RuntimeData
26 def __init__(self, service: Dict[str, Any], variable: Variables, scope: str, runtime_data: RuntimeData, logger: ActionLogger, action_id: int, error: int = 84, success: int = 0, debug: bool =
False) ->
None:
28 Class in charge of querying the data from the API
31 service (Dict[str, Any]): _description_
32 variable (Variables): _description_: The class variable in charge of tracking the variables for the runtime.
33 scope (str): _description_: The scope of the trigger.
34 runtime_data (RuntimeData): _description_
35 logger (ActionLogger): _description_: The class logger in charge of logging the actions.
36 action_id (int): _description_: The action ID to log.
37 error (int, optional): _description_. Defaults to 84.
38 success (int, optional): _description_. Defaults to 0.
39 debug (bool, optional): _description_. Defaults to False.
58 logger=self.__class__.__name__
60 self.
disp.log_debug(f
"service: {self.service}", title)
73 self.
disp.log_debug(f
"API info: {self.api_info}", title)
75 def _log_fatal(self, title: str, msg, action_id: int, raise_item: bool =
False, raise_func: object = ValueError) -> int:
77 A function that will log a provided fatal error.
80 title (str): _description_: the title of the function
81 msg (str): _description_: The message to log
82 raise_item (bool, optional): _description_. Inform if the logger should raise or just return an error. Defaults to False.
83 raise_func (object, optional): _description_. The function to raise if required. Defaults to ValueError.
86 ValueError: _description_: One of the possible errors to raise.
89 int: _description_: Will return self.error if raise_item is False
91 self.
disp.log_error(msg, title)
93 ACONST.TYPE_SERVICE_TRIGGER,
98 if raise_item
is True:
105 A function that will strip the descriptor of the given node.
108 descriptor (str): _description_: The descriptor to strip.
111 str: _description_: The stripped descriptor.
113 title =
"strip_descriptor"
114 self.
disp.log_debug(f
"Node: {node}", title)
115 node = node.split(
':')[1]
116 self.
disp.log_debug(f
"Node (stripped): {node}", title)
121 Get the API information
124 api_id (int): _description_
127 Dict[str, Any]: _description_
129 title =
"get_api_info"
133 msg=
"The API ID is None",
136 raise_func=ValueError
138 data = self.
runtime_data.database_link.get_data_from_table(
139 table=CONST.TAB_SERVICES,
141 where=f
"id='{api_id}'",
144 if isinstance(data, int)
is True or len(data) == 0:
145 msg =
"Failed to get the API information for"
146 msg += f
" the API ID: {api_id}"
152 raise_func=ValueError
154 self.
disp.log_debug(f
"Gathered API info: {data[0]}", title)
162 api_info (Dict[str, Any]): _description_
164 title =
"update_secrets"
168 msg=
"The API info is None",
171 raise_func=ValueError
174 oauth = api_info.get(
"oauth")
175 if str(oauth) ==
"0":
176 node = api_info.get(
"api_key")
177 elif str(oauth) ==
"1":
179 service_id = api_info.get(
"id")
180 if service_id
is None or usr_id
is None:
183 msg=
"The service ID or user ID is None",
186 raise_func=ValueError
188 data: Dict[str, Any] = self.
runtime_data.database_link.get_data_from_table(
189 table=CONST.TAB_ACTIVE_OAUTHS,
192 f
"user_id='{usr_id}'",
193 f
"service_id='{service_id}'"
197 node = data.get(
"token")
201 msg=
"The oauth token is not valid",
204 raise_func=ValueError
209 msg=
"The token is None",
212 raise_func=ValueError
218 Sanitize the parameter value
221 url_param (str): _description_
226 title =
"sanitize_parameter_value"
227 self.
disp.log_debug(f
"url_param: {url_param}", title)
228 if url_param
is None:
230 url_param = urllib.parse.quote(url_param)
231 self.
disp.log_debug(f
"url_param (sanitised): {url_param}", title)
236 Get the variable name
239 var (str): _description_
244 title =
"get_variable_name"
245 self.
disp.log_debug(f
"var: {var}", title)
253 self.
disp.log_debug(f
"result: {result}", title)
258 Get the content of a special variable
261 var_name (str): _description_
266 title =
"get_special_content"
267 self.
disp.log_debug(f
"var_name: {var_name}", title)
269 lvar_name = var_name.lower()
270 if lvar_name
in (
"secrets.token",
"secret.token",
"token"):
271 node = self.
secrets.get_token()
272 if lvar_name
in (
"secrets.bearer",
"secret.bearer",
"bearer"):
273 node = self.
secrets.get_bearer()
274 if lvar_name
in ACONST.SECRETS_EQUIVALENCE:
275 msg = f
"lvar_name: {lvar_name} found in "
276 msg +=
"ACONST.SECRETS_EQUIVALENCE"
277 self.
disp.log_debug(msg, title)
278 node = ACONST.SECRETS_EQUIVALENCE[lvar_name]()
279 self.
disp.log_debug(f
"node: {node}", title)
284 Get the content of a normal variable
287 var_name (str): _description_
292 title =
"get_normal_content"
293 self.
disp.log_debug(f
"var_name: {var_name}", title)
294 if self.
variable.has_variable(var_name, self.
scope)
is False:
295 self.
disp.log_debug(f
"Variable {var_name} not found", title)
298 self.
disp.log_debug(f
"{var_name}: {data}", title)
306 var (str): _description_
311 title =
"check_special_vars"
312 if var
is None or var ==
"":
313 self.
disp.log_debug(
"var is None or empty", title)
315 var_list = var.split(
"$ref")
316 self.
disp.log_debug(f
"var_list: {var_list}", title)
317 for index, item
in enumerate(var_list):
323 item_new = f
"{var_content}{item[len(var_name) + 2:]}"
324 self.
disp.log_debug(f
"item_new: {item_new}", title)
325 var_list[index] = item_new
326 data =
"".join(var_list)
327 self.
disp.log_debug(f
"data: {data}", title)
332 Check the normal variables
335 var (str): _description_
340 title =
"check_normal_vars"
341 if var
is None or var ==
"":
342 self.
disp.log_debug(
"var is None or empty", title)
344 var_list = var.split(
"${")
345 self.
disp.log_debug(f
"var_list: {var_list}", title)
346 for index, item
in enumerate(var_list):
352 item_new = f
"{var_content}{item[len(var_name) + 3:]}"
353 self.
disp.log_debug(f
"item_new: {item_new}", title)
354 var_list[index] = item_new
355 data =
"".join(var_list)
356 self.
disp.log_debug(f
"data: {data}", title)
361 Compile the URL parameters
364 url_params (Dict[str, Any]): _description_
369 title =
"compile_url_parameters"
374 self.
disp.log_debug(f
"url_params: {url_params}", title)
375 for key, value
in url_params.items():
376 if key
in (
"input:additional_params",
"ignore:additional_params"):
380 if isinstance(value, Dict):
381 for key2, value2
in value.items():
385 elif isinstance(value, List):
390 if i.startswith(
"selected:"):
395 if i.startswith(
"default:"):
405 self.
disp.log_debug(f
"url: {url}", title)
406 url += f
"{key_stripped}={self.sanitize_parameter_value(value)}"
408 self.
disp.log_debug(f
"url: {url}", title)
413 Process the extra headers
416 header (Dict[str, Any]): _description_
419 Dict[str, Any]: _description_
421 title =
"process_extra_headers"
422 if header
is None or header ==
"":
424 self.
disp.log_debug(f
"header: {header}", title)
426 header: Dict[str, Any] = json.loads(header)
427 except json.JSONDecodeError
as e:
428 self.
disp.log_error(f
"Error processing header: {e}", title)
430 self.
disp.log_debug(f
"header: {header}", title)
432 for key, value
in header.items():
436 self.
disp.log_debug(f
"key: {key_stripped}, value: {value}", title)
437 result[key_stripped] = value
438 self.
disp.log_debug(f
"header: {result}", title)
446 header (Dict[str, Any]): _description_
449 Dict[str, Any]: _description_
451 title =
"process_headers"
453 self.
disp.log_debug(
"header: None", title)
455 self.
disp.log_debug(f
"header: {header}", title)
459 for key, value
in header.items():
460 if key
in (
"input:additional_header",
"ignore:additional_header"):
463 if isinstance(value, Dict):
464 for key2, value2
in value.items():
468 elif isinstance(value, List):
473 if i.startswith(
"selected:"):
476 if i.startswith(
"default:"):
478 if default
is not None:
480 if selected
is not None:
486 self.
disp.log_debug(f
"key: {key_stripped}, value: {value}", title)
487 result[key_stripped] = value
488 self.
disp.log_debug(f
"header: {result}", title)
493 Process the extra body
496 body (str): _description_
499 Dict[str, Any]: _description_
501 title =
"process_extra_body"
502 if body
is None or body ==
"":
504 self.
disp.log_debug(f
"body: {body}", title)
506 body: Dict[str, Any] = json.loads(body)
507 except json.JSONDecodeError
as e:
508 self.
disp.log_error(f
"Error processing body: {e}", title)
510 self.
disp.log_debug(f
"body: {body}", title)
512 for key, value
in body.items():
516 self.
disp.log_debug(f
"key: {key_stripped}, value: {value}", title)
517 result[key_stripped] = value
518 self.
disp.log_debug(f
"body: {result}", title)
526 body (Dict[str, Any]): _description_
529 Dict[str, Any]: _description_
531 title =
"process_body"
533 self.
disp.log_debug(
"body: None", title)
538 for key, value
in body.items():
539 if key
in (
"input:additional_body",
"ignore:additional_body"):
542 if isinstance(value, Dict):
543 for key2, value2
in value.items():
547 elif isinstance(value, List):
552 if i.startswith(
"selected:"):
555 if i.startswith(
"default:"):
557 if default
is not None:
559 if selected
is not None:
565 self.
disp.log_debug(f
"key: {key_stripped}, value: {value}", title)
566 result[key_stripped] = value
567 self.
disp.log_debug(f
"body: {result}", title)
575 method (str): _description_
585 if i.startswith(
"selected:"):
588 if i.startswith(
"default:"):
590 if selected
is not None:
592 if default
is not None:
598 Extract the code code
601 code (Dict[str, Any]): _description_
606 title =
"extract_response_code"
607 default_response: int = 200
608 node: str = code.get(
"int:code")
610 self.
disp.log_debug(f
"code: {code}", title)
611 return default_response
612 if isinstance(node, int)
is True:
613 self.
disp.log_debug(f
"code: {node}", title)
615 if isinstance(node, str)
is True:
616 if node.startswith(
"default:"):
617 self.
disp.log_debug(f
"code: {node}", title)
619 if node.startswith(
"selected:"):
620 self.
disp.log_debug(f
"code: {node}", title)
624 self.
disp.log_debug(f
"code: {node}", title)
626 except ValueError
as e:
627 self.
disp.log_error(f
"code: {node}, err: {e}", title)
628 return default_response
629 if isinstance(node, List)
is False:
630 self.
disp.log_error(f
"code: {node}", title)
634 if i.startswith(
"default:"):
636 if i.startswith(
"selected:"):
638 if node2
is not None:
639 self.
disp.log_debug(f
"code: {node2}", title)
641 if node1
is not None:
642 self.
disp.log_debug(f
"code: {node1}", title)
644 return default_response
648 Check the response code
651 response (Response): _description_
652 expected_response (int): _description_
654 title =
"check_response_code"
658 msg=
"Response is None",
661 raise_func=ValueError
663 if response.status_code != expected_response:
664 msg = f
"Expected response code: {expected_response},"
665 msg += f
" got: {response.status_code}"
671 raise_func=ValueError
674 def compile_url(self, url_extra: str, url_params: Dict[str, Any]) -> str:
679 url_extra (str): _description_
680 url_params (Dict[str, Any]): _description_
686 if url_extra
is not None and len(url_extra) > 0:
687 if url_extra[0] ==
"/":
688 url_extra = url_extra[1:]
690 if url_params
is not None and len(url_params) > 0:
698 Replaces placeholders in the URL with values from the given dictionary.
701 url_extra: The URL containing placeholders in the format {param}.
702 url_body_params: A dictionary containing the parameters and their values.
705 The URL with the placeholders replaced by their corresponding values.
707 If url_body_params is None, the function returns url_extra unchanged.
709 if url_body_params
is None:
712 for key, value
in url_body_params.items():
714 placeholder = f
"{{{real_key}}}"
715 url_extra = url_extra.replace(placeholder, str(value))
723 Response: _description_
726 self.
disp.log_debug(
"In query", title)
727 self.
disp.log_debug(f
"API info: {self.api_info}", title)
728 self.
disp.log_debug(f
"Service info: {self.service}", title)
729 self.
disp.log_debug(
"Processing url", title)
732 url_extra = (self.
service.get(
"input:url_extra")
or self.
service.get(
"ignore:url_extra"))
733 url_body_params = (self.
service.get(
"url_body_params")
or self.
service.get(
"ignore:url_body_params"))
734 url_query_params = (self.
service.get(
"url_params")
or self.
service.get(
"ignore:url_params"))
736 f
"url_extra: {url_extra}, url_body_params: {url_body_params}, url_query_params: {url_query_params}", title
739 self.
disp.log_debug(
"Processing body", title)
740 body = (self.
service.get(
"body")
or self.
service.get(
"ignore:body"))
744 self.
disp.log_debug(
"Processing method", title)
745 method = (self.
service.get(
"drop:method")
or self.
service.get(
"ignore:method"))
746 if isinstance(method, List):
749 self.
disp.log_debug(
"Processing headers", title)
750 headers = (self.
service.get(
"header")
or self.
service.get(
"ignore:header"))
751 if headers
is not None:
753 self.
disp.log_debug(
"Compiling url", title)
756 if url_base[-1] !=
"/":
758 msg =
"Processed data: '"
759 msg += f
"url_extra: {url_extra}, url_query_params: {url_query_params},"
760 msg += f
" body: {body}, method: {method}, headers: {headers},"
761 msg += f
" url: {url}, url_base: {url_base}'"
762 self.
disp.log_debug(msg, title)
765 expected_response: Union[Dict[str, Any],
None] = (self.
service.get(
"response")
or self.
service.get(
"ignore:response"))
766 if expected_response
is not None:
771 msg=
"Response section not found in service.",
774 raise_func=ValueError
776 msg = f
"Expected response: {expected_response},"
777 msg += f
" type: {type(expected_response)}"
778 self.
disp.log_debug(msg, title)
782 delay=CONST.API_REQUEST_DELAY,
785 self.
disp.log_debug(f
"qei = {qei}", title)
791 f
"final_body = {body}, final_headers = {headers}", title
795 response = qei.get_endpoint(url, content=body, header=headers)
796 elif method ==
"POST":
797 response = qei.post_endpoint(url, content=body, header=headers)
798 elif method ==
"PUT":
799 response = qei.put_endpoint(url, content=body, header=headers)
800 elif method ==
"PATCH":
801 response = qei.patch_endpoint(url, content=body, header=headers)
802 elif method ==
"DELETE":
803 response = qei.delete_endpoint(url, content=body, header=headers)
804 elif method ==
"HEAD":
805 response = qei.head_endpoint(url, content=body, header=headers)
806 elif method ==
"OPTIONS":
807 response = qei.options_endpoint(url, content=body, header=headers)
809 msg = f
"Method {method} is not supported"
815 raise_func=ValueError
817 self.
disp.log_debug(f
"Response: {response}", title)
819 f
"Response status_code: {response.status_code}", title
821 self.
disp.log_debug(f
"Expected response: {expected_response}", title)
str compile_url_parameters(self, Dict[str, Any] url_params)
Dict[str, Any] process_headers(self, Dict[str, Any] header)
str strip_descriptor(self, str node)
str get_special_content(self, str var_name)
str get_variable_name(self, str var)
str replace_url_body_params(self, str url_extra, Union[Dict[str, Any], None] url_body_params)
None check_response_code(self, Response response, int expected_response)
str compile_url(self, str url_extra, Dict[str, Any] url_params)
Dict[str, Any] process_extra_headers(self, str header)
str get_normal_content(self, str var_name)
int extract_response_code(self, Dict[str, Any] code)
str check_normal_vars(self, str var)
None update_secrets(self, Dict[str, Any] api_info)
Dict[str, Any] get_api_info(self, int api_id)
str check_special_vars(self, str var)
int _log_fatal(self, str title, msg, int action_id, bool raise_item=False, object raise_func=ValueError)
str sanitize_parameter_value(self, str url_param)
str get_method(self, List[str] method)
Dict[str, Any] process_body(self, Dict[str, Any] body)
Dict[str, Any] process_extra_body(self, str body)
None __init__(self, Dict[str, Any] service, Variables variable, str scope, RuntimeData runtime_data, ActionLogger logger, int action_id, int error=84, int success=0, bool debug=False)