| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- # IRIS Source Code
- # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
- # ir@cyberactionlab.net
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- import base64
- import io
- import pyotp
- import qrcode
- import random
- import string
- from flask import Blueprint, flash
- from flask import redirect
- from flask import render_template
- from flask import request
- from flask import session
- from flask import url_for
- from flask_login import current_user
- from oic import rndstr
- from oic.oic.message import AuthorizationResponse
- from app import app
- from app import bc
- from app import db
- from app import oidc_client
- from app.blueprints.access_controls import is_authentication_oidc, is_authentication_ldap
- from app.blueprints.responses import response_error
- from app.business.auth import validate_ldap_login, _retrieve_user_by_username, wrap_login_user
- from app.datamgmt.manage.manage_users_db import create_user
- from app.datamgmt.manage.manage_users_db import get_user
- from app.forms import LoginForm, MFASetupForm
- from app.iris_engine.utils.tracker import track_activity
- login_blueprint = Blueprint(
- 'login',
- __name__,
- template_folder='templates'
- )
- log = app.logger
- # filter User out of database through username
- def _render_template_login(form, msg):
- organisation_name = app.config.get('ORGANISATION_NAME')
- login_banner = app.config.get('LOGIN_BANNER_TEXT')
- ptfm_contact = app.config.get('LOGIN_PTFM_CONTACT')
- auth_type = app.config.get('AUTHENTICATION_TYPE')
- return render_template('login.html', form=form, msg=msg, organisation_name=organisation_name,
- login_banner=login_banner, ptfm_contact=ptfm_contact, auth_type=auth_type)
- def _validate_local_login(username, password):
- user = _retrieve_user_by_username(username)
- if not user:
- return None
- if bc.check_password_hash(user.password, password):
- return user
- track_activity(f'wrong login password for user \'{username}\' using local auth', ctx_less=True, display_in_ui=False)
- return None
- def _authenticate_ldap(form, username, password, local_fallback=True):
- try:
- user = validate_ldap_login(username, password, local_fallback)
- if user is None:
- return _render_template_login(form, 'Wrong credentials. Please try again.')
- return wrap_login_user(user)
- except Exception as e:
- log.error(e.__str__())
- return _render_template_login(form, 'LDAP authentication unavailable. Check server logs')
- def _authenticate_password(form, username, password):
- user = _retrieve_user_by_username(username)
- if not user or user.is_service_account:
- return _render_template_login(form, 'Wrong credentials. Please try again.')
- if bc.check_password_hash(user.password, password):
- return wrap_login_user(user)
- track_activity(f'wrong login password for user \'{username}\' using local auth', ctx_less=True,
- display_in_ui=False)
- return _render_template_login(form, 'Wrong credentials. Please try again.')
- # Authenticate user
- if app.config.get("AUTHENTICATION_TYPE") in ["local", "ldap", "oidc"]:
- @login_blueprint.route('/login', methods=['GET', 'POST'])
- def login():
- if current_user.is_authenticated:
- return redirect(url_for('index.index'))
- if is_authentication_oidc() and app.config.get('AUTHENTICATION_LOCAL_FALLBACK') is False:
- return redirect(url_for('login.oidc_login'))
- form = LoginForm(request.form)
- # check if both http method is POST and form is valid on submit
- if not form.is_submitted() and not form.validate():
- return _render_template_login(form, None)
- # assign form data to variables
- username = request.form.get('username', '', type=str)
- password = request.form.get('password', '', type=str)
- if is_authentication_ldap() is True:
- return _authenticate_ldap(form, username, password, app.config.get('AUTHENTICATION_LOCAL_FALLBACK'))
- return _authenticate_password(form, username, password)
- if is_authentication_oidc():
- @login_blueprint.route('/oidc-login')
- def oidc_login():
- if current_user.is_authenticated:
- return redirect(url_for('index.index'))
- session["oidc_state"] = rndstr()
- session["oidc_nonce"] = rndstr()
- args = {
- "client_id": oidc_client.client_id,
- "response_type": "code",
- "scope": app.config.get("OIDC_SCOPES"),
- "nonce": session["oidc_nonce"],
- "redirect_uri": url_for("login.oidc_authorise", _external=True),
- "state": session["oidc_state"]
- }
- auth_req = oidc_client.construct_AuthorizationRequest(request_args=args)
- login_url = auth_req.request(oidc_client.authorization_endpoint)
- return redirect(login_url)
- if is_authentication_oidc():
- @login_blueprint.route('/oidc-authorize')
- def oidc_authorise():
- auth_resp = oidc_client.parse_response(AuthorizationResponse, info=request.args,
- sformat="dict")
- if auth_resp["state"] != session["oidc_state"]:
- track_activity(
- f"OIDC session state '{auth_resp['state']}' does not match authorization state '{session['oidc_state']}'",
- ctx_less=True,
- display_in_ui=False,
- )
- return redirect(url_for("login.login"))
- args = {
- "code": auth_resp["code"],
- }
- access_token_resp = oidc_client.do_access_token_request(state=auth_resp["state"], request_args=args)
- # not all providers set email by default, use preferred_username where it's missing
- # Use the mapping from the configuration or default to email or preferred_username if not set
- email_field = app.config.get("OIDC_MAPPING_EMAIL")
- username_field = app.config.get("OIDC_MAPPING_USERNAME")
- user_login = access_token_resp['id_token'].get(username_field) or access_token_resp['id_token'].get(email_field)
- user_name = access_token_resp['id_token'].get(email_field) or access_token_resp['id_token'].get(username_field)
- user = get_user(user_login, 'user')
- if not user:
- log.warning(f'OIDC user {user_login} not found in database')
- if app.config.get('AUTHENTICATION_CREATE_USER_IF_NOT_EXIST') is False:
- log.warning('Authentication is set to not create user if not exists')
- track_activity(
- f'OIDC user {user_login} not found in database',
- ctx_less=True,
- display_in_ui=False,
- )
- return response_error('User not found in IRIS', 404)
- log.info(f'Creating OIDC user {user_login} in database')
- track_activity(
- f'Creating OIDC user {user_login} in database',
- ctx_less=True,
- display_in_ui=False,
- )
- # generate random password
- password = ''.join(random.choices(string.printable[:-6], k=16))
- user = create_user(
- user_name=user_name,
- user_login=user_login,
- user_email=user_login,
- user_password=bc.generate_password_hash(password.encode('utf8')).decode('utf8'),
- user_active=True,
- user_is_service_account=False
- )
- if user and not user.active:
- return response_error("User not active in IRIS", 403)
- return wrap_login_user(user, is_oidc=True)
- @app.route('/auth/mfa-setup', methods=['GET', 'POST'])
- def mfa_setup():
- user = _retrieve_user_by_username(username=session['username'])
- form = MFASetupForm()
- if form.submit() and form.validate():
- token = form.token.data
- mfa_secret = form.mfa_secret.data
- user_password = form.user_password.data
- totp = pyotp.TOTP(mfa_secret)
- if totp.verify(token):
- has_valid_password = False
- if is_authentication_ldap() is True:
- if validate_ldap_login(user.user, user_password,
- local_fallback=app.config.get('AUTHENTICATION_LOCAL_FALLBACK')):
- has_valid_password = True
- elif bc.check_password_hash(user.password, user_password):
- has_valid_password = True
- if not has_valid_password:
- track_activity(f'Failed MFA setup for user {user.user}. Invalid password.', ctx_less=True, display_in_ui=False)
- flash('Invalid password. Please try again.', 'danger')
- return render_template('mfa_setup.html', form=form)
- user.mfa_secrets = mfa_secret
- user.mfa_setup_complete = True
- db.session.commit()
- session["mfa_verified"] = False
- track_activity(f'MFA setup successful for user {user.user}', ctx_less=True, display_in_ui=False)
- return wrap_login_user(user)
- else:
- track_activity(f'Failed MFA setup for user {user.user}. Invalid token.', ctx_less=True, display_in_ui=False)
- flash('Invalid token or password. Please try again.', 'danger')
- temp_otp_secret = pyotp.random_base32()
- otp_uri = pyotp.TOTP(temp_otp_secret).provisioning_uri(user.email, issuer_name="IRIS")
- form.mfa_secret.data = temp_otp_secret
- img = qrcode.make(otp_uri)
- buf = io.BytesIO()
- img.save(buf, format='PNG')
- img_str = base64.b64encode(buf.getvalue()).decode()
- return render_template('mfa_setup.html', form=form, img_data=img_str, otp_setup_key=temp_otp_secret)
- @app.route('/auth/mfa-verify', methods=['GET', 'POST'])
- def mfa_verify():
- if 'username' not in session:
- return redirect(url_for('login.login'))
- user = _retrieve_user_by_username(username=session['username'])
- # Redirect user to MFA setup if MFA is not fully set up
- if not user.mfa_secrets or not user.mfa_setup_complete:
- track_activity(f'MFA setup required for user {user.user}', ctx_less=True, display_in_ui=False)
- return redirect(url_for('mfa_setup'))
- form = MFASetupForm()
- form.user_password.data = 'not required for verification'
- if form.submit() and form.validate():
- token = form.token.data
- if not token:
- flash('Token is required.', 'danger')
- return render_template('mfa_verify.html', form=form)
- totp = pyotp.TOTP(user.mfa_secrets)
- if totp.verify(token):
- session.pop('username', None)
- session['mfa_verified'] = True
- track_activity(f'MFA verification successful for user {user.user}', ctx_less=True, display_in_ui=False)
- return wrap_login_user(user)
- else:
- track_activity(f'Failed MFA verification for user {user.user}. Invalid token.', ctx_less=True, display_in_ui=False)
- flash('Invalid token. Please try again.', 'danger')
- return render_template('mfa_verify.html', form=form)
|