#  IRIS Source Code
#  Copyright (C) 2024 - DFIR-IRIS
#  contact@dfir-iris.org
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 3 of the License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program; if not, write to the Free Software Foundation,
#  Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import json
import logging as log
import traceback
import uuid
from functools import wraps

import jwt
import requests
from flask import Request
from flask import url_for
from flask import request
from flask import render_template
from flask import session
from flask_login import current_user
from flask_login import login_user
from flask_wtf import FlaskForm
from jwt import PyJWKClient
from requests.auth import HTTPBasicAuth
from werkzeug.utils import redirect

from app import TEMPLATE_PATH
from app import app
from app import db
from app.blueprints.responses import response_error
from app.datamgmt.case.case_db import get_case
from app.datamgmt.manage.manage_access_control_db import user_has_client_access
from app.datamgmt.manage.manage_users_db import get_user
from app.iris_engine.access_control.utils import ac_fast_check_user_has_case_access
from app.iris_engine.access_control.utils import ac_get_effective_permissions_of_user
from app.iris_engine.utils.tracker import track_activity
from app.models.cases import Cases
from app.models.authorization import Permissions
from app.models.authorization import CaseAccessLevel


def _user_has_at_least_a_required_permission(permissions: list[Permissions]):
    """
    Returns true as soon as the user has at least one permission in the list of permissions
    Returns true if the list of required permissions is empty

    The user's permissions are read from session['permissions'] and tested
    bitwise against each required permission value.
    """
    if not permissions:
        return True

    return any(session['permissions'] & permission.value for permission in permissions)


def _set_caseid_from_current_user():
    """
    Return (redir, caseid) taken from the current user's context case.

    When the user has no context case, it is set to 1 and redir is True.
    """
    redir = False
    if current_user.ctx_case is None:
        redir = True
        current_user.ctx_case = 1
    caseid = current_user.ctx_case
    return redir, caseid


def _log_exception_and_error(e):
    """Log the exception and the formatted traceback of the exception being handled."""
    log.exception(e)
    # traceback.print_exc() prints to stderr and returns None, which would log
    # the literal "None"; format_exc() returns the traceback text instead.
    log.error(traceback.format_exc())


def _get_caseid_from_request_data(request_data, no_cid_required):
    """
    Extract the case identifier from a request.

    The 'cid' query argument is tried first, then the 'cid' key of a JSON body.
    Several paths fall back to the current user's context case.

    :param request_data: the incoming request
    :param no_cid_required: when true and no 'cid' query argument is present,
                            the (possibly None) case identifier is returned as is
    :return: a tuple (redir, caseid, has_access); the last element is False only
             when an exception occurred on a request carrying a session cookie
    """
    caseid = request_data.args.get('cid', default=None, type=int)
    if caseid:
        return False, caseid, True

    if no_cid_required:
        return False, caseid, True

    js_d = None

    try:
        if request_data.content_type == 'application/json':
            js_d = request_data.get_json()

        if not js_d:
            redir, caseid = _set_caseid_from_current_user()
            return redir, caseid, True

        if 'cid' not in js_d:
            cookie_session = request_data.cookies.get('session')
            if not cookie_session:
                redir, caseid = _set_caseid_from_current_user()
                return redir, caseid, True

        caseid = js_d.get('cid')

        return False, caseid, True

    except Exception as e:
        cookie_session = request_data.cookies.get('session')
        if not cookie_session:
            redir, caseid = _set_caseid_from_current_user()
            return redir, caseid, True

        _log_exception_and_error(e)
        return True, 0, False


def _handle_no_cid_required(no_cid_required):
    """
    Read the optional 'cid' from the JSON body when no case identifier is required.

    When a truthy 'cid' is found it is also removed from request.json.

    :return: (caseid, True) when no_cid_required is set and the body could be processed
             (caseid may be None), (None, False) otherwise
    """
    if no_cid_required:
        js_d = request.get_json(silent=True)

        try:
            # The body may be a JSON-encoded string that itself contains JSON
            if isinstance(js_d, str):
                js_d = json.loads(js_d)

            caseid = js_d.get('cid') if isinstance(js_d, dict) else None
            if caseid and 'cid' in request.json:
                request.json.pop('cid')

        except Exception:
            return None, False

        return caseid, True

    return None, False


def _update_denied_case(caseid):
    """Replace the session's current case with an 'Access denied' placeholder for caseid."""
    session['current_case'] = {
        'case_name': "{} to #{}".format("Access denied", caseid),
        'case_info': "",
        'case_id': caseid,
        'access': ''
    }


