| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520 |
- # IRIS Source Code
- # Copyright (C) 2024 - DFIR-IRIS
- # contact@dfir-iris.org
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- import json
- import logging as log
- import traceback
- import uuid
- from functools import wraps
- import jwt
- import requests
- from flask import Request
- from flask import url_for
- from flask import request
- from flask import render_template
- from flask import session
- from flask_login import current_user
- from flask_login import login_user
- from flask_wtf import FlaskForm
- from jwt import PyJWKClient
- from requests.auth import HTTPBasicAuth
- from werkzeug.utils import redirect
- from app import TEMPLATE_PATH
- from app import app
- from app import db
- from app.blueprints.responses import response_error
- from app.datamgmt.case.case_db import get_case
- from app.datamgmt.manage.manage_access_control_db import user_has_client_access
- from app.datamgmt.manage.manage_users_db import get_user
- from app.iris_engine.access_control.utils import ac_fast_check_user_has_case_access
- from app.iris_engine.access_control.utils import ac_get_effective_permissions_of_user
- from app.iris_engine.utils.tracker import track_activity
- from app.models.cases import Cases
- from app.models.authorization import Permissions
- from app.models.authorization import CaseAccessLevel
def _user_has_at_least_a_required_permission(permissions: list[Permissions]):
    """
    Check the session permission mask against a list of required permissions.

    Returns True when the list of required permissions is empty, or as soon as one
    of the listed permissions has its value bit set in session['permissions'].
    Returns False otherwise.
    """
    if not permissions:
        return True

    granted_mask = session['permissions']
    return any(granted_mask & required.value for required in permissions)
def _set_caseid_from_current_user():
    """
    Resolve the case identifier from the current user's context case.

    When the user has no context case, it is set to 1 and the redirection flag is raised.

    :return: tuple (redir, caseid)
    """
    needs_redirect = current_user.ctx_case is None
    if needs_redirect:
        # No context case yet: fall back on case 1
        current_user.ctx_case = 1

    return needs_redirect, current_user.ctx_case
- def _log_exception_and_error(e):
- log.exception(e)
- log.error(traceback.print_exc())
- def _get_caseid_from_request_data(request_data, no_cid_required):
- caseid = request_data.args.get('cid', default=None, type=int)
- if caseid:
- return False, caseid, True
- if no_cid_required:
- return False, caseid, True
- js_d = None
- try:
- if request_data.content_type == 'application/json':
- js_d = request_data.get_json()
- if not js_d:
- redir, caseid = _set_caseid_from_current_user()
- return redir, caseid, True
- if 'cid' not in js_d:
- cookie_session = request_data.cookies.get('session')
- if not cookie_session:
- redir, caseid = _set_caseid_from_current_user()
- return redir, caseid, True
- caseid = js_d.get('cid')
- return False, caseid, True
- except Exception as e:
- cookie_session = request_data.cookies.get('session')
- if not cookie_session:
- redir, caseid = _set_caseid_from_current_user()
- return redir, caseid, True
- _log_exception_and_error(e)
- return True, 0, False
- def _handle_no_cid_required(no_cid_required):
- if no_cid_required:
- js_d = request.get_json(silent=True)
- try:
- if type(js_d) == str:
- js_d = json.loads(js_d)
- caseid = js_d.get('cid') if type(js_d) == dict else None
- if caseid and 'cid' in request.json:
- request.json.pop('cid')
- except Exception:
- return None, False
- return caseid, True
- return None, False
def _update_denied_case(caseid):
    """
    Replace the session's current case with an 'access denied' placeholder.

    :param caseid: identifier of the case the access was denied to
    """
    denied_name = "{} to #{}".format("Access denied", caseid)
    denied_marker = '<i class="ml-2 text-danger mt-1 fa-solid fa-ban"></i>'

    session['current_case'] = {
        'case_name': denied_name,
        'case_info': "",
        'case_id': caseid,
        'access': denied_marker
    }
