Terarea  2
The automation project
Loading...
Searching...
No Matches
oauth_authentication.py
Go to the documentation of this file.
1"""
2The file that contains all the methods for the OAuth authentication
3"""
4
5import uuid
6from datetime import datetime, timedelta
7from typing import Union, Dict
8import requests
9from fastapi import Response, Request
10from display_tty import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
11from . import constants as CONST
12from . import RuntimeData, HCI
13
14
16 """
17 The class that handle the oauth authentication
18 """
19
20 def __init__(self, runtime_data: RuntimeData, success: int = 0, error: int = 84, debug: bool = False) -> None:
21 """_summary_
22 The constructor of the OAuth authentication class
23
24 Args:
25 runtime_data (RuntimeData): _description_: The initialised version of the runtime_data class.
26 success (int, optional): _description_. Defaults to 0.: The status code for success.
27 error (int, optional): _description_. Defaults to 84.: The status code for error.
28 debug (bool, optional): _description_. Defaults to False.: The info on if to activate debug mode.
29 """
30 self.debug: bool = debug
31 self.success: int = success
32 self.error: int = error
33 self.runtime_data_initialised: RuntimeData = runtime_data
34 # --------------------------- logger section ---------------------------
35 self.disp: Disp = Disp(
36 TOML_CONF,
37 SAVE_TO_FILE,
38 FILE_NAME,
39 FILE_DESCRIPTOR,
40 debug=self.debug,
41 logger=self.__class__.__name__
42 )
43
44 def _generate_oauth_authorization_url(self, provider: str) -> Union[int, str]:
45 """
46 Generate an OAuth authorization url depends on the given provider
47 """
48 title = "generate_oauth_authorization_url"
49 retrived_provider = self.runtime_data_initialised.database_link.get_data_from_table(
50 CONST.TAB_USER_OAUTH_CONNECTION,
51 "*",
52 f"provider_name='{provider}'"
53 )
54 self.disp.log_debug(f"Retrived provider: {retrived_provider}", title)
55 if isinstance(retrived_provider, int):
56 self.disp.log_error("Unknown or Unsupported OAuth provider", title)
57 return self.error
58 base_url = retrived_provider[0]["authorisation_base_url"]
59 client_id = retrived_provider[0]["client_id"]
60 scope = retrived_provider[0]["provider_scope"]
61 redirect_uri = CONST.REDIRECT_URI
62 state = str(uuid.uuid4())
63 columns = self.runtime_data_initialised.database_link.get_table_column_names(
64 CONST.TAB_VERIFICATION)
65 self.disp.log_debug(f"Columns list: {columns}", title)
66 if isinstance(columns, int):
67 return self.error
68 columns.pop(0)
69 expiration_time = self.runtime_data_initialised.boilerplate_non_http_initialised.set_lifespan(
70 CONST.EMAIL_VERIFICATION_DELAY)
71 et_str = self.runtime_data_initialised.database_link.datetime_to_string(
72 expiration_time, False)
73 self.disp.log_debug(f"Expiration time: {et_str}", et_str)
74 data: list = []
75 data.append("state")
76 data.append(state)
77 data.append(et_str)
78 if self.runtime_data_initialised.database_link.insert_data_into_table(CONST.TAB_VERIFICATION, data, columns) == self.error:
79 return self.error
80 state += ":"
81 state += provider
82 if provider == "google":
83 url = f"{base_url}?access_type=offline&client_id={client_id}&redirect_uri={redirect_uri}&prompt=consent"
84 else:
85 url = f"{base_url}?client_id={client_id}&redirect_uri={redirect_uri}"
86 url += f"&response_type=code&scope={scope}&state={state}"
87 url = url.replace(" ", "%20")
88 url = url.replace(":", "%3A")
89 url = url.replace("/", "%2F")
90 url = url.replace("?", "%3F")
91 url = url.replace("&", "%26")
92 self.disp.log_debug(f"url = {url}", title)
93 return url
94
95 def _exchange_code_for_token(self, provider: str, code: str):
96 """
97 Exchange the OAuth authorization code for an access token
98 """
99 title = "exchange_code_for_token"
100
101 retrieved_provider = self.runtime_data_initialised.database_link.get_data_from_table(
102 CONST.TAB_USER_OAUTH_CONNECTION,
103 "*",
104 f"provider_name='{provider}'"
105 )
106 if isinstance(retrieved_provider, int):
107 return self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
108 "exchange_code_for_token",
109 "Internal server error.",
110 "Internal server error.",
111 None,
112 True
113 )
114 headers: dict = {}
115 headers["Accept"] = "application/json"
116 headers["Content-Type"] = "application/x-www-form-urlencoded"
117 token_url = retrieved_provider[0]["token_grabber_base_url"]
118
119 data: dict = {}
120 data["client_id"] = retrieved_provider[0]["client_id"]
121 data["client_secret"] = retrieved_provider[0]["client_secret"]
122 data["code"] = code
123 data["redirect_uri"] = CONST.REDIRECT_URI
124 data["grant_type"] = "authorization_code"
125 try:
126 response = requests.post(
127 token_url, data=data, headers=headers, timeout=10
128 )
129 self.disp.log_debug(f"Exchange response = {response}", title)
130 response.raise_for_status()
131 token_response = response.json()
132 if "error" in token_response:
133 msg = "OAuth error: "
134 msg += f"{token_response['error_description']}"
135 self.disp.log_error(msg, title)
136 return self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
137 "exchange_code_for_token",
138 "Failed to get the token",
139 f"{token_response['error']}",
140 None,
141 True
142 )
143 return token_response
144 except requests.RequestException as e:
145 self.disp.log_error(f"RequestException: {str(e)}", title)
146 return self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
147 "exchange_code_for_token",
148 "HTTP request failed.",
149 f"{str(e)}",
150 None,
151 True
152 )
153 except ValueError:
154 self.disp.log_error("Failed to parse response JSON.", title)
155 return self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
156 "exchange_code_for_token",
157 "Invalid JSON response from provider.",
158 "Invalid JSON",
159 None,
160 True
161 )
162
163 def _get_user_info(self, provider: str, access_token: str):
164 """
165 Get a user information depending
166 """
167 title: str = "get_user_info"
168 retrieved_data = self.runtime_data_initialised.database_link.get_data_from_table(
169 CONST.TAB_USER_OAUTH_CONNECTION,
170 "*",
171 f"provider_name='{provider}'"
172 )
173 self.disp.log_debug(f"Retrieved oauth provider data: {retrieved_data}", title)
174 if isinstance(retrieved_data, int):
175 return self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
176 "get_user_info",
177 "Failed to fetch the OAuth provider information.",
178 "Failed to fetch the OAuth provider information.",
179 None,
180 True
181 )
182 headers = {
183 "Authorization": f"Bearer {access_token}"
184 }
185 user_info_url = retrieved_data[0]["user_info_base_url"]
186 self.disp.log_debug(f"User info headers: {headers}", title)
187 self.disp.log_debug(f"User info url: {user_info_url}", title)
188 response = requests.get(user_info_url, headers=headers, timeout=10)
189 if response.status_code != 200:
190 return self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
191 "get_user_info",
192 "Failed to retrieve the user email from the provider",
193 response,
194 None,
195 True
196 )
197 user_info = response.json()
198 self.disp.log_debug(f"User info: {user_info}", title)
199 if provider == "github":
200 for _, info in enumerate(user_info):
201 if info["primary"]:
202 email: dict = {}
203 email["email"] = info["email"]
204 return email
205 return user_info
206
207 def _oauth_user_logger(self, user_info: Dict, provider: str, connection_data: list) -> Response:
208 """
209 The function to insert or update the user information in the database
210 """
211 title: str = "oauth_user_logger"
212 email: str = user_info["email"]
213 retrieved_user = self.runtime_data_initialised.database_link.get_data_from_table(
214 CONST.TAB_ACCOUNTS,
215 "*",
216 f"email='{email}'"
217 )
218 self.disp.log_debug(f"Retrieved user: {retrieved_user}", title)
219 if isinstance(retrieved_user, int) is False:
220 retrieved_provider = self.runtime_data_initialised.database_link.get_data_from_table(
221 CONST.TAB_SERVICES,
222 "*",
223 f"name='{provider}'"
224 )
225 msg = "Retrieved provider: "
226 msg += f"{retrieved_provider}"
227 self.disp.log_debug(msg, title)
228 if isinstance(retrieved_user, int):
229 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
230 connection_data.append(str(retrieved_provider[0]["id"]))
231 connection_data.append(str(retrieved_user[0]["id"]))
232 self.disp.log_debug(f"Connection data: {connection_data}", title)
233
234 provider_id = str(retrieved_provider[0]["id"])
235 user_id = str(retrieved_user[0]["id"])
236 if isinstance(self.runtime_data_initialised.database_link.get_data_from_table(
237 CONST.TAB_ACTIVE_OAUTHS,
238 "*",
239 f"service_id='{provider_id}' AND user_id='{user_id}'"
240 ), int):
241 columns = self.runtime_data_initialised.database_link.get_table_column_names(
242 CONST.TAB_ACTIVE_OAUTHS)
243 if isinstance(columns, int):
244 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
245 columns.pop(0)
246 self.disp.log_debug(f"Columns list = {columns}", title)
247 if self.runtime_data_initialised.database_link.insert_data_into_table(
248 CONST.TAB_ACTIVE_OAUTHS,
249 connection_data,
250 columns
251 ) == self.error:
252 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
253 user_data = self.runtime_data_initialised.boilerplate_incoming_initialised.log_user_in(
254 email
255 )
256 if user_data["status"] == self.error:
257 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
258 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
259 title=title,
260 message="User successfully logged in.",
261 resp="success.",
262 token=user_data["token"],
263 error=False
264 )
265 body["token"] = user_data["token"]
266 return HCI.accepted(
267 body,
268 content_type=CONST.CONTENT_TYPE,
269 headers=self.runtime_data_initialised.json_header
270 )
271 columns = self.runtime_data_initialised.database_link.get_table_column_names(
272 CONST.TAB_ACCOUNTS)
273 if isinstance(columns, int):
274 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
275 columns.pop(0)
276 self.disp.log_debug(f"Columns list = {columns}", title)
277 username: str = email.split('@')[0]
278 user_data: list = []
279 user_data.append(username)
280 user_data.append(email)
281 user_data.append(provider)
282 user_data.append(provider)
283 user_data.append("NULL")
284 user_data.append(str(int(False)))
285 self.disp.log_debug(f"Data list = {user_data}", title)
286 if self.runtime_data_initialised.database_link.insert_data_into_table(CONST.TAB_ACCOUNTS, user_data, columns) == self.error:
287 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
288 retrieved_user = self.runtime_data_initialised.database_link.get_data_from_table(
289 CONST.TAB_ACCOUNTS,
290 "*",
291 f"email='{email}'"
292 )
293 self.disp.log_debug(f"Retrieved user: {retrieved_user}", title)
294 if isinstance(retrieved_user, int):
295 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
296 retrieved_provider = self.runtime_data_initialised.database_link.get_data_from_table(
297 CONST.TAB_SERVICES,
298 "*",
299 f"name='{provider}'"
300 )
301 self.disp.log_debug(f"Retrieved provider: {retrieved_provider}", title)
302 if isinstance(retrieved_user, int):
303 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
304 connection_data.append(str(retrieved_provider[0]["id"]))
305 connection_data.append(str(retrieved_user[0]["id"]))
306 self.disp.log_debug(f"Connection data: {connection_data}", title)
307 columns = self.runtime_data_initialised.database_link.get_table_column_names(
308 CONST.TAB_ACTIVE_OAUTHS)
309 if isinstance(columns, int):
310 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
311 columns.pop(0)
312 self.disp.log_debug(f"Columns list = {columns}", title)
313 if self.runtime_data_initialised.database_link.insert_data_into_table(
314 CONST.TAB_ACTIVE_OAUTHS,
315 connection_data,
316 columns
317 ) == self.error:
318 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
319 user_data = self.runtime_data_initialised.boilerplate_incoming_initialised.log_user_in(
320 email)
321 if user_data["status"] == self.error:
322 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
323 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
324 title=title,
325 message="User successfully logged in.",
326 resp="success.",
327 token=user_data["token"],
328 error=False
329 )
330 body["token"] = user_data["token"]
331 return HCI.accepted(
332 body,
333 content_type=CONST.CONTENT_TYPE,
334 headers=self.runtime_data_initialised.json_header
335 )
336
337 def _handle_token_response(self, token_response: Dict, provider: str) -> Response:
338 """
339 The function that handle the response given by the provider for the oauth token
340 """
341 title = "handle_token_response"
342 data: list = []
343 access_token: str = token_response["access_token"]
344 if not access_token:
345 return self.runtime_data_initialised.boilerplate_responses_initialised.no_access_token(title, None)
346 data.append(access_token)
347 self.disp.log_debug(f"Gotten access token: {access_token}", title)
348 if provider == "github":
349 data.append(
350 self.runtime_data_initialised.database_link.datetime_to_string(
351 datetime.now()
352 )
353 )
354 data.append("0")
355 data.append("NULL")
356 else:
357 expires: int = token_response["expires_in"]
358 if not expires:
359 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
360 title,
361 "The expiration time was not found in the provided response.",
362 "Expiration time not found.",
363 token=access_token,
364 error=True
365 )
366 return HCI.bad_request(
367 body,
368 content_type=CONST.CONTENT_TYPE,
369 headers=self.runtime_data_initialised.json_header
370 )
371 current_time = datetime.now()
372 new_time = current_time + timedelta(seconds=expires)
373 expiration_date = self.runtime_data_initialised.database_link.datetime_to_string(
374 new_time
375 )
376 data.append(expiration_date)
377 data.append(str(expires))
378 refresh_link = token_response["refresh_token"]
379 if not refresh_link:
380 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
381 title,
382 "The refresh link was not found in the provided response.",
383 "Refresh link not found.",
384 token=access_token,
385 error=True
386 )
387 return HCI.bad_request(
388 body,
389 content_type=CONST.CONTENT_TYPE,
390 headers=self.runtime_data_initialised.json_header
391 )
392 data.append(refresh_link)
393 self.disp.log_debug(f"Generated data for new oauth connexion user: {data}", title)
394 user_info = self._get_user_info(provider, access_token)
395 if "error" in user_info:
396 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
397 title,
398 user_info["error"],
399 "internal error",
400 token=access_token,
401 error=True
402 )
403 return HCI.internal_server_error(
404 body,
405 content_type=CONST.CONTENT_TYPE,
406 headers=self.runtime_data_initialised.json_header
407 )
408 return self._oauth_user_logger(user_info, provider, data)
409
410 def refresh_token(self, provider_name: str, refresh_link: str) -> Union[str, None]:
411 """
412 The function that use the given provider name and refresh link to generate a new token for oauth authentication
413 """
414 title: str = "refresh_token"
415 if any(word in provider_name for word in ("google", "discord", "spotify")):
416 retrieved_data = self.runtime_data_initialised.database_link.get_data_from_table(
417 CONST.TAB_USER_OAUTH_CONNECTION,
418 "*",
419 f"provider_name='{provider_name}'"
420 )
421 msg = "Retrieved provider data:"
422 msg += f"{retrieved_data}"
423 self.disp.log_debug(msg, title)
424 if isinstance(retrieved_data, int):
425 self.disp.log_error(
426 "An error has been detected when retrieving the provider data", title
427 )
428 return None
429 token_url: str = retrieved_data[0]["token_grabber_base_url"]
430 generated_data: dict = {}
431 generated_data["client_id"] = retrieved_data[0]["client_id"]
432 generated_data["client_secret"] = retrieved_data[0]["client_secret"]
433 generated_data["refresh_token"] = refresh_link
434 generated_data["grant_type"] = "refresh_token"
435 self.disp.log_debug(f"Generated data: {generated_data}", title)
436 provider_response: Response = requests.post(
437 token_url, data=generated_data, timeout=10
438 )
439 self.disp.log_debug(f"Provider response: {provider_response}", title)
440 if provider_response.status_code == 200:
441 token_response = provider_response.json()
442 msg = "Provider response to json: "
443 msg += f"{token_response}"
444 self.disp.log_debug(msg, title)
445 if "access_token" in token_response:
446 return token_response["access_token"]
447 else:
448 return None
449 self.disp.log_error("The provider is not recognised", title)
450 return None
451
452 async def oauth_callback(self, request: Request) -> Response:
453 """
454 Callback of the OAuth login
455 """
456 title = "oauth_callback"
457 query_params = request.query_params
458 if not query_params:
459 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
460 title=title,
461 message="Query parameters not provided.",
462 resp="no query parameters",
463 token=None,
464 error=True
465 )
466 return HCI.bad_request(
467 body,
468 content_type=CONST.CONTENT_TYPE,
469 headers=self.runtime_data_initialised.json_header
470 )
471 self.disp.log_debug(f"Query params: {query_params}", title)
472 code = query_params.get("code")
473 self.disp.log_debug(f"Code: {code}", title)
474 state = query_params.get("state")
475 self.disp.log_debug(f"State: {state}", title)
476 if not code or not state:
477 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
478 title=title,
479 message="Authorization code or state not provided.",
480 resp="no code or state",
481 token=None,
482 error=True
483 )
484 return HCI.bad_request(
485 body,
486 content_type=CONST.CONTENT_TYPE,
487 headers=self.runtime_data_initialised.json_header
488 )
489 uuid_gotten, provider = state.split(":")
490 if not uuid_gotten or not provider:
491 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
492 title=title,
493 message="The state is in bad format.",
494 resp="bad state format",
495 token=None,
496 error=True
497 )
498 return HCI.bad_request(
499 body,
500 content_type=CONST.CONTENT_TYPE,
501 headers=self.runtime_data_initialised.json_header
502 )
503 self.disp.log_debug(f"Uuid retrived: {uuid_gotten}", title)
504 self.disp.log_debug(f"Provider: {provider}", title)
505 data = self.runtime_data_initialised.database_link.get_data_from_table(
506 CONST.TAB_VERIFICATION,
507 "*",
508 f"definition='{uuid_gotten}'"
509 )
510 self.disp.log_debug(f"Data received: {data}", title)
511 if isinstance(data, int):
512 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
513 if isinstance(self.runtime_data_initialised.database_link.drop_data_from_table(
514 CONST.TAB_VERIFICATION,
515 f"definition='{uuid_gotten}'"
516 ), int) is False:
517 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
518 token_response = self._exchange_code_for_token(provider, code)
519 self.disp.log_debug(f"Token response: {token_response}", title)
520 if "error" in token_response:
521 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
522 title=title,
523 message="Failed to get the token.",
524 resp=token_response["error"],
525 token=None,
526 error=True
527 )
528 return HCI.bad_request(
529 body,
530 content_type=CONST.CONTENT_TYPE,
531 headers=self.runtime_data_initialised.json_header
532 )
533 return self._handle_token_response(token_response, provider)
534
535 async def oauth_login(self, request: Request) -> Response:
536 """
537 Get the authorization url for the OAuth login depending on the provider
538 """
539 title = "oauth_login"
540 request_body = await self.runtime_data_initialised.boilerplate_incoming_initialised.get_body(request)
541 self.disp.log_debug(f"Request body: {request_body}", title)
542 if not request_body or "provider" not in request_body:
543 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
544 title=title,
545 message="The provider is not found in the request.",
546 resp="no provider",
547 token=None,
548 error=True
549 )
550 return HCI.bad_request(
551 body,
552 content_type=CONST.CONTENT_TYPE,
553 headers=self.runtime_data_initialised.json_header
554 )
555 provider = request_body["provider"]
556 self.disp.log_debug(f"Oauth login provider: {provider}", title)
557 authorization_url = self._generate_oauth_authorization_url(provider)
558 self.disp.log_debug(f"Authorization url: {authorization_url}", title)
559 if isinstance(authorization_url, int):
560 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
561 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
562 title=title,
563 message="Authorization url successfully generated.",
564 resp="success",
565 token=None,
566 error=False
567 )
568 body["authorization_url"] = authorization_url
569 return HCI.success(
570 body,
571 content_type=CONST.CONTENT_TYPE,
572 headers=self.runtime_data_initialised.json_header
573 )
574
575 async def add_oauth_provider(self, request: Request, provider: str = "") -> Response:
576 """
577 Add a new oauth provider to the database (only for admin)
578 """
579 title: str = "add_oauth_provider"
580 if not provider:
581 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
582 title=title,
583 message="The provider is not provided.",
584 resp="no provider",
585 token=None,
586 error=True
587 )
588 return HCI.bad_request(
589 body,
590 content_type=CONST.CONTENT_TYPE,
591 headers=self.runtime_data_initialised.json_header
592 )
593 self.disp.log_debug(f"Provider: {provider}", title)
594 token: str = self.runtime_data_initialised.boilerplate_incoming_initialised.get_token_if_present(
595 request
596 )
597 if self.runtime_data_initialised.boilerplate_non_http_initialised.is_token_admin(token) is False:
598 self.disp.log_error("You're not admin.", title)
599 return self.runtime_data_initialised.boilerplate_responses_initialised.insuffisant_rights(title, token)
600 retrived_data = self.runtime_data_initialised.database_link.get_data_from_table(
601 CONST.TAB_USER_OAUTH_CONNECTION,
602 "*",
603 f"provider_name='{provider}'"
604 )
605 if isinstance(retrived_data, int) is False:
606 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
607 title=title,
608 message="The provider already exist in the database.",
609 resp="provider already exist",
610 token=token,
611 error=True
612 )
613 return HCI.conflict(
614 body,
615 content_type=CONST.CONTENT_TYPE,
616 headers=self.runtime_data_initialised.json_header
617 )
618 request_body = await self.runtime_data_initialised.boilerplate_incoming_initialised.get_body(request)
619 if not request_body or not all(key in request_body for key in ("client_id", "client_secret", "provider_scope", "authorisation_base_url", "token_grabber_base_url", "user_info_base_url")):
620 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
621 title=title,
622 message="A variable is missing in the body.",
623 resp="missing variable",
624 token=token,
625 error=True
626 )
627 return HCI.bad_request(
628 body,
629 content_type=CONST.CONTENT_TYPE,
630 headers=self.runtime_data_initialised.json_header
631 )
632 self.disp.log_debug(f"Request body: {request_body}", title)
633 client_id = request_body["client_id"]
634 client_secret = request_body["client_secret"]
635 provider_scope = request_body["provider_scope"]
636 authorisation_base_url = request_body["authorisation_base_url"]
637 token_grabber_base_url = request_body["token_grabber_base_url"]
638 user_info_base_url = request_body["user_info_base_url"]
639 data: list = [
640 provider,
641 client_id,
642 client_secret,
643 provider_scope,
644 authorisation_base_url,
645 token_grabber_base_url,
646 user_info_base_url
647 ]
648 columns = self.runtime_data_initialised.database_link.get_table_column_names(
649 CONST.TAB_USER_OAUTH_CONNECTION
650 )
651 if isinstance(columns, int):
652 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
653 columns.pop(0)
654 self.disp.log_debug(f"Columns: {columns}", title)
655 if self.runtime_data_initialised.database_link.insert_data_into_table(CONST.TAB_USER_OAUTH_CONNECTION, data, columns) == self.error:
656 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
657 oauth: list = ["1"]
658 column: list = ["oauth"]
659 if self.runtime_data_initialised.database_link.update_data_in_table(
660 CONST.TAB_SERVICES,
661 oauth,
662 column,
663 f"name='{provider}'"
664 ) == self.error:
665 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
666 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
667 title=title,
668 message="The provider is successfully added.",
669 resp="success",
670 token=token,
671 error=False
672 )
673 return HCI.success(
674 body,
675 content_type=CONST.CONTENT_TYPE,
676 headers=self.runtime_data_initialised.json_header
677 )
678
679 async def update_oauth_provider_data(self, request: Request, provider: str) -> Response:
680 """
681 The function that modify every value of an oauth provider
682 """
683 title: str = "update_oauth_provider_data"
684 if not provider:
685 return self.runtime_data_initialised.boilerplate_responses_initialised.provider_not_given(title, None)
686 self.disp.log_debug(f"Provider: {provider}", title)
687 token: str = self.runtime_data_initialised.boilerplate_incoming_initialised.get_token_if_present(
688 request
689 )
690 self.disp.log_debug(f"Token received: {token}", title)
691 if self.runtime_data_initialised.boilerplate_non_http_initialised.is_token_admin(token) is False:
692 self.disp.log_error("You're not admin.", title)
693 return self.runtime_data_initialised.boilerplate_responses_initialised.insuffisant_rights(title, token)
694 retrived_data = self.runtime_data_initialised.database_link.get_data_from_table(
695 CONST.TAB_USER_OAUTH_CONNECTION,
696 "*",
697 f"provider_name='{provider}'")
698 if isinstance(retrived_data, int):
699 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
700 request_body = await self.runtime_data_initialised.boilerplate_incoming_initialised.get_body(request)
701 if not request_body or not all(key in request_body for key in ("client_id", "client_secret", "provider_scope", "authorisation_base_url", "token_grabber_base_url", "user_info_base_url")):
702 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
703 title=title,
704 message="A variable is missing in the body.",
705 resp="missing variable",
706 token=token,
707 error=True
708 )
709 return HCI.bad_request(
710 body,
711 content_type=CONST.CONTENT_TYPE,
712 headers=self.runtime_data_initialised.json_header
713 )
714 self.disp.log_debug(f"Request body: {request_body}", title)
715 client_id = request_body["client_id"]
716 client_secret = request_body["client_secret"]
717 provider_scope = request_body["provider_scope"]
718 authorisation_base_url = request_body["authorisation_base_url"]
719 token_grabber_base_url = request_body["token_grabber_base_url"]
720 user_info_base_url = request_body["user_info_base_url"]
721 data: list = [
722 client_id,
723 client_secret,
724 provider_scope,
725 authorisation_base_url,
726 token_grabber_base_url,
727 user_info_base_url
728 ]
729 self.disp.log_debug(f"Generated data: {data}", title)
730 columns: list = self.runtime_data_initialised.database_link.get_table_column_names(
731 CONST.TAB_USER_OAUTH_CONNECTION
732 )
733 if isinstance(columns, int):
734 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
735 columns.pop(0)
736 columns.pop(0)
737 self.disp.log_debug(f"Columns: {columns}", title)
738 if self.runtime_data_initialised.database_link.update_data_in_table(
739 CONST.TAB_USER_OAUTH_CONNECTION,
740 data,
741 columns,
742 f"provider_name='{provider}'"
743 ) == self.error:
744 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, None)
745 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
746 title=title,
747 message="The provider is successfully updated.",
748 resp="success",
749 token=token,
750 error=False
751 )
752 return HCI.success(
753 body,
754 content_type=CONST.CONTENT_TYPE,
755 headers=self.runtime_data_initialised.json_header
756 )
757
758 async def patch_oauth_provider_data(self, request: Request, provider: str) -> Response:
759 """
760 The function that modify every value of an oauth provider
761 """
762 title: str = "update_oauth_provider_data"
763 if not provider:
764 return self.runtime_data_initialised.boilerplate_responses_initialised.provider_not_given(title, None)
765 self.disp.log_debug(f"Provider: {provider}", title)
766 token: str = self.runtime_data_initialised.boilerplate_incoming_initialised.get_token_if_present(
767 request
768 )
769 self.disp.log_debug(f"Token gotten: {token}", title)
770 if self.runtime_data_initialised.boilerplate_non_http_initialised.is_token_admin(token) is False:
771 self.disp.log_error("You're not admin.", title)
772 return self.runtime_data_initialised.boilerplate_responses_initialised.insuffisant_rights(title, token)
773 retrived_data = self.runtime_data_initialised.database_link.get_data_from_table(
774 CONST.TAB_USER_OAUTH_CONNECTION,
775 "*",
776 f"provider_name='{provider}'"
777 )
778 if isinstance(retrived_data, int):
779 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
780 request_body = await self.runtime_data_initialised.boilerplate_incoming_initialised.get_body(request)
781 if not request_body:
782 return self.runtime_data_initialised.boilerplate_responses_initialised.missing_variable_in_body(title, token)
783 self.disp.log_debug(f"Request body: {request_body}", title)
784 if "provider_name" in request_body:
785 if self.runtime_data_initialised.boilerplate_non_http_initialised.update_single_data(
786 CONST.TAB_USER_OAUTH_CONNECTION,
787 "provider_name",
788 "provider_name",
789 provider,
790 request_body
791 ) == self.error:
792 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
793 if "client_id" in request_body:
794 if self.runtime_data_initialised.boilerplate_non_http_initialised.update_single_data(
795 CONST.TAB_USER_OAUTH_CONNECTION,
796 "provider_name",
797 "client_id",
798 provider,
799 request_body
800 ) == self.error:
801 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
802 if "client_secret" in request_body:
803 if self.runtime_data_initialised.boilerplate_non_http_initialised.update_single_data(
804 CONST.TAB_USER_OAUTH_CONNECTION,
805 "provider_name",
806 "client_secret",
807 provider,
808 request_body
809 ) == self.error:
810 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
811 if "provider_scope" in request_body:
812 if self.runtime_data_initialised.boilerplate_non_http_initialised.update_single_data(
813 CONST.TAB_USER_OAUTH_CONNECTION,
814 "provider_name",
815 "provider_scope",
816 provider,
817 request_body
818 ) == self.error:
819 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
820 if "authorisation_base_url" in request_body:
821 if self.runtime_data_initialised.boilerplate_non_http_initialised.update_single_data(
822 CONST.TAB_USER_OAUTH_CONNECTION,
823 "provider_name",
824 "authorisation_base_url",
825 provider,
826 request_body
827 ) == self.error:
828 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
829 if "token_grabber_base_url" in request_body:
830 if self.runtime_data_initialised.boilerplate_non_http_initialised.update_single_data(
831 CONST.TAB_USER_OAUTH_CONNECTION,
832 "provider_name",
833 "token_grabber_base_url",
834 provider,
835 request_body
836 ) == self.error:
837 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
838 if "user_info_base_url" in request_body:
839 if self.runtime_data_initialised.boilerplate_non_http_initialised.update_single_data(
840 CONST.TAB_USER_OAUTH_CONNECTION,
841 "provider_name",
842 "user_info_base_url",
843 provider,
844 request_body
845 ) == self.error:
846 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(title, token)
847 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
848 title=title,
849 message="The provider is successfully updated.",
850 resp="success",
851 token=token,
852 error=False
853 )
854 return HCI.success(body, content_type=CONST.CONTENT_TYPE, headers=self.runtime_data_initialised.json_header)
855
856 async def delete_oauth_provider(self, request: Request, provider: str) -> Response:
857 """
858 The function to delete an oauth provider from the database
859 """
860 title: str = "delete_oauth_provider"
861 if not provider:
862 return self.runtime_data_initialised.boilerplate_responses_initialised.provider_not_given(
863 title,
864 None
865 )
866 self.disp.log_debug(f"Provider: {provider}", title)
867 token: str = self.runtime_data_initialised.boilerplate_incoming_initialised.get_token_if_present(
868 request
869 )
870 self.disp.log_debug(f"Token gotten: {token}", title)
871 if self.runtime_data_initialised.boilerplate_non_http_initialised.is_token_admin(token) is False:
872 self.disp.log_error("You're not admin.", title)
873 return self.runtime_data_initialised.boilerplate_responses_initialised.insuffisant_rights(
874 title,
875 token
876 )
877 retrived_data = self.runtime_data_initialised.database_link.get_data_from_table(
878 CONST.TAB_USER_OAUTH_CONNECTION,
879 "*",
880 f"provider_name='{provider}'"
881 )
882 if isinstance(retrived_data, int):
883 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(
884 title,
885 token
886 )
887 retrieved_service_id = self.runtime_data_initialised.database_link.get_data_from_table(
888 CONST.TAB_SERVICES,
889 "id",
890 f"name='{provider}'"
891 )
892 if isinstance(retrieved_service_id, int):
893 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(
894 title,
895 token
896 )
897 service_id = str(retrieved_service_id[0]["id"])
898 if self.runtime_data_initialised.database_link.drop_data_from_table(
899 CONST.TAB_ACTIVE_OAUTHS,
900 f"service_id='{service_id}'"
901 ) == self.error:
902 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(
903 title,
904 token
905 )
906 if self.runtime_data_initialised.database_link.update_data_in_table(
907 CONST.TAB_SERVICES,
908 "0",
909 "oauth",
910 f"id='{service_id}'"
911 ) == self.error:
912 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(
913 title,
914 token
915 )
916 if self.runtime_data_initialised.database_link.drop_data_from_table(
917 CONST.TAB_USER_OAUTH_CONNECTION,
918 f"provider_name='{provider}'"
919 ) == self.error:
920 return self.runtime_data_initialised.boilerplate_responses_initialised.internal_server_error(
921 title,
922 token
923 )
924 body = self.runtime_data_initialised.boilerplate_responses_initialised.build_response_body(
925 title=title,
926 message="The oauth provider has been deleted successfully.",
927 resp="success",
928 token=token,
929 )
930 return HCI.success(
931 body,
932 content_type=CONST.CONTENT_TYPE,
933 headers=self.runtime_data_initialised.json_header
934 )
Response _handle_token_response(self, Dict token_response, str provider)
None __init__(self, RuntimeData runtime_data, int success=0, int error=84, bool debug=False)
Response _oauth_user_logger(self, Dict user_info, str provider, list connection_data)
Response update_oauth_provider_data(self, Request request, str provider)
Union[int, str] _generate_oauth_authorization_url(self, str provider)
Response add_oauth_provider(self, Request request, str provider="")
Union[str, None] refresh_token(self, str provider_name, str refresh_link)
Response delete_oauth_provider(self, Request request, str provider)
Response patch_oauth_provider_data(self, Request request, str provider)