暫無描述

auth.py 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. from urllib.parse import urlparse, urljoin
  2. from flask import session, redirect, url_for, request
  3. from flask_login import login_user
  4. from app import bc, app, db
  5. from app.datamgmt.manage.manage_srv_settings_db import get_server_settings_as_dict
  6. from app.datamgmt.manage.manage_users_db import get_active_user_by_login
  7. from app.iris_engine.access_control.ldap_handler import ldap_authenticate
  8. from app.iris_engine.access_control.utils import ac_get_effective_permissions_of_user
  9. from app.iris_engine.utils.tracker import track_activity
  10. from app.models.cases import Cases
  11. from app.schema.marshables import UserSchema
  12. log = app.logger
  13. def _retrieve_user_by_username(username:str):
  14. """
  15. Retrieve the user object by username.
  16. :param username: Username
  17. :return: User object if found, None
  18. """
  19. user = get_active_user_by_login(username)
  20. if not user:
  21. track_activity(f'someone tried to log in with user \'{username}\', which does not exist',
  22. ctx_less=True, display_in_ui=False)
  23. return user
  24. def validate_ldap_login(username: str, password:str, local_fallback: bool = True):
  25. """
  26. Validate the user login using LDAP authentication.
  27. :param username: Username
  28. :param password: Password
  29. :param local_fallback: If True, will fallback to local authentication if LDAP fails.
  30. :return: User object if successful, None otherwise
  31. """
  32. try:
  33. if ldap_authenticate(username, password) is False:
  34. if local_fallback is True:
  35. track_activity(f'wrong login password for user \'{username}\' using LDAP auth - falling back to local based on settings',
  36. ctx_less=True, display_in_ui=False)
  37. return validate_local_login(username, password)
  38. track_activity(f'wrong login password for user \'{username}\' using LDAP auth', ctx_less=True, display_in_ui=False)
  39. return None
  40. user = _retrieve_user_by_username(username)
  41. if not user:
  42. return None
  43. return UserSchema(exclude=['user_password', 'mfa_secrets', 'webauthn_credentials']).dump(user)
  44. except Exception as e:
  45. log.error(e.__str__())
  46. return None
  47. def validate_local_login(username: str, password: str):
  48. """
  49. Validate the user login using local authentication.
  50. :param username: Username
  51. :param password: Password
  52. :return: User object if successful, None otherwise
  53. """
  54. user = _retrieve_user_by_username(username)
  55. if not user:
  56. return None
  57. if bc.check_password_hash(user.password, password):
  58. wrap_login_user(user)
  59. return UserSchema(exclude=['user_password', 'mfa_secrets', 'webauthn_credentials']).dump(user)
  60. track_activity(f'wrong login password for user \'{username}\' using local auth', ctx_less=True, display_in_ui=False)
  61. return None
  62. def is_safe_url(target):
  63. """
  64. Check whether the target URL is safe for redirection by ensuring that it is either a relative URL or
  65. has the same host as the current request.
  66. """
  67. ref_url = urlparse(request.host_url)
  68. test_url = urlparse(urljoin(request.host_url, target))
  69. return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
  70. def _filter_next_url(next_url, context_case):
  71. """
  72. Ensures that the URL to which the user is redirected is safe. If the provided URL is not safe or is missing,
  73. a default URL (typically the index page) is returned.
  74. """
  75. if not next_url:
  76. return url_for('index.index', cid=context_case)
  77. # Remove backslashes to mitigate obfuscation
  78. next_url = next_url.replace('\\', '')
  79. if is_safe_url(next_url):
  80. return next_url
  81. return url_for('index.index', cid=context_case)
  82. def wrap_login_user(user, is_oidc=False):
  83. session['username'] = user.user
  84. if 'SERVER_SETTINGS' not in app.config:
  85. app.config['SERVER_SETTINGS'] = get_server_settings_as_dict()
  86. if app.config['SERVER_SETTINGS']['enforce_mfa'] is True and is_oidc is False:
  87. if "mfa_verified" not in session or session["mfa_verified"] is False:
  88. return redirect(url_for('mfa_verify'))
  89. login_user(user)
  90. caseid = user.ctx_case
  91. session['permissions'] = ac_get_effective_permissions_of_user(user)
  92. if caseid is None:
  93. case = Cases.query.order_by(Cases.case_id).first()
  94. user.ctx_case = case.case_id
  95. user.ctx_human_case = case.name
  96. db.session.commit()
  97. session['current_case'] = {
  98. 'case_name': user.ctx_human_case,
  99. 'case_info': "",
  100. 'case_id': user.ctx_case
  101. }
  102. track_activity(f'user \'{user.user}\' successfully logged-in', ctx_less=True, display_in_ui=False)
  103. next_url = _filter_next_url(request.args.get('next'), user.ctx_case)
  104. return redirect(next_url)