| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- # IRIS Source Code
- # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
- # ir@cyberactionlab.net
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- import collections
- import json
- import logging as logger
- import os
- import urllib.parse
- from flask import Flask
- from flask import session
- from flask_bcrypt import Bcrypt
- from flask_caching import Cache
- from flask_login import LoginManager
- from flask_marshmallow import Marshmallow
- from flask_socketio import SocketIO, Namespace
- from flask_sqlalchemy import SQLAlchemy
- from functools import partial
- from werkzeug.middleware.proxy_fix import ProxyFix
- from app.flask_dropzone import Dropzone
- from app.iris_engine.tasker.celery import make_celery
- from app.iris_engine.access_control.oidc_handler import get_oidc_client
- class ReverseProxied(object):
- def __init__(self, flask_app):
- self._app = flask_app
- def __call__(self, environ, start_response):
- scheme = environ.get('HTTP_X_FORWARDED_PROTO', None)
- if scheme is not None:
- environ['wsgi.url_scheme'] = scheme
- return self._app(environ, start_response)
- class AlertsNamespace(Namespace):
- pass
- APP_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- TEMPLATE_PATH = os.path.join(APP_PATH, 'templates/')
- # Grabs the folder where the script runs.
- basedir = os.path.abspath(os.path.dirname(__file__))
- LOG_FORMAT = '%(asctime)s :: %(levelname)s :: %(module)s :: %(funcName)s :: %(message)s'
- LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
- logger.basicConfig(level=logger.INFO, format=LOG_FORMAT, datefmt=LOG_TIME_FORMAT)
- app = Flask(__name__, static_folder='../static')
- # CORS(app,
- # supports_credentials=True,
- # resources={r"/api/*": {"origins": ["http://127.0.0.1:5137", "http://localhost:5173"]}})
- def ac_current_user_has_permission(*permissions):
- """
- Return True if current user has permission
- """
- for permission in permissions:
- if ('permissions' in session and
- session['permissions'] & permission.value == permission.value):
- return True
- return False
- def ac_current_user_has_manage_perms():
- if session['permissions'] != 1 and session['permissions'] & 0x1FFFFF0 != 0:
- return True
- return False
- app.jinja_env.filters['unquote'] = lambda u: urllib.parse.unquote(u)
- app.jinja_env.filters['tojsonsafe'] = lambda u: json.dumps(u, indent=4, ensure_ascii=False)
- app.jinja_env.filters['tojsonindent'] = lambda u: json.dumps(u, indent=4)
- app.jinja_env.filters['escape_dots'] = lambda u: u.replace('.', '[.]')
- app.jinja_env.globals.update(user_has_perm=ac_current_user_has_permission)
- app.jinja_env.globals.update(user_has_manage_perms=ac_current_user_has_manage_perms)
- app.jinja_options["autoescape"] = lambda _: True
- app.jinja_env.autoescape = True
- app.config.from_object('app.configuration.Config')
- app.config.update(
- SESSION_COOKIE_SECURE=True,
- SESSION_COOKIE_HTTPONLY=True,
- SESSION_COOKIE_SAMESITE='Lax',
- SOC_INTEGRATOR_URL=os.getenv('SOC_INTEGRATOR_URL', 'http://soc-integrator:8080'),
- )
- cache = Cache(app)
- SQLALCHEMY_ENGINE_OPTIONS = {
- "json_deserializer": partial(json.loads, object_pairs_hook=collections.OrderedDict),
- "pool_pre_ping": True
- }
- db = SQLAlchemy(app, engine_options=SQLALCHEMY_ENGINE_OPTIONS) # flask-sqlalchemy
- bc = Bcrypt(app) # flask-bcrypt
- lm = LoginManager() # flask-loginmanager
- lm.init_app(app) # init the login manager
- ma = Marshmallow(app) # Init marshmallow
- dropzone = Dropzone(app)
- celery = make_celery(app)
- # store = HttpExposedFileSystemStore(
- # path='images',
- # prefix='/static/assets/images/'
- # )
- app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)
- #app.wsgi_app = store.wsgi_middleware(app.wsgi_app)
- socket_io = SocketIO(app, cors_allowed_origins="*")
- alerts_namespace = AlertsNamespace('/alerts')
- socket_io.on_namespace(alerts_namespace)
- oidc_client = None
- if app.config.get('AUTHENTICATION_TYPE') == "oidc":
- oidc_client = get_oidc_client(app)
- @app.teardown_appcontext
- def shutdown_session(exception=None):
- db.session.remove()
- @app.after_request
- def after_request(response):
- response.headers.add('Access-Control-Allow-Origin', '*')
- response.headers.add('Access-Control-Allow-Credentials', 'true')
- response.headers.add('Access-Control-Allow-Headers', 'Content-Type, Authorization')
- response.headers.add('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
- return response
- from app.views import register_blusprints
- from app.views import load_user
- from app.views import load_user_from_request
- register_blusprints(app)
- from app.post_init import run_post_init
- try:
- run_post_init(development=app.config['DEVELOPMENT'])
- except Exception as e:
- app.logger.exception('Post init failed. IRIS not started')
- raise e
- lm.user_loader(load_user)
- lm.request_loader(load_user_from_request)
|