def _update_current_case(caseid, restricted_access):
    """
    Refresh the session's current case when it differs from the given case identifier.

    Nothing is changed when the session already points to this case, or when
    get_case returns nothing for it.

    :param caseid: identifier of the case to make current
    :param restricted_access: markup stored under the 'access' key of the session entry
    """
    if session['current_case']['case_id'] == caseid:
        return

    case = get_case(caseid)
    if not case:
        return

    session['current_case'] = {
        'case_name': "{}".format(case.name),
        'case_info': "(#{} - {})".format(caseid, case.client.name),
        'case_id': caseid,
        'access': restricted_access
    }
def _update_session(caseid, eaccess_level):
    """
    Update the session's current case, flagging it with a lock icon for read-only access.

    :param caseid: identifier of the case to make current
    :param eaccess_level: effective access level of the user on the case
    """
    if not eaccess_level:
        eaccess_level = [CaseAccessLevel.read_only, CaseAccessLevel.full_access]

    # NOTE(review): after the fallback above eaccess_level is a list, which presumably
    # never equals read_only.value - confirm this is intended.
    is_read_only = CaseAccessLevel.read_only.value == eaccess_level
    if is_read_only:
        access_marker = '<i class="ml-2 text-warning mt-1 fa-solid fa-lock" title="Read only access"></i>'
    else:
        access_marker = ''

    _update_current_case(caseid, access_marker)
# TODO would be nice to remove parameter no_cid_required
def _get_case_access(request_data, access_level, no_cid_required=False):
    """
    Resolve the case targeted by a request and check the current user's access to it.

    The session's current case is updated along the way (either with the case or with
    an 'access denied' placeholder).

    :param request_data: the incoming request
    :param access_level: access levels, one of which is required on the case
    :param no_cid_required: whether the request may come without a case identifier
    :return: tuple (redir, caseid, has_access)
    """
    _, caseid, _ = _get_caseid_from_request_data(request_data, no_cid_required)

    body_caseid, has_access = _handle_no_cid_required(no_cid_required)
    if body_caseid is not None:
        return False, body_caseid, has_access

    effective_level = ac_fast_check_user_has_case_access(current_user.id, caseid, access_level)
    if effective_level is None and access_level:
        _update_denied_case(caseid)
        return False, caseid, False

    _update_session(caseid, effective_level)

    case_is_missing = caseid is not None and not get_case(caseid)
    if case_is_missing:
        log.warning('No case found. Using default case')
        return True, 1, True

    return False, caseid, True
def _is_csrf_token_valid():
    """
    Tell whether the current request passes the CSRF check.

    The check only applies to POST requests that carry a session cookie and neither an
    'X-IRIS-AUTH' nor an 'Authorization' header; every other request is accepted.
    For JSON requests that pass validation, the 'csrf_token' entry is removed from the body.

    :return: False when the form validation fails, True otherwise
    """
    if request.method != 'POST':
        return True

    if request.headers.get('X-IRIS-AUTH') is not None:
        return True

    if request.headers.get('Authorization') is not None:
        return True

    cookie_session = request.cookies.get('session')
    # True in the absence of a session cookie, because no CSRF token is required for API calls
    if not cookie_session:
        return True

    form = FlaskForm()
    if not form.validate():
        return False

    # TODO not nice to have a side-effect within a 'is' method.
    if request.is_json:
        # Default value so that a body without 'csrf_token' does not raise KeyError
        request.json.pop('csrf_token', None)

    return True
def _ac_return_access_denied(caseid: int = None):
    """
    Log an access denial for the current user and build the 403 error page.

    :param caseid: identifier of the case the access was denied to, if any
    :return: tuple (rendered 'pages/error-403.html' template, 403)
    """
    error_uuid = uuid.uuid4()
    log.warning(f"Access denied to case #{caseid} for user ID {current_user.id}. Error {error_uuid}")

    page = render_template(
        'pages/error-403.html',
        user=current_user,
        caseid=caseid,
        error_uuid=error_uuid,
        template_folder=TEMPLATE_PATH
    )
    return page, 403