def _update_current_case(caseid, restricted_access):
    """
    Refresh session['current_case'] when it does not already point to caseid.

    Nothing is changed when the case cannot be found.
    """
    if session['current_case']['case_id'] != caseid:
        case = get_case(caseid)
        if case:
            session['current_case'] = {
                'case_name': "{}".format(case.name),
                'case_info': "(#{} - {})".format(caseid, case.client.name),
                'case_id': caseid,
                'access': restricted_access
            }


def _update_session(caseid, eaccess_level):
    """Update the session's current case according to the effective access level."""
    restricted_access = ''
    if not eaccess_level:
        eaccess_level = [CaseAccessLevel.read_only, CaseAccessLevel.full_access]

    # NOTE(review): both paths currently produce an empty 'access' marker, and
    # eaccess_level may be a list at this point - confirm the intended read-only marker.
    if CaseAccessLevel.read_only.value == eaccess_level:
        restricted_access = ''

    _update_current_case(caseid, restricted_access)


# TODO would be nice to remove parameter no_cid_required
def _get_case_access(request_data, access_level, no_cid_required=False):
    """
    Resolve the case targeted by a request and check the current user's access to it.

    :param request_data: the incoming request
    :param access_level: the required access levels (may be empty)
    :param no_cid_required: whether the request may come without case identifier
    :return: a tuple (redir, caseid, has_access)
    """
    redir, caseid, has_access = _get_caseid_from_request_data(request_data, no_cid_required)

    ctmp, has_access = _handle_no_cid_required(no_cid_required)
    redir = False
    if ctmp is not None:
        return redir, ctmp, has_access

    eaccess_level = ac_fast_check_user_has_case_access(current_user.id, caseid, access_level)
    # A None result is treated as a denial, but only when access levels were required
    if eaccess_level is None and access_level:
        _update_denied_case(caseid)
        return redir, caseid, False

    _update_session(caseid, eaccess_level)

    if caseid is not None and not get_case(caseid):
        log.warning('No case found. Using default case')
        return True, 1, True

    return redir, caseid, True


def _is_csrf_token_valid():
    """
    Check the CSRF token of the current request.

    Only POST requests carrying a session cookie and neither an 'X-IRIS-AUTH'
    nor an 'Authorization' header are validated; all others are accepted.
    """
    if request.method != 'POST':
        return True
    if request.headers.get('X-IRIS-AUTH') is not None:
        return True
    if request.headers.get('Authorization') is not None:
        return True
    cookie_session = request.cookies.get('session')
    # True in the absence of a session cookie, because no CSRF token is required for API calls
    if not cookie_session:
        return True
    form = FlaskForm()
    if not form.validate():
        return False
    # TODO not nice to have a side-effect within a 'is' method.
    if request.is_json:
        # Default value: a validated body without the key must not raise KeyError
        request.json.pop('csrf_token', None)
    return True


def _ac_return_access_denied(caseid: int = None):
    """Log the denial with a fresh error UUID and render the 403 error page."""
    error_uuid = uuid.uuid4()
    log.warning(f"Access denied to case #{caseid} for user ID {current_user.id}. Error {error_uuid}")
    return render_template('pages/error-403.html', user=current_user, caseid=caseid, error_uuid=error_uuid,
                           template_folder=TEMPLATE_PATH), 403


