46 Generate an OAuth authorization url depends on the given provider
48 title =
"generate_oauth_authorization_url"
50 CONST.TAB_USER_OAUTH_CONNECTION,
52 f
"provider_name='{provider}'"
54 self.
disp.log_debug(f
"Retrived provider: {retrived_provider}", title)
55 if isinstance(retrived_provider, int):
56 self.
disp.log_error(
"Unknown or Unsupported OAuth provider", title)
58 base_url = retrived_provider[0][
"authorisation_base_url"]
59 client_id = retrived_provider[0][
"client_id"]
60 scope = retrived_provider[0][
"provider_scope"]
61 redirect_uri = CONST.REDIRECT_URI
62 state = str(uuid.uuid4())
64 CONST.TAB_VERIFICATION)
65 self.
disp.log_debug(f
"Columns list: {columns}", title)
66 if isinstance(columns, int):
70 CONST.EMAIL_VERIFICATION_DELAY)
72 expiration_time,
False)
73 self.
disp.log_debug(f
"Expiration time: {et_str}", et_str)
82 if provider ==
"google":
83 url = f
"{base_url}?access_type=offline&client_id={client_id}&redirect_uri={redirect_uri}&prompt=consent"
85 url = f
"{base_url}?client_id={client_id}&redirect_uri={redirect_uri}"
86 url += f
"&response_type=code&scope={scope}&state={state}"
87 url = url.replace(
" ",
"%20")
88 url = url.replace(
":",
"%3A")
89 url = url.replace(
"/",
"%2F")
90 url = url.replace(
"?",
"%3F")
91 url = url.replace(
"&",
"%26")
92 self.
disp.log_debug(f
"url = {url}", title)
97 Exchange the OAuth authorization code for an access token
99 title =
"exchange_code_for_token"
102 CONST.TAB_USER_OAUTH_CONNECTION,
104 f
"provider_name='{provider}'"
106 if isinstance(retrieved_provider, int):
108 "exchange_code_for_token",
109 "Internal server error.",
110 "Internal server error.",
115 headers[
"Accept"] =
"application/json"
116 headers[
"Content-Type"] =
"application/x-www-form-urlencoded"
117 token_url = retrieved_provider[0][
"token_grabber_base_url"]
120 data[
"client_id"] = retrieved_provider[0][
"client_id"]
121 data[
"client_secret"] = retrieved_provider[0][
"client_secret"]
123 data[
"redirect_uri"] = CONST.REDIRECT_URI
124 data[
"grant_type"] =
"authorization_code"
126 response = requests.post(
127 token_url, data=data, headers=headers, timeout=10
129 self.
disp.log_debug(f
"Exchange response = {response}", title)
130 response.raise_for_status()
131 token_response = response.json()
132 if "error" in token_response:
133 msg =
"OAuth error: "
134 msg += f
"{token_response['error_description']}"
135 self.
disp.log_error(msg, title)
137 "exchange_code_for_token",
138 "Failed to get the token",
139 f
"{token_response['error']}",
143 return token_response
144 except requests.RequestException
as e:
145 self.
disp.log_error(f
"RequestException: {str(e)}", title)
147 "exchange_code_for_token",
148 "HTTP request failed.",
154 self.
disp.log_error(
"Failed to parse response JSON.", title)
156 "exchange_code_for_token",
157 "Invalid JSON response from provider.",
165 Get a user information depending
167 title: str =
"get_user_info"
169 CONST.TAB_USER_OAUTH_CONNECTION,
171 f
"provider_name='{provider}'"
173 self.
disp.log_debug(f
"Retrieved oauth provider data: {retrieved_data}", title)
174 if isinstance(retrieved_data, int):
177 "Failed to fetch the OAuth provider information.",
178 "Failed to fetch the OAuth provider information.",
183 "Authorization": f
"Bearer {access_token}"
185 user_info_url = retrieved_data[0][
"user_info_base_url"]
186 self.
disp.log_debug(f
"User info headers: {headers}", title)
187 self.
disp.log_debug(f
"User info url: {user_info_url}", title)
188 response = requests.get(user_info_url, headers=headers, timeout=10)
189 if response.status_code != 200:
192 "Failed to retrieve the user email from the provider",
197 user_info = response.json()
198 self.
disp.log_debug(f
"User info: {user_info}", title)
199 if provider ==
"github":
200 for _, info
in enumerate(user_info):
203 email[
"email"] = info[
"email"]
209 The function to insert or update the user information in the database
211 title: str =
"oauth_user_logger"
212 email: str = user_info[
"email"]
218 self.
disp.log_debug(f
"Retrieved user: {retrieved_user}", title)
219 if isinstance(retrieved_user, int)
is False:
225 msg =
"Retrieved provider: "
226 msg += f
"{retrieved_provider}"
227 self.
disp.log_debug(msg, title)
228 if isinstance(retrieved_user, int):
230 connection_data.append(str(retrieved_provider[0][
"id"]))
231 connection_data.append(str(retrieved_user[0][
"id"]))
232 self.
disp.log_debug(f
"Connection data: {connection_data}", title)
234 provider_id = str(retrieved_provider[0][
"id"])
235 user_id = str(retrieved_user[0][
"id"])
237 CONST.TAB_ACTIVE_OAUTHS,
239 f
"service_id='{provider_id}' AND user_id='{user_id}'"
242 CONST.TAB_ACTIVE_OAUTHS)
243 if isinstance(columns, int):
246 self.
disp.log_debug(f
"Columns list = {columns}", title)
248 CONST.TAB_ACTIVE_OAUTHS,
256 if user_data[
"status"] == self.
error:
260 message=
"User successfully logged in.",
262 token=user_data[
"token"],
265 body[
"token"] = user_data[
"token"]
268 content_type=CONST.CONTENT_TYPE,
273 if isinstance(columns, int):
276 self.
disp.log_debug(f
"Columns list = {columns}", title)
277 username: str = email.split(
'@')[0]
279 user_data.append(username)
280 user_data.append(email)
281 user_data.append(provider)
282 user_data.append(provider)
283 user_data.append(
"NULL")
284 user_data.append(str(int(
False)))
285 self.
disp.log_debug(f
"Data list = {user_data}", title)
293 self.
disp.log_debug(f
"Retrieved user: {retrieved_user}", title)
294 if isinstance(retrieved_user, int):
301 self.
disp.log_debug(f
"Retrieved provider: {retrieved_provider}", title)
302 if isinstance(retrieved_user, int):
304 connection_data.append(str(retrieved_provider[0][
"id"]))
305 connection_data.append(str(retrieved_user[0][
"id"]))
306 self.
disp.log_debug(f
"Connection data: {connection_data}", title)
308 CONST.TAB_ACTIVE_OAUTHS)
309 if isinstance(columns, int):
312 self.
disp.log_debug(f
"Columns list = {columns}", title)
314 CONST.TAB_ACTIVE_OAUTHS,
321 if user_data[
"status"] == self.
error:
325 message=
"User successfully logged in.",
327 token=user_data[
"token"],
330 body[
"token"] = user_data[
"token"]
333 content_type=CONST.CONTENT_TYPE,
339 The function that handle the response given by the provider for the oauth token
341 title =
"handle_token_response"
343 access_token: str = token_response[
"access_token"]
346 data.append(access_token)
347 self.
disp.log_debug(f
"Gotten access token: {access_token}", title)
348 if provider ==
"github":
357 expires: int = token_response[
"expires_in"]
361 "The expiration time was not found in the provided response.",
362 "Expiration time not found.",
366 return HCI.bad_request(
368 content_type=CONST.CONTENT_TYPE,
371 current_time = datetime.now()
372 new_time = current_time + timedelta(seconds=expires)
376 data.append(expiration_date)
377 data.append(str(expires))
378 refresh_link = token_response[
"refresh_token"]
382 "The refresh link was not found in the provided response.",
383 "Refresh link not found.",
387 return HCI.bad_request(
389 content_type=CONST.CONTENT_TYPE,
392 data.append(refresh_link)
393 self.
disp.log_debug(f
"Generated data for new oauth connexion user: {data}", title)
395 if "error" in user_info:
403 return HCI.internal_server_error(
405 content_type=CONST.CONTENT_TYPE,
410 def refresh_token(self, provider_name: str, refresh_link: str) -> Union[str,
None]:
412 The function that use the given provider name and refresh link to generate a new token for oauth authentication
414 title: str =
"refresh_token"
415 if any(word
in provider_name
for word
in (
"google",
"discord",
"spotify")):
417 CONST.TAB_USER_OAUTH_CONNECTION,
419 f
"provider_name='{provider_name}'"
421 msg =
"Retrieved provider data:"
422 msg += f
"{retrieved_data}"
423 self.
disp.log_debug(msg, title)
424 if isinstance(retrieved_data, int):
426 "An error has been detected when retrieving the provider data", title
429 token_url: str = retrieved_data[0][
"token_grabber_base_url"]
430 generated_data: dict = {}
431 generated_data[
"client_id"] = retrieved_data[0][
"client_id"]
432 generated_data[
"client_secret"] = retrieved_data[0][
"client_secret"]
433 generated_data[
"refresh_token"] = refresh_link
434 generated_data[
"grant_type"] =
"refresh_token"
435 self.
disp.log_debug(f
"Generated data: {generated_data}", title)
436 provider_response: Response = requests.post(
437 token_url, data=generated_data, timeout=10
439 self.
disp.log_debug(f
"Provider response: {provider_response}", title)
440 if provider_response.status_code == 200:
441 token_response = provider_response.json()
442 msg =
"Provider response to json: "
443 msg += f
"{token_response}"
444 self.
disp.log_debug(msg, title)
445 if "access_token" in token_response:
446 return token_response[
"access_token"]
449 self.
disp.log_error(
"The provider is not recognised", title)
454 Callback of the OAuth login
456 title =
"oauth_callback"
457 query_params = request.query_params
461 message=
"Query parameters not provided.",
462 resp=
"no query parameters",
466 return HCI.bad_request(
468 content_type=CONST.CONTENT_TYPE,
471 self.
disp.log_debug(f
"Query params: {query_params}", title)
472 code = query_params.get(
"code")
473 self.
disp.log_debug(f
"Code: {code}", title)
474 state = query_params.get(
"state")
475 self.
disp.log_debug(f
"State: {state}", title)
476 if not code
or not state:
479 message=
"Authorization code or state not provided.",
480 resp=
"no code or state",
484 return HCI.bad_request(
486 content_type=CONST.CONTENT_TYPE,
489 uuid_gotten, provider = state.split(
":")
490 if not uuid_gotten
or not provider:
493 message=
"The state is in bad format.",
494 resp=
"bad state format",
498 return HCI.bad_request(
500 content_type=CONST.CONTENT_TYPE,
503 self.
disp.log_debug(f
"Uuid retrived: {uuid_gotten}", title)
504 self.
disp.log_debug(f
"Provider: {provider}", title)
506 CONST.TAB_VERIFICATION,
508 f
"definition='{uuid_gotten}'"
510 self.
disp.log_debug(f
"Data received: {data}", title)
511 if isinstance(data, int):
514 CONST.TAB_VERIFICATION,
515 f
"definition='{uuid_gotten}'"
519 self.
disp.log_debug(f
"Token response: {token_response}", title)
520 if "error" in token_response:
523 message=
"Failed to get the token.",
524 resp=token_response[
"error"],
528 return HCI.bad_request(
530 content_type=CONST.CONTENT_TYPE,
537 Get the authorization url for the OAuth login depending on the provider
539 title =
"oauth_login"
541 self.
disp.log_debug(f
"Request body: {request_body}", title)
542 if not request_body
or "provider" not in request_body:
545 message=
"The provider is not found in the request.",
550 return HCI.bad_request(
552 content_type=CONST.CONTENT_TYPE,
555 provider = request_body[
"provider"]
556 self.
disp.log_debug(f
"Oauth login provider: {provider}", title)
558 self.
disp.log_debug(f
"Authorization url: {authorization_url}", title)
559 if isinstance(authorization_url, int):
563 message=
"Authorization url successfully generated.",
568 body[
"authorization_url"] = authorization_url
571 content_type=CONST.CONTENT_TYPE,
577 Add a new oauth provider to the database (only for admin)
579 title: str =
"add_oauth_provider"
583 message=
"The provider is not provided.",
588 return HCI.bad_request(
590 content_type=CONST.CONTENT_TYPE,
593 self.
disp.log_debug(f
"Provider: {provider}", title)
598 self.
disp.log_error(
"You're not admin.", title)
601 CONST.TAB_USER_OAUTH_CONNECTION,
603 f
"provider_name='{provider}'"
605 if isinstance(retrived_data, int)
is False:
608 message=
"The provider already exist in the database.",
609 resp=
"provider already exist",
615 content_type=CONST.CONTENT_TYPE,
619 if not request_body
or not all(key
in request_body
for key
in (
"client_id",
"client_secret",
"provider_scope",
"authorisation_base_url",
"token_grabber_base_url",
"user_info_base_url")):
622 message=
"A variable is missing in the body.",
623 resp=
"missing variable",
627 return HCI.bad_request(
629 content_type=CONST.CONTENT_TYPE,
632 self.
disp.log_debug(f
"Request body: {request_body}", title)
633 client_id = request_body[
"client_id"]
634 client_secret = request_body[
"client_secret"]
635 provider_scope = request_body[
"provider_scope"]
636 authorisation_base_url = request_body[
"authorisation_base_url"]
637 token_grabber_base_url = request_body[
"token_grabber_base_url"]
638 user_info_base_url = request_body[
"user_info_base_url"]
644 authorisation_base_url,
645 token_grabber_base_url,
649 CONST.TAB_USER_OAUTH_CONNECTION
651 if isinstance(columns, int):
654 self.
disp.log_debug(f
"Columns: {columns}", title)
658 column: list = [
"oauth"]
668 message=
"The provider is successfully added.",
675 content_type=CONST.CONTENT_TYPE,
681 The function that modify every value of an oauth provider
683 title: str =
"update_oauth_provider_data"
686 self.
disp.log_debug(f
"Provider: {provider}", title)
690 self.
disp.log_debug(f
"Token received: {token}", title)
692 self.
disp.log_error(
"You're not admin.", title)
695 CONST.TAB_USER_OAUTH_CONNECTION,
697 f
"provider_name='{provider}'")
698 if isinstance(retrived_data, int):
701 if not request_body
or not all(key
in request_body
for key
in (
"client_id",
"client_secret",
"provider_scope",
"authorisation_base_url",
"token_grabber_base_url",
"user_info_base_url")):
704 message=
"A variable is missing in the body.",
705 resp=
"missing variable",
709 return HCI.bad_request(
711 content_type=CONST.CONTENT_TYPE,
714 self.
disp.log_debug(f
"Request body: {request_body}", title)
715 client_id = request_body[
"client_id"]
716 client_secret = request_body[
"client_secret"]
717 provider_scope = request_body[
"provider_scope"]
718 authorisation_base_url = request_body[
"authorisation_base_url"]
719 token_grabber_base_url = request_body[
"token_grabber_base_url"]
720 user_info_base_url = request_body[
"user_info_base_url"]
725 authorisation_base_url,
726 token_grabber_base_url,
729 self.
disp.log_debug(f
"Generated data: {data}", title)
731 CONST.TAB_USER_OAUTH_CONNECTION
733 if isinstance(columns, int):
737 self.
disp.log_debug(f
"Columns: {columns}", title)
739 CONST.TAB_USER_OAUTH_CONNECTION,
742 f
"provider_name='{provider}'"
747 message=
"The provider is successfully updated.",
754 content_type=CONST.CONTENT_TYPE,
760 The function that modify every value of an oauth provider
762 title: str =
"update_oauth_provider_data"
765 self.
disp.log_debug(f
"Provider: {provider}", title)
769 self.
disp.log_debug(f
"Token gotten: {token}", title)
771 self.
disp.log_error(
"You're not admin.", title)
774 CONST.TAB_USER_OAUTH_CONNECTION,
776 f
"provider_name='{provider}'"
778 if isinstance(retrived_data, int):
783 self.
disp.log_debug(f
"Request body: {request_body}", title)
784 if "provider_name" in request_body:
786 CONST.TAB_USER_OAUTH_CONNECTION,
793 if "client_id" in request_body:
795 CONST.TAB_USER_OAUTH_CONNECTION,
802 if "client_secret" in request_body:
804 CONST.TAB_USER_OAUTH_CONNECTION,
811 if "provider_scope" in request_body:
813 CONST.TAB_USER_OAUTH_CONNECTION,
820 if "authorisation_base_url" in request_body:
822 CONST.TAB_USER_OAUTH_CONNECTION,
824 "authorisation_base_url",
829 if "token_grabber_base_url" in request_body:
831 CONST.TAB_USER_OAUTH_CONNECTION,
833 "token_grabber_base_url",
838 if "user_info_base_url" in request_body:
840 CONST.TAB_USER_OAUTH_CONNECTION,
842 "user_info_base_url",
849 message=
"The provider is successfully updated.",
858 The function to delete an oauth provider from the database
860 title: str =
"delete_oauth_provider"
866 self.
disp.log_debug(f
"Provider: {provider}", title)
870 self.
disp.log_debug(f
"Token gotten: {token}", title)
872 self.
disp.log_error(
"You're not admin.", title)
878 CONST.TAB_USER_OAUTH_CONNECTION,
880 f
"provider_name='{provider}'"
882 if isinstance(retrived_data, int):
892 if isinstance(retrieved_service_id, int):
897 service_id = str(retrieved_service_id[0][
"id"])
899 CONST.TAB_ACTIVE_OAUTHS,
900 f
"service_id='{service_id}'"
917 CONST.TAB_USER_OAUTH_CONNECTION,
918 f
"provider_name='{provider}'"
926 message=
"The oauth provider has been deleted successfully.",
932 content_type=CONST.CONTENT_TYPE,