def ac_requires_case_identifier(*access_level):
    """
    Decorator for API endpoints which operate on a case.

    The case identifier is resolved from the request and handed to the endpoint as
    keyword argument 'caseid'. The endpoint is not called when the case cannot be
    resolved (500 or 404) or when access is denied.

    :param access_level: access levels, one of which is required on the case
    """
    def decorate_with_requires_case_identifier(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            try:
                redir, caseid, has_access = get_case_access_from_api(request, access_level)
            except Exception as e:
                log.exception(e)
                return response_error('Invalid data. Check server logs', status=500)

            if not (caseid or redir):
                return response_error('Invalid case ID', status=404)

            if not has_access:
                return ac_api_return_access_denied(caseid=caseid)

            kwargs['caseid'] = caseid
            return f(*args, **kwargs)

        return wrap

    return decorate_with_requires_case_identifier
def get_case_access_from_api(request_data, access_level):
    """
    Resolve the case targeted by an API request and check the current user's access to it.

    :param request_data: the incoming request
    :param access_level: access levels, one of which is required on the case
    :return: tuple (redir, caseid, has_access)
    """
    _, caseid, _ = _get_caseid_from_request_data(request_data, False)

    if not hasattr(current_user, 'id'):
        # Anonymous request, deny access
        return False, 1, False

    effective_level = ac_fast_check_user_has_case_access(current_user.id, caseid, access_level)
    if effective_level is None and access_level:
        return False, caseid, False

    case_is_missing = caseid is not None and not get_case(caseid)
    if case_is_missing:
        log.warning('No case found. Using default case')
        return True, 1, True

    return False, caseid, True
def not_authenticated_redirection_url(request_url: str):
    """
    Compute the URL a non-authenticated user is redirected to.

    The URL depends on the configured AUTHENTICATION_TYPE: the proxy logout URL for
    'oidc_proxy', the login page (with request_url as 'next') for 'local', 'ldap' and 'oidc'.

    :param request_url: the URL initially requested
    """
    def _proxy_logout_url():
        return app.config.get("AUTHENTICATION_PROXY_LOGOUT_URL")

    def _login_url():
        return url_for('login.login', next=request_url)

    url_builders = {
        "oidc_proxy": _proxy_logout_url,
        "local": _login_url,
        "ldap": _login_url,
        "oidc": _login_url
    }
    build_url = url_builders.get(app.config.get("AUTHENTICATION_TYPE"))
    return build_url()
def ac_case_requires(*access_level):
    """
    Decorator for pages which operate on a case.

    Non-authenticated users are redirected; users without access to the case get the
    403 page. Otherwise the page is called with keyword arguments 'caseid' and 'url_redir'.

    :param access_level: access levels, one of which is required on the case
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            if not is_user_authenticated(request):
                return redirect(not_authenticated_redirection_url(request.full_path))

            redir, caseid, has_access = _get_case_access(request, access_level)
            if not has_access:
                return _ac_return_access_denied(caseid=caseid)

            kwargs["caseid"] = caseid
            kwargs["url_redir"] = redir
            return f(*args, **kwargs)

        return wrap

    return inner_wrap
# TODO try to remove option no_cid_required
def ac_requires(*permissions, no_cid_required=False):
    """
    Decorator for pages which require the user to hold at least one of the given permissions.

    Non-authenticated users are redirected; users lacking the permissions get the 403 page.
    Otherwise the page is called with keyword arguments 'caseid' and 'url_redir'.

    :param permissions: permissions, one of which is required (none means no restriction)
    :param no_cid_required: whether the request may come without a case identifier
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            if not is_user_authenticated(request):
                return redirect(not_authenticated_redirection_url(request.full_path))

            redir, caseid, _ = _get_case_access(request, [], no_cid_required=no_cid_required)
            kwargs['caseid'] = caseid
            kwargs['url_redir'] = redir

            if _user_has_at_least_a_required_permission(permissions):
                return f(*args, **kwargs)

            return _ac_return_access_denied()

        return wrap

    return inner_wrap
def ac_api_requires(*permissions):
    """
    Decorator for API endpoints which require at least one of the given permissions.

    Checks are performed in this order: CSRF token, authentication, permissions.
    The session's permissions are computed from the current user when not yet present.

    :param permissions: permissions, one of which is required (none means no restriction)
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            if not _is_csrf_token_valid():
                return response_error('Invalid CSRF token')

            if not is_user_authenticated(request):
                return response_error('Authentication required', status=401)

            if 'permissions' not in session:
                session['permissions'] = ac_get_effective_permissions_of_user(current_user)

            if _user_has_at_least_a_required_permission(permissions):
                return f(*args, **kwargs)

            return response_error('Permission denied', status=403)

        return wrap

    return inner_wrap
def ac_requires_client_access():
    """
    Decorator for pages which operate on a client.

    The client identifier is read from keyword argument 'client_id'. Users without
    access to this client get the 403 page.
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            client_id = kwargs.get('client_id')
            if user_has_client_access(current_user.id, client_id):
                return f(*args, **kwargs)

            return _ac_return_access_denied()

        return wrap

    return inner_wrap
def ac_socket_requires(*access_level):
    """
    Decorator for socket handlers bound to a case channel.

    The case identifier is parsed from the 'channel' entry of the first positional
    argument: the 'case-' marker is removed and the part before the first '-' is kept.

    :param access_level: access levels, one of which is required on the case
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            if not is_user_authenticated(request):
                return redirect(not_authenticated_redirection_url(request.full_path))

            chan_id = args[0].get('channel')
            if not chan_id:
                return _ac_return_access_denied(caseid=0)

            case_id = int(chan_id.replace('case-', '').split('-')[0])

            if not ac_fast_check_user_has_case_access(current_user.id, case_id, access_level):
                return _ac_return_access_denied(caseid=case_id)

            return f(*args, **kwargs)

        return wrap

    return inner_wrap
def ac_api_return_access_denied(caseid: int = None):
    """
    Log an access denial and build the API 'Permission denied' response (status 403).

    The response data holds the user identifier ('Anonymous' when the current user has
    no 'id' attribute), the case identifier and the identifier of the logged error.

    :param caseid: identifier of the case the access was denied to, if any
    """
    user_id = getattr(current_user, 'id', 'Anonymous')
    error_uuid = uuid.uuid4()

    log.warning(f"EID {error_uuid} - Access denied with case #{caseid} for user ID {user_id} "
                f"accessing URI {request.full_path}")

    return response_error(
        'Permission denied',
        data={'user_id': user_id, 'case_id': caseid, 'error_uuid': error_uuid},
        status=403
    )
def ac_api_requires_client_access():
    """
    Decorator for API endpoints which operate on a client.

    The client identifier is read from keyword argument 'client_id'. Users without
    access to this client get a 'Permission denied' response (status 403).
    """
    def inner_wrap(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            client_id = kwargs.get('client_id')
            if user_has_client_access(current_user.id, client_id):
                return f(*args, **kwargs)

            return response_error("Permission denied", status=403)

        return wrap

    return inner_wrap
def _authenticate_with_email(user_email):
    """
    Log in the user registered with the given email and initialise the session.

    The session receives the user's effective permissions and the current case. A user
    without context case is assigned the first case, ordered by case identifier.

    :param user_email: email of the user to log in
    :return: True when the user was logged in, False when no user has this email
    """
    user = get_user(user_email, id_key="email")
    if not user:
        log.error(f'User with email {user_email} is not registered in the IRIS')
        return False

    login_user(user)
    track_activity(f"User '{user.id}' successfully logged-in", ctx_less=True)

    context_case = user.ctx_case
    session['permissions'] = ac_get_effective_permissions_of_user(user)

    if context_case is None:
        first_case = Cases.query.order_by(Cases.case_id).first()
        user.ctx_case = first_case.case_id
        user.ctx_human_case = first_case.name
        db.session.commit()

    session['current_case'] = dict(
        case_name=user.ctx_human_case,
        case_info="",
        case_id=user.ctx_case
    )

    return True
def _oidc_proxy_authentication_process(incoming_request: Request):
    """
    Authenticate a request coming through an OIDC proxy.

    The behaviour depends on configuration entry AUTHENTICATION_TOKEN_VERIFY_MODE:
    - 'lazy': the user is taken from the first value of header 'X-Email'
    - 'introspection': the token is submitted to the introspection endpoint
    - 'signature': the token signature is checked against the JWKS signing keys
    Any other mode is rejected.

    :param incoming_request: the request to authenticate
    :return: the result of the login by email, False on rejection, or None when no
             decision is reached (missing 'X-Email' header, introspection status other than 200)
    """
    # Get the OIDC JWT authentication token from the request header
    access_token = incoming_request.headers.get('X-Forwarded-Access-Token', '')
    verify_mode = app.config.get("AUTHENTICATION_TOKEN_VERIFY_MODE")

    if verify_mode == 'lazy':
        forwarded_email = incoming_request.headers.get('X-Email')
        if not forwarded_email:
            return None

        return _authenticate_with_email(forwarded_email.split(',')[0])

    if verify_mode == 'introspection':
        # Use the authentication server's token introspection endpoint in order to determine if the request is valid /
        # authenticated. The TLS_ROOT_CA is used to validate the authentication server's certificate.
        # The other solution was to skip the certificate verification, BUT as the authentication server might be
        # located on another server, this check is necessary.
        introspection = requests.post(
            app.config.get("AUTHENTICATION_TOKEN_INTROSPECTION_URL"),
            auth=HTTPBasicAuth(app.config.get('AUTHENTICATION_CLIENT_ID'), app.config.get('AUTHENTICATION_CLIENT_SECRET')),
            data={"token": access_token},
            verify=app.config.get("TLS_ROOT_CA")
        )
        if introspection.status_code != 200:
            return None

        token_info = introspection.json()
        if token_info.get("active", False) is True:
            return _authenticate_with_email(user_email=token_info.get("sub"))

        log.info("USER IS NOT AUTHENTICATED")
        return False

    if verify_mode == 'signature':
        # Use the JWKS urls provided by the OIDC discovery to fetch the signing keys
        # and check the signature of the token
        try:
            jwks_client = PyJWKClient(app.config.get("AUTHENTICATION_JWKS_URL"))
            signing_key = jwks_client.get_signing_key_from_jwt(access_token)

            try:
                claims = jwt.decode(
                    access_token,
                    signing_key.key,
                    algorithms=["RS256"],
                    audience=app.config.get("AUTHENTICATION_AUDIENCE"),
                    options={"verify_exp": app.config.get("AUTHENTICATION_VERIFY_TOKEN_EXP")},
                )
            except jwt.ExpiredSignatureError:
                log.error("Provided token has expired")
                return False

        except Exception as e:
            log.error(f"Error decoding JWT. {e.__str__()}")
            return False

        # Extract the user email
        return _authenticate_with_email(claims.get("sub"))

    log.error("ERROR DURING TOKEN INTROSPECTION PROCESS")
    return False
def _local_authentication_process(incoming_request: Request):
    """
    Tell whether the current user is authenticated.

    :param incoming_request: not used; kept so that all authentication processes share a signature
    """
    authenticated = current_user.is_authenticated
    return authenticated
def is_user_authenticated(incoming_request: Request):
    """
    Run the authentication process matching the configured AUTHENTICATION_TYPE.

    'oidc_proxy' goes through the OIDC proxy process; 'local', 'ldap' and 'oidc'
    rely on the current user's authentication state.

    :param incoming_request: the request to authenticate
    :return: the result of the selected authentication process
    """
    authentication_type = app.config.get("AUTHENTICATION_TYPE")

    processes = dict.fromkeys(("local", "ldap", "oidc"), _local_authentication_process)
    processes["oidc_proxy"] = _oidc_proxy_authentication_process

    authenticate = processes.get(authentication_type)
    return authenticate(incoming_request)
def is_authentication_oidc():
    """
    Tell whether the configured AUTHENTICATION_TYPE is "oidc".
    """
    authentication_type = app.config.get('AUTHENTICATION_TYPE')
    return authentication_type == "oidc"
def is_authentication_ldap():
    """
    Tell whether the configured AUTHENTICATION_TYPE is "ldap".
    """
    authentication_type = app.config.get('AUTHENTICATION_TYPE')
    return authentication_type == "ldap"
|