def ac_requires_case_identifier(*access_level):
    """
    Decorator for API endpoints which need a case identifier.

    The resolved identifier is passed to the endpoint as keyword argument 'caseid'.
    Responds with 500 when the request cannot be processed, 404 when no case
    identifier is found and 403 when access is denied.
    """
    def decorate_with_requires_case_identifier(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            try:
                redir, caseid, has_access = get_case_access_from_api(request, access_level)
            except Exception as e:
                log.exception(e)
                return response_error('Invalid data. Check server logs', status=500)

            if not caseid and not redir:
                return response_error('Invalid case ID', status=404)

            if not has_access:
                return ac_api_return_access_denied(caseid=caseid)

            kwargs.update({'caseid': caseid})

            return f(*args, **kwargs)

        return wrap
    return decorate_with_requires_case_identifier


def get_case_access_from_api(request_data, access_level):
    """
    Resolve the case targeted by an API request and check the current user's access to it.

    :param request_data: the incoming request
    :param access_level: the required access levels (may be empty)
    :return: a tuple (redir, caseid, has_access)
    """
    redir, caseid, has_access = _get_caseid_from_request_data(request_data, False)
    redir = False

    if not hasattr(current_user, 'id'):
        # Anonymous request, deny access
        return False, 1, False

    eaccess_level = ac_fast_check_user_has_case_access(current_user.id, caseid, access_level)
    if eaccess_level is None and access_level:
        return redir, caseid, False

    if caseid is not None and not get_case(caseid):
        log.warning('No case found. Using default case')
        return True, 1, True

    return redir, caseid, True


def not_authenticated_redirection_url(request_url: str):
    """Return the URL to redirect an unauthenticated user to, depending on AUTHENTICATION_TYPE."""
    # Lambdas defer the evaluation so only the selected entry is computed
    redirection_mapper = {
        "oidc_proxy": lambda: app.config.get("AUTHENTICATION_PROXY_LOGOUT_URL"),
        "local": lambda: url_for('login.login', next=request_url),
        "ldap": lambda: url_for('login.login', next=request_url),
        "oidc": lambda: url_for('login.login', next=request_url,)
    }

    return redirection_mapper.get(app.config.get("AUTHENTICATION_TYPE"))()


def ac_case_requires(*access_level):
    """
    Decorator for pages which require access to a case.

    Unauthenticated users are redirected; denied users get the 403 page.
    The endpoint receives keyword arguments 'caseid' and 'url_redir'.
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            if not is_user_authenticated(request):
                return redirect(not_authenticated_redirection_url(request.full_path))

            redir, caseid, has_access = _get_case_access(request, access_level)

            if not has_access:
                return _ac_return_access_denied(caseid=caseid)

            kwargs.update({"caseid": caseid, "url_redir": redir})

            return f(*args, **kwargs)

        return wrap
    return inner_wrap


# TODO try to remove option no_cid_required
def ac_requires(*permissions, no_cid_required=False):
    """
    Decorator for pages which require at least one of the given permissions.

    Unauthenticated users are redirected; users without permission get the 403 page.
    The endpoint receives keyword arguments 'caseid' and 'url_redir'.
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            if not is_user_authenticated(request):
                return redirect(not_authenticated_redirection_url(request.full_path))

            redir, caseid, _ = _get_case_access(request, [], no_cid_required=no_cid_required)
            kwargs.update({'caseid': caseid, 'url_redir': redir})

            if not _user_has_at_least_a_required_permission(permissions):
                return _ac_return_access_denied()

            return f(*args, **kwargs)

        return wrap
    return inner_wrap


def ac_api_requires(*permissions):
    """
    Decorator for API endpoints which require at least one of the given permissions.

    Responds with an error on invalid CSRF token, 401 when not authenticated
    and 403 when no required permission is held.
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            if not _is_csrf_token_valid():
                return response_error('Invalid CSRF token')

            if not is_user_authenticated(request):
                return response_error('Authentication required', status=401)

            # Permissions are computed once and then kept in the session
            if 'permissions' not in session:
                session['permissions'] = ac_get_effective_permissions_of_user(current_user)

            if not _user_has_at_least_a_required_permission(permissions):
                return response_error('Permission denied', status=403)

            return f(*args, **kwargs)

        return wrap
    return inner_wrap


def ac_requires_client_access():
    """Decorator for pages: render the 403 page unless the user has access to kwargs['client_id']."""
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            client_id = kwargs.get('client_id')
            if not user_has_client_access(current_user.id, client_id):
                return _ac_return_access_denied()

            return f(*args, **kwargs)

        return wrap
    return inner_wrap


def ac_socket_requires(*access_level):
    """
    Decorator for socket handlers which require access to a case.

    The case identifier is parsed from the 'channel' entry of the first
    positional argument, after removing the 'case-' marker.
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            if not is_user_authenticated(request):
                return redirect(not_authenticated_redirection_url(request.full_path))

            else:
                chan_id = args[0].get('channel')
                if chan_id:
                    case_id = int(chan_id.replace('case-', '').split('-')[0])
                else:
                    return _ac_return_access_denied(caseid=0)

                access = ac_fast_check_user_has_case_access(current_user.id, case_id, access_level)
                if not access:
                    return _ac_return_access_denied(caseid=case_id)

                return f(*args, **kwargs)

        return wrap
    return inner_wrap


def ac_api_return_access_denied(caseid: int = None):
    """Log the denial with a fresh error UUID and return a 403 API error response."""
    user_id = current_user.id if hasattr(current_user, 'id') else 'Anonymous'
    error_uuid = uuid.uuid4()
    log.warning(f"EID {error_uuid} - Access denied with case #{caseid} for user ID {user_id} "
                f"accessing URI {request.full_path}")
    data = {
        'user_id': user_id,
        'case_id': caseid,
        'error_uuid': error_uuid
    }
    return response_error('Permission denied', data=data, status=403)


def ac_api_requires_client_access():
    """Decorator for API endpoints: respond with 403 unless the user has access to kwargs['client_id']."""
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            client_id = kwargs.get('client_id')
            if not user_has_client_access(current_user.id, client_id):
                return response_error("Permission denied", status=403)

            return f(*args, **kwargs)

        return wrap
    return inner_wrap


def _authenticate_with_email(user_email):
    """
    Log in the user registered with the given email and initialise the session.

    :return: True on success, False when no user is registered with this email
    """
    user = get_user(user_email, id_key="email")
    if not user:
        log.error(f'User with email {user_email} is not registered in the IRIS')
        return False

    login_user(user)
    track_activity(f"User '{user.id}' successfully logged-in", ctx_less=True)

    caseid = user.ctx_case
    session['permissions'] = ac_get_effective_permissions_of_user(user)

    if caseid is None:
        # No context case yet: use the case with the lowest identifier
        case = Cases.query.order_by(Cases.case_id).first()
        user.ctx_case = case.case_id
        user.ctx_human_case = case.name
        db.session.commit()

    session['current_case'] = {
        'case_name': user.ctx_human_case,
        'case_info': "",
        'case_id': user.ctx_case
    }

    return True


def _oidc_proxy_authentication_process(incoming_request: Request):
    """
    Authenticate a request forwarded by an OIDC proxy.

    The behaviour depends on AUTHENTICATION_TOKEN_VERIFY_MODE ('lazy',
    'introspection' or 'signature'). Some paths return None rather than False
    (missing 'X-Email' header, non-200 introspection response); callers only
    test the truthiness of the result.
    """
    # Get the OIDC JWT authentication token from the request header
    authentication_token = incoming_request.headers.get('X-Forwarded-Access-Token', '')

    if app.config.get("AUTHENTICATION_TOKEN_VERIFY_MODE") == 'lazy':
        user_email = incoming_request.headers.get('X-Email')

        if user_email:
            return _authenticate_with_email(user_email.split(',')[0])

    elif app.config.get("AUTHENTICATION_TOKEN_VERIFY_MODE") == 'introspection':
        # Use the authentication server's token introspection endpoint in order to determine if the request is valid /
        # authenticated. The TLS_ROOT_CA is used to validate the authentication server's certificate.
        # The other solution was to skip the certificate verification, BUT as the authentication server might be
        # located on another server, this check is necessary.

        introspection_body = {"token": authentication_token}
        introspection = requests.post(
            app.config.get("AUTHENTICATION_TOKEN_INTROSPECTION_URL"),
            auth=HTTPBasicAuth(app.config.get('AUTHENTICATION_CLIENT_ID'), app.config.get('AUTHENTICATION_CLIENT_SECRET')),
            data=introspection_body,
            verify=app.config.get("TLS_ROOT_CA")
        )
        if introspection.status_code == 200:
            response_json = introspection.json()

            if response_json.get("active", False) is True:
                user_email = response_json.get("sub")
                return _authenticate_with_email(user_email=user_email)

            else:
                log.info("USER IS NOT AUTHENTICATED")
                return False

    elif app.config.get("AUTHENTICATION_TOKEN_VERIFY_MODE") == 'signature':
        # Use the JWKS urls provided by the OIDC discovery to fetch the signing keys
        # and check the signature of the token
        try:
            jwks_client = PyJWKClient(app.config.get("AUTHENTICATION_JWKS_URL"))
            signing_key = jwks_client.get_signing_key_from_jwt(authentication_token)

            try:
                data = jwt.decode(
                    authentication_token,
                    signing_key.key,
                    algorithms=["RS256"],
                    audience=app.config.get("AUTHENTICATION_AUDIENCE"),
                    options={"verify_exp": app.config.get("AUTHENTICATION_VERIFY_TOKEN_EXP")},
                )

            except jwt.ExpiredSignatureError:
                log.error("Provided token has expired")
                return False

        except Exception as e:
            log.error(f"Error decoding JWT. {e}")
            return False

        # Extract the user email
        user_email = data.get("sub")

        return _authenticate_with_email(user_email)

    else:
        log.error("ERROR DURING TOKEN INTROSPECTION PROCESS")
        return False


def _local_authentication_process(incoming_request: Request):
    """Return whether the current flask-login user is authenticated (the request is not inspected)."""
    return current_user.is_authenticated


def is_user_authenticated(incoming_request: Request):
    """Run the authentication check matching the configured AUTHENTICATION_TYPE."""
    authentication_mapper = {
        "oidc_proxy": _oidc_proxy_authentication_process,
        "local": _local_authentication_process,
        "ldap": _local_authentication_process,
        "oidc": _local_authentication_process,
    }

    return authentication_mapper.get(app.config.get("AUTHENTICATION_TYPE"))(incoming_request)


def is_authentication_oidc():
    """Return True when AUTHENTICATION_TYPE is "oidc"."""
    return app.config.get('AUTHENTICATION_TYPE') == "oidc"


def is_authentication_ldap():
    """Return True when AUTHENTICATION_TYPE is "ldap"."""
    return app.config.get('AUTHENTICATION_TYPE') == "ldap"