| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110 |
- # IRIS Source Code
- # Copyright (C) 2024 - DFIR-IRIS
- # contact@dfir-iris.org
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- import json
- import marshmallow
- from datetime import datetime
- from flask import Blueprint
- from flask import request
- from flask_login import current_user
- from typing import List
- from werkzeug import Response
- import app
- from app import db
- from app.blueprints.rest.endpoints import endpoint_deprecated
- from app.blueprints.rest.parsing import parse_comma_separated_identifiers
- from app.blueprints.rest.case_comments import case_comment_update
- from app.datamgmt.alerts.alerts_db import get_filtered_alerts
- from app.datamgmt.alerts.alerts_db import get_alert_by_id
- from app.datamgmt.alerts.alerts_db import create_case_from_alert
- from app.datamgmt.alerts.alerts_db import delete_related_alerts_cache
- from app.datamgmt.alerts.alerts_db import merge_alert_in_case
- from app.datamgmt.alerts.alerts_db import unmerge_alert_from_case
- from app.datamgmt.alerts.alerts_db import cache_similar_alert
- from app.datamgmt.alerts.alerts_db import get_related_alerts
- from app.datamgmt.alerts.alerts_db import get_related_alerts_details
- from app.datamgmt.alerts.alerts_db import get_alert_comments
- from app.datamgmt.alerts.alerts_db import delete_alert_comment
- from app.datamgmt.alerts.alerts_db import get_alert_comment
- from app.datamgmt.alerts.alerts_db import delete_similar_alert_cache
- from app.datamgmt.alerts.alerts_db import delete_alerts
- from app.datamgmt.alerts.alerts_db import create_case_from_alerts
- from app.datamgmt.case.case_db import get_case
- from app.datamgmt.manage.manage_access_control_db import check_ua_case_client
- from app.datamgmt.manage.manage_access_control_db import user_has_client_access
- from app.iris_engine.access_control.utils import ac_set_new_case_access
- from app.iris_engine.module_handler.module_handler import call_modules_hook
- from app.iris_engine.utils.tracker import track_activity
- from app.models.alerts import AlertStatus
- from app.models.authorization import Permissions
- from app.schema.marshables import AlertSchema
- from app.schema.marshables import CaseSchema
- from app.schema.marshables import CommentSchema
- from app.schema.marshables import CaseAssetsSchema
- from app.schema.marshables import IocSchema
- from app.blueprints.access_controls import ac_api_requires
- from app.blueprints.responses import response_error
- from app.util import add_obj_history_entry
- from app.blueprints.responses import response_success
# Flask blueprint on which every alert REST route in this module is registered.
alerts_rest_blueprint = Blueprint('alerts_rest', __name__)
@alerts_rest_blueprint.route('/alerts/filter', methods=['GET'])
@endpoint_deprecated('GET', '/api/v2/alerts')
@ac_api_requires(Permissions.alerts_read)
def alerts_list_route() -> Response:
    """
    Return one page of alerts matching the filters given in the query string.

    Recognised query parameters include 'page' (default 1), 'per_page'
    (default 10), 'sort' (default 'desc'), the comma separated lists
    'alert_ids', 'alert_assets', 'alert_iocs' and 'fields', plus the scalar
    filters forwarded verbatim to get_filtered_alerts().

    returns:
        Response: on success, a payload with the keys 'total', 'alerts',
        'last_page', 'current_page' and 'next_page'; an error response when
        an alert id is not an integer or when filtering fails.
    """
    args = request.args

    page = args.get('page', 1, type=int)
    per_page = args.get('per_page', 10, type=int)

    # Alert ids must all be integers; a single value is just a one-item list.
    alert_ids = None
    alert_ids_str = args.get('alert_ids')
    if alert_ids_str:
        try:
            alert_ids = [int(alert_id) for alert_id in alert_ids_str.split(',')]
        except ValueError:
            return response_error('Invalid alert id')

    # Assets and IOCs are free-form strings, so splitting cannot fail.
    alert_assets_str = args.get('alert_assets')
    alert_assets = alert_assets_str.split(',') if alert_assets_str else None

    alert_iocs_str = args.get('alert_iocs')
    alert_iocs = alert_iocs_str.split(',') if alert_iocs_str else None

    # Optional projection: names of the schema fields to serialise.
    fields = None
    fields_str = args.get('fields')
    if fields_str:
        fields = [field.strip() for field in fields_str.split(',') if field.strip()]

    try:
        filtered_alerts = get_filtered_alerts(
            start_date=args.get('creation_start_date'),
            end_date=args.get('creation_end_date'),
            source_start_date=args.get('source_start_date'),
            source_end_date=args.get('source_end_date'),
            source_reference=args.get('source_reference'),
            title=args.get('alert_title'),
            description=args.get('alert_description'),
            status=args.get('alert_status_id', type=int),
            severity=args.get('alert_severity_id', type=int),
            owner=args.get('alert_owner_id', type=int),
            source=args.get('alert_source'),
            tags=args.get('alert_tags'),
            classification=args.get('alert_classification_id', type=int),
            client=args.get('alert_customer_id'),
            case_id=args.get('case_id', type=int),
            alert_ids=alert_ids,
            page=page,
            per_page=per_page,
            sort=args.get('sort', 'desc', type=str),
            custom_conditions=args.get('custom_conditions'),
            assets=alert_assets,
            iocs=alert_iocs,
            resolution_status=args.get('alert_resolution_id', type=int),
            current_user_id=current_user.id
        )
    except Exception as e:
        app.app.logger.exception(e)
        return response_error(str(e))

    if filtered_alerts is None:
        return response_error('Filtering error')

    # Restrict the serialised fields when requested; if the schema rejects the
    # selection, log it and fall back to the full schema.
    if fields:
        try:
            alert_schema = AlertSchema(only=fields)
        except Exception as e:
            app.app.logger.exception(f"Error selecting fields in AlertSchema: {str(e)}")
            alert_schema = AlertSchema()
    else:
        alert_schema = AlertSchema()

    filtered_data = {
        'total': filtered_alerts.total,
        'alerts': alert_schema.dump(filtered_alerts, many=True),
        'last_page': filtered_alerts.pages,
        'current_page': filtered_alerts.page,
        'next_page': filtered_alerts.next_num if filtered_alerts.has_next else None,
    }

    return response_success(data=filtered_data)
@alerts_rest_blueprint.route('/alerts/add', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alerts_add_route() -> Response:
    """
    Create an alert from the JSON body of the request.

    The optional 'alert_iocs' and 'alert_assets' entries are taken out of the
    payload and loaded with their own schemas; what remains is loaded as the
    alert itself. The alert is persisted, a history entry is recorded, the
    similarity cache is fed, the 'on_postload_alert_create' hook is called and
    a 'new_alert' socket.io event is emitted.

    returns:
        Response: the serialised alert on success, otherwise an error
        response (missing JSON, no access to the customer, or any exception
        raised while loading or saving).
    """
    if not request.json:
        return response_error('No JSON data provided')

    alert_schema = AlertSchema()
    ioc_schema = IocSchema()
    asset_schema = CaseAssetsSchema()

    try:
        payload = request.get_json()

        # IOCs and assets travel inside the alert payload but have their own schemas.
        raw_iocs = payload.pop('alert_iocs', [])
        raw_assets = payload.pop('alert_assets', [])
        loaded_iocs = ioc_schema.load(raw_iocs, many=True, partial=True)
        loaded_assets = asset_schema.load(raw_assets, many=True, partial=True)

        created = alert_schema.load(payload)

        # The customer is only known once the alert has been deserialised.
        entitled = user_has_client_access(current_user.id, created.alert_customer_id)
        if not entitled:
            return response_error('User not entitled to create alerts for the client')

        created.alert_creation_time = datetime.utcnow()
        created.iocs = loaded_iocs
        created.assets = loaded_assets

        db.session.add(created)
        db.session.commit()

        add_obj_history_entry(created, 'Alert created')

        # The similarity cache receives the raw (undeserialised) IOC/asset lists.
        cache_similar_alert(
            created.alert_customer_id,
            assets=raw_assets,
            iocs=raw_iocs,
            alert_id=created.alert_id,
            creation_date=created.alert_source_event_time,
        )

        created = call_modules_hook('on_postload_alert_create', data=created)

        track_activity(f"created alert #{created.alert_id} - {created.alert_title}", ctx_less=True)

        event_body = json.dumps({'alert_id': created.alert_id})
        app.socket_io.emit('new_alert', event_body, namespace='/alerts')

        return response_success(data=alert_schema.dump(created))

    except Exception as e:
        # Deserialisation and database failures are both reported to the caller.
        app.app.logger.exception(e)
        return response_error(str(e))
@alerts_rest_blueprint.route('/alerts/<int:alert_id>', methods=['GET'])
@ac_api_requires(Permissions.alerts_read)
def alerts_get_route(alert_id) -> Response:
    """
    Return a single alert together with its related alerts.

    args:
        alert_id (int): identifier of the alert, taken from the URL
    returns:
        Response: the serialised alert with an extra 'related_alerts' key, or
        an 'Alert not found' error when the alert does not exist or the
        current user has no access to its customer.
    """
    alert = get_alert_by_id(alert_id)

    # Missing alerts and alerts of inaccessible customers give the same answer.
    if alert is None or not user_has_client_access(current_user.id, alert.alert_customer_id):
        return response_error('Alert not found')

    serialised = AlertSchema().dump(alert)
    serialised['related_alerts'] = get_related_alerts(alert.alert_customer_id, alert.assets, alert.iocs)

    return response_success(data=serialised)
@alerts_rest_blueprint.route('/alerts/similarities/<int:alert_id>', methods=['GET'])
@ac_api_requires(Permissions.alerts_read)
def alerts_similarities_route(alert_id) -> Response:
    """
    Return the details of the alerts and cases related to an alert.

    Query parameters: the boolean switches 'open-alerts', 'open-cases',
    'closed-cases' and 'closed-alerts' (true only for the literal 'true',
    case-insensitive), 'days-back' (default 180) and 'number-of-nodes'
    (default 100). Negative numbers fall back to the defaults.

    args:
        alert_id (int): identifier of the alert, taken from the URL
    returns:
        Response: the related alerts details, or an 'Alert not found' error
        when the alert does not exist or the user cannot access its customer.
    """
    def _switch(name: str) -> bool:
        # A switch is on only when its value is the word "true".
        return request.args.get(name, 'false').lower() == 'true'

    alert = get_alert_by_id(alert_id)
    if alert is None or not user_has_client_access(current_user.id, alert.alert_customer_id):
        return response_error('Alert not found')

    include_open_alerts = _switch('open-alerts')
    include_open_cases = _switch('open-cases')
    include_closed_cases = _switch('closed-cases')
    include_closed_alerts = _switch('closed-alerts')

    lookback_days = request.args.get('days-back', 180, type=int)
    max_results = request.args.get('number-of-nodes', 100, type=int)

    # Negative values are meaningless here; revert to the defaults.
    max_results = 100 if max_results < 0 else max_results
    lookback_days = 180 if lookback_days < 0 else lookback_days

    details = get_related_alerts_details(
        alert.alert_customer_id, alert.assets, alert.iocs,
        open_alerts=include_open_alerts,
        open_cases=include_open_cases,
        closed_cases=include_closed_cases,
        closed_alerts=include_closed_alerts,
        days_back=lookback_days,
        number_of_results=max_results,
    )

    return response_success(data=details)
@alerts_rest_blueprint.route('/alerts/update/<int:alert_id>', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alerts_update_route(alert_id) -> Response:
    """
    Apply a partial update, given as JSON, to an existing alert.

    Owner handling: when the request does not specify 'alert_owner_id' and
    the alert has no owner, the current user becomes the owner; the value -1
    (or "-1") removes the owner.

    After saving, the 'on_postload_alert_update' hook is called, followed by
    the resolution and/or status hooks when those fields changed, and the
    change is recorded in the activity log and the alert history.

    args:
        alert_id (int): identifier of the alert, taken from the URL
    returns:
        Response: the serialised updated alert, or an error response (missing
        JSON, unknown alert, no access to the customer (403), or any exception
        raised while loading or saving).
    """
    if not request.json:
        return response_error('No JSON data provided')

    alert = get_alert_by_id(alert_id)
    if not alert:
        return response_error('Alert not found')

    if not user_has_client_access(current_user.id, alert.alert_customer_id):
        return response_error('User not entitled to update alerts for the client', status=403)

    alert_schema = AlertSchema()
    do_resolution_hook = False
    do_status_hook = False

    try:
        data = request.get_json()

        # Build a human readable list of what changed, before the alert is mutated.
        activity_data = []
        for key, value in data.items():
            old_value = getattr(alert, key, None)

            # Compare integers as strings so that 3 and "3" count as unchanged.
            # (type(...) is int is deliberate: booleans must not be converted.)
            if type(old_value) is int:
                old_value = str(old_value)

            if type(value) is int:
                value = str(value)

            if old_value != value:
                if key == "alert_resolution_status_id":
                    do_resolution_hook = True
                if key == 'alert_status_id':
                    do_status_hook = True

                # Content and note values are left out of the activity entry.
                if key not in ["alert_content", "alert_note"]:
                    activity_data.append(f"\"{key}\" from \"{old_value}\" to \"{value}\"")
                else:
                    activity_data.append(f"\"{key}\"")

        updated_alert = alert_schema.load(data, instance=alert, partial=True)

        if data.get('alert_owner_id') is None and updated_alert.alert_owner_id is None:
            updated_alert.alert_owner_id = current_user.id

        if data.get('alert_owner_id') == "-1" or data.get('alert_owner_id') == -1:
            updated_alert.alert_owner_id = None

        db.session.commit()

        updated_alert = call_modules_hook('on_postload_alert_update', data=updated_alert)

        if do_resolution_hook:
            updated_alert = call_modules_hook('on_postload_alert_resolution_update', data=updated_alert)

        if do_status_hook:
            updated_alert = call_modules_hook('on_postload_alert_status_update', data=updated_alert)

        if activity_data:
            activity_data_as_string = ','.join(activity_data)
            track_activity(f'updated alert #{alert_id}: {activity_data_as_string}', ctx_less=True)
            add_obj_history_entry(updated_alert, f'updated alert: {activity_data_as_string}')
        else:
            track_activity(f'updated alert #{alert_id}', ctx_less=True)
            add_obj_history_entry(updated_alert, 'updated alert')

        # Second commit persists the history entry added above.
        db.session.commit()

        return response_success(data=alert_schema.dump(updated_alert))

    except Exception as e:
        # Log the traceback like the other alert routes do, then report the error.
        app.app.logger.exception(e)
        return response_error(str(e))
@alerts_rest_blueprint.route('/alerts/batch/update', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alerts_batch_update_route() -> Response:
    """
    Apply the same partial update to several alerts.

    The JSON body carries 'alert_ids' (list of alert identifiers) and
    'updates' (fields to set). An empty 'alert_tags' entry is ignored.

    Owner handling, per alert: the value -1 (or "-1") for 'alert_owner_id',
    in 'updates' or at the top level of the body, removes the owner;
    otherwise, when the request names no owner and the alert has none, the
    current user becomes the owner.

    Processing stops at the first alert that is missing, not accessible or
    fails to update; alerts handled before that point stay updated.

    returns:
        Response: a success message, or an error response describing the
        first failure.
    """
    if not request.json:
        return response_error('No JSON data provided')

    data = request.get_json()

    alert_ids: List[int] = data.get('alert_ids', [])
    # Tolerate an explicit null for "updates".
    updates = data.get('updates') or {}
    if not updates.get('alert_tags'):
        updates.pop('alert_tags', None)

    if not alert_ids:
        return response_error('No alert IDs provided')

    requested_owner = updates.get('alert_owner_id')
    # -1 is the "unassign" sentinel; the top-level key is still honoured for
    # callers that relied on it.
    unassign_owner = any(owner == "-1" or owner == -1
                         for owner in (requested_owner, data.get('alert_owner_id')))

    alert_schema = AlertSchema()

    for alert_id in alert_ids:
        alert = get_alert_by_id(alert_id)
        if not alert:
            return response_error(f'Alert with ID {alert_id} not found')

        try:
            activity_data = [f"\"{key}\"" for key, value in updates.items()
                             if getattr(alert, key, None) != value]

            if not user_has_client_access(current_user.id, alert.alert_customer_id):
                return response_error('User not entitled to update alerts for the client', status=403)

            # Work on a per-alert copy: an owner chosen for one alert must not
            # leak into the updates applied to the following alerts.
            alert_updates = dict(updates)
            if unassign_owner:
                alert_updates['alert_owner_id'] = None
            elif requested_owner is None and alert.alert_owner_id is None:
                alert_updates['alert_owner_id'] = current_user.id

            alert_schema.load(alert_updates, instance=alert, partial=True)

            db.session.commit()

            alert = call_modules_hook('on_postload_alert_update', data=alert)

            if activity_data:
                track_activity(f"updated alert #{alert_id}: {','.join(activity_data)}", ctx_less=True)
                add_obj_history_entry(alert, f"updated alert: {','.join(activity_data)}")

            # Persist the history entry added above.
            db.session.commit()

        except Exception as e:
            # Log the traceback like the other alert routes do, then report the error.
            app.app.logger.exception(e)
            return response_error(str(e))

    return response_success(msg='Batch update successful')
@alerts_rest_blueprint.route('/alerts/batch/delete', methods=['POST'])
@ac_api_requires(Permissions.alerts_delete)
def alerts_batch_delete_route() -> Response:
    """
    Delete several alerts in one request.

    The JSON body carries 'alert_ids'. Every alert must exist and belong to a
    customer the current user can access; nothing is deleted otherwise.

    returns:
        Response: a success message, or an error response (missing JSON, no
        ids, unknown alert, no access (403), or the log returned by
        delete_alerts on failure).
    """
    if not request.json:
        return response_error('No JSON data provided')

    payload = request.get_json()

    alert_ids: List[int] = payload.get('alert_ids', [])
    if not alert_ids:
        return response_error('No alert IDs provided')

    # Validate the whole batch before touching anything.
    for current_id in alert_ids:
        candidate = get_alert_by_id(current_id)
        if not candidate:
            return response_error(f'Alert with ID {current_id} not found')

        if not user_has_client_access(current_user.id, candidate.alert_customer_id):
            return response_error('User not entitled to delete alerts for the client', status=403)

    deleted, logs = delete_alerts(alert_ids)
    if not deleted:
        return response_error(logs)

    call_modules_hook('on_postload_alert_delete', data={"alert_ids": alert_ids})

    ids_as_text = ','.join(map(str, alert_ids))
    track_activity(f"deleted alerts #{ids_as_text}", ctx_less=True)

    return response_success(msg='Batch delete successful')
@alerts_rest_blueprint.route('/alerts/delete/<int:alert_id>', methods=['POST'])
@ac_api_requires(Permissions.alerts_delete)
def alerts_delete_route(alert_id) -> Response:
    """
    Delete a single alert.

    args:
        alert_id (int): identifier of the alert, taken from the URL
    returns:
        Response: {'alert_id': <id>} on success, or an error response
        (unknown alert, no access to the customer (403), or any exception
        raised while deleting).
    """
    alert = get_alert_by_id(alert_id)
    if not alert:
        return response_error('Alert not found')

    try:
        if not user_has_client_access(current_user.id, alert.alert_customer_id):
            return response_error('User not entitled to delete alerts for the client', status=403)

        # Clear the cache entries kept for this alert before removing the row.
        # NOTE(review): presumably the similarity cache and the related-alerts
        # cache respectively; confirm against alerts_db.
        delete_similar_alert_cache(alert_id=alert_id)
        delete_related_alerts_cache([alert_id])

        db.session.delete(alert)
        db.session.commit()

        # The alert object is gone, so the hook only receives its identifier.
        call_modules_hook('on_postload_alert_delete', data=alert_id)

        track_activity(f"delete alert #{alert_id}", ctx_less=True)

        return response_success(data={'alert_id': alert_id})

    except Exception as e:
        # Log the traceback like the other alert routes do, then report the error.
        app.app.logger.exception(e)
        return response_error(str(e))
@alerts_rest_blueprint.route('/alerts/escalate/<int:alert_id>', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alerts_escalate_route(alert_id) -> Response:
    """
    Escalate an alert into a newly created case.

    The JSON body may carry 'iocs_import_list', 'assets_import_list', 'note',
    'import_as_event', 'case_tags', 'case_title' and 'case_template_id'; they
    are forwarded to create_case_from_alert().

    args:
        alert_id (int): identifier of the alert, taken from the URL
    returns:
        Response: the serialised new case, or an error response (unknown
        alert, missing JSON, no access to the customer (403), case creation
        failure, or any exception raised along the way).
    """
    alert = get_alert_by_id(alert_id)
    if not alert:
        return response_error('Alert not found')

    if request.json is None:
        return response_error('No JSON data provided')

    data = request.get_json()

    iocs_import_list: List[str] = data.get('iocs_import_list')
    assets_import_list: List[str] = data.get('assets_import_list')
    note: str = data.get('note')
    import_as_event: bool = data.get('import_as_event')
    case_tags: str = data.get('case_tags')
    case_title: str = data.get('case_title')
    case_template_id: int = data.get('case_template_id', None)

    try:
        if not user_has_client_access(current_user.id, alert.alert_customer_id):
            return response_error('User not entitled to escalate alerts for the client', status=403)

        # The status change is committed before the case is created.
        alert.alert_status_id = AlertStatus.query.filter_by(status_name='Escalated').first().status_id
        db.session.commit()

        case = create_case_from_alert(alert, iocs_list=iocs_import_list, assets_list=assets_import_list, note=note,
                                      import_as_event=import_as_event, case_tags=case_tags, case_title=case_title,
                                      template_id=case_template_id)

        if not case:
            return response_error('Failed to create case from alert')

        ac_set_new_case_access(None, case.case_id, case.client_id)

        case = call_modules_hook('on_postload_case_create', data=case)

        add_obj_history_entry(case, 'created')
        track_activity("new case {case_name} created from alert".format(case_name=case.name),
                       ctx_less=True)

        add_obj_history_entry(alert, f"Alert escalated to case #{case.case_id}")

        alert = call_modules_hook('on_postload_alert_escalate', data=alert)

        return response_success(data=CaseSchema().dump(case))

    except Exception as e:
        # `app` is the package module here; the Flask application (and its
        # logger) lives at app.app, as used by the other routes.
        app.app.logger.exception(e)
        return response_error(str(e))
@alerts_rest_blueprint.route('/alerts/merge/<int:alert_id>', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alerts_merge_route(alert_id) -> Response:
    """
    Merge an alert into an existing case.

    The JSON body must carry 'target_case_id' and may carry
    'iocs_import_list', 'assets_import_list', 'note', 'import_as_event' and
    'case_tags', which are forwarded to merge_alert_in_case().

    args:
        alert_id (int): identifier of the alert, taken from the URL
    returns:
        Response: the serialised target case, or an error response (missing
        JSON or case id, unknown alert or case, no access to the customer or
        the case (403), or any exception raised while merging).
    """
    if request.json is None:
        return response_error('No JSON data provided')

    payload = request.get_json()

    target_case_id = payload.get('target_case_id')
    if target_case_id is None:
        return response_error('No target case id provided')

    alert = get_alert_by_id(alert_id)
    if not alert:
        return response_error('Alert not found')

    case = get_case(target_case_id)
    if not case:
        return response_error('Target case not found')

    # Options controlling what is imported from the alert into the case.
    merge_options = {
        'iocs_list': payload.get('iocs_import_list'),
        'assets_list': payload.get('assets_import_list'),
        'note': payload.get('note'),
        'import_as_event': payload.get('import_as_event'),
        'case_tags': payload.get('case_tags'),
    }

    try:
        # Both the alert's customer and the target case must be accessible.
        if not user_has_client_access(current_user.id, alert.alert_customer_id):
            return response_error('User not entitled to merge alerts for the client', status=403)

        if not check_ua_case_client(current_user.id, target_case_id):
            return response_error('User not entitled to merge alerts for the case', status=403)

        merged_status = AlertStatus.query.filter_by(status_name='Merged').first()
        alert.alert_status_id = merged_status.status_id
        db.session.commit()

        merge_alert_in_case(alert, case, **merge_options)

        alert = call_modules_hook('on_postload_alert_merge', data=alert, caseid=target_case_id)

        track_activity(f"merge alert #{alert_id} into existing case #{target_case_id}", caseid=target_case_id)
        add_obj_history_entry(alert, f"Alert merged into existing case #{target_case_id}")

        return response_success(data=CaseSchema().dump(case))

    except Exception as e:
        app.app.logger.exception(e)
        return response_error(str(e))
@alerts_rest_blueprint.route('/alerts/unmerge/<int:alert_id>', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alerts_unmerge_route(alert_id) -> Response:
    """
    Detach an alert from a case it was merged into.

    The JSON body must carry 'target_case_id'.

    args:
        alert_id (int): identifier of the alert, taken from the URL
    returns:
        Response: the serialised alert plus the message returned by
        unmerge_alert_from_case(), or an error response (missing JSON or case
        id, unknown alert or case, no access to the customer or the case
        (403), unmerge failure, or any exception raised along the way).
    """
    if request.json is None:
        return response_error('No JSON data provided')

    target_case_id = request.json.get('target_case_id')
    if target_case_id is None:
        return response_error('No target case id provided')

    alert = get_alert_by_id(alert_id)
    if not alert:
        return response_error('Alert not found')

    case = get_case(target_case_id)
    if not case:
        return response_error('Target case not found')

    try:
        # Both the alert's customer and the case must be accessible.
        if not user_has_client_access(current_user.id, alert.alert_customer_id):
            return response_error('User not entitled to unmerge alerts for the client', status=403)

        if not check_ua_case_client(current_user.id, target_case_id):
            return response_error('User not entitled to unmerge alerts for the case', status=403)

        success, message = unmerge_alert_from_case(alert, case)

        if success is False:
            return response_error(message)

        track_activity(f"unmerge alert #{alert_id} from case #{target_case_id}", caseid=target_case_id)
        add_obj_history_entry(alert, f"Alert unmerged from case #{target_case_id}")

        alert = call_modules_hook('on_postload_alert_unmerge', data=alert)

        return response_success(data=AlertSchema().dump(alert), msg=message)

    except Exception as e:
        # Log the traceback like the merge route does, then report the error.
        app.app.logger.exception(e)
        return response_error(str(e))
@alerts_rest_blueprint.route('/alerts/batch/merge', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alerts_batch_merge_route() -> Response:
    """
    Merge several alerts into an existing case.

    The JSON body must carry 'target_case_id' and 'alert_ids' (parsed with
    parse_comma_separated_identifiers) and may carry 'iocs_import_list',
    'assets_import_list', 'note', 'import_as_event' and 'case_tags'.
    Unknown alert ids are skipped. The note is appended once to the case
    description after all alerts are merged, not once per alert.

    returns:
        Response: the serialised target case, or an error response (missing
        JSON, case id or alert ids, unknown case, no access to the case or to
        an alert's customer (403), or any exception raised while merging).
    """
    if request.json is None:
        return response_error('No JSON data provided')

    data = request.get_json()

    target_case_id = data.get('target_case_id')
    if target_case_id is None:
        return response_error('No target case id provided')

    alert_ids = data.get('alert_ids')
    if not alert_ids:
        return response_error('No alert ids provided')

    case = get_case(target_case_id)
    if not case:
        return response_error('Target case not found')

    iocs_import_list: List[str] = data.get('iocs_import_list')
    assets_import_list: List[str] = data.get('assets_import_list')
    note: str = data.get('note')
    import_as_event: bool = data.get('import_as_event')
    case_tags = data.get('case_tags')

    if not check_ua_case_client(current_user.id, target_case_id):
        return response_error('User not entitled to merge alerts for the case', status=403)

    try:
        for alert_id in parse_comma_separated_identifiers(alert_ids):
            alert = get_alert_by_id(alert_id)
            if not alert:
                continue

            if not user_has_client_access(current_user.id, alert.alert_customer_id):
                return response_error('User not entitled to merge alerts for the client', status=403)

            alert.alert_status_id = AlertStatus.query.filter_by(status_name='Merged').first().status_id
            db.session.commit()

            # note=None: the note is added to the case once, after the loop.
            merge_alert_in_case(alert, case, iocs_list=iocs_import_list, assets_list=assets_import_list, note=None,
                                import_as_event=import_as_event, case_tags=case_tags)

            add_obj_history_entry(alert, f"Alert merged into existing case #{target_case_id}")

            alert = call_modules_hook('on_postload_alert_merge', data=alert)

        if note:
            if case.description:
                case.description += f"\n\n### Escalation note\n\n{note}\n\n"
            else:
                # Assign rather than "+=": the description may be None, and
                # None += str raises TypeError.
                case.description = f"\n\n{note}\n\n"

            db.session.commit()

        track_activity(f"batched merge alerts {alert_ids} into existing case #{target_case_id}",
                       caseid=target_case_id)

        return response_success(data=CaseSchema().dump(case))

    except Exception as e:
        app.app.logger.exception(e)
        return response_error(str(e))
@alerts_rest_blueprint.route('/alerts/batch/escalate', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alerts_batch_escalate_route() -> Response:
    """
    Escalate several alerts into one newly created case.

    The JSON body must carry 'alert_ids' (parsed with
    parse_comma_separated_identifiers) and may carry 'iocs_import_list',
    'assets_import_list', 'note', 'import_as_event', 'case_tags',
    'case_title' and 'case_template_id', which are forwarded to
    create_case_from_alerts(). Unknown alert ids are skipped.

    returns:
        Response: the serialised new case, or an error response (missing JSON
        or alert ids, no access to an alert's customer (403), case creation
        failure, or any exception raised along the way).
    """
    if request.json is None:
        return response_error('No JSON data provided')

    payload = request.get_json()

    alert_ids = payload.get('alert_ids')
    if not alert_ids:
        return response_error('No alert ids provided')

    # Options forwarded untouched to the case creation helper.
    creation_options = {
        'iocs_list': payload.get('iocs_import_list'),
        'assets_list': payload.get('assets_import_list'),
        'note': payload.get('note'),
        'import_as_event': payload.get('import_as_event'),
        'case_tags': payload.get('case_tags'),
        'case_title': payload.get('case_title'),
        'template_id': payload.get('case_template_id', None),
    }

    escalated_alerts = []

    try:
        for alert_id in parse_comma_separated_identifiers(alert_ids):
            alert = get_alert_by_id(alert_id)
            if not alert:
                continue

            if not user_has_client_access(current_user.id, alert.alert_customer_id):
                return response_error('User not entitled to escalate alerts for the client', status=403)

            # NOTE(review): the single-alert escalate route sets 'Escalated';
            # confirm that 'Merged' is the intended status here.
            merged_status = AlertStatus.query.filter_by(status_name='Merged').first()
            alert.alert_status_id = merged_status.status_id
            db.session.commit()

            alert = call_modules_hook('on_postload_alert_escalate', data=alert)

            escalated_alerts.append(alert)

        case = create_case_from_alerts(escalated_alerts, **creation_options)

        if not case:
            return response_error('Failed to create case from alert')

        ac_set_new_case_access(None, case.case_id, case.client_id)

        case = call_modules_hook('on_postload_case_create', data=case)

        add_obj_history_entry(case, 'created')
        track_activity("new case {case_name} created from alerts".format(case_name=case.name),
                       caseid=case.case_id)

        for escalated in escalated_alerts:
            add_obj_history_entry(escalated, f"Alert escalated into new case #{case.case_id}")

        return response_success(data=CaseSchema().dump(case))

    except Exception as e:
        app.app.logger.exception(e)
        return response_error(str(e))
@alerts_rest_blueprint.route('/alerts/<int:alert_id>/comments/list', methods=['GET'])
@ac_api_requires(Permissions.alerts_read)
def alert_comments_get(alert_id):
    """
    List the comments attached to an alert

    args:
        alert_id (int): Identifier of the alert, taken from the URL

    returns:
        Response: The serialized comments, or an error response when the
        alert cannot be found or the user lacks access to its customer
    """
    target_alert = get_alert_by_id(alert_id)
    if not target_alert:
        return response_error('Invalid alert ID')

    # Comments are only served to users entitled to the alert's customer
    entitled = user_has_client_access(current_user.id, target_alert.alert_customer_id)
    if not entitled:
        return response_error('User not entitled to read alerts for the client', status=403)

    comments = get_alert_comments(alert_id)
    if comments is None:
        return response_error('Invalid alert ID')

    serializer = CommentSchema(many=True)
    return response_success(data=serializer.dump(comments))
@alerts_rest_blueprint.route('/alerts/<int:alert_id>/comments/<int:com_id>/delete', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alert_comment_delete(alert_id, com_id):
    """
    Remove a comment from an alert

    args:
        alert_id (int): Identifier of the alert, taken from the URL
        com_id (int): Identifier of the comment, taken from the URL

    returns:
        Response: The message returned by the deletion helper, wrapped in a
        success or an error response depending on its outcome
    """
    target_alert = get_alert_by_id(alert_id)
    if not target_alert:
        return response_error('Invalid alert ID')

    # Only users entitled to the alert's customer may go further
    entitled = user_has_client_access(current_user.id, target_alert.alert_customer_id)
    if not entitled:
        return response_error('User not entitled to read alerts for the client', status=403)

    deleted, message = delete_alert_comment(comment_id=com_id, alert_id=alert_id)
    if deleted:
        # Notify the modules and record the activity only once the deletion went through
        call_modules_hook('on_postload_alert_comment_delete', data=com_id)
        track_activity(f"comment {com_id} on alert {alert_id} deleted", ctx_less=True)
        return response_success(message)

    return response_error(message)
@alerts_rest_blueprint.route('/alerts/<int:alert_id>/comments/<int:com_id>', methods=['GET'])
@ac_api_requires(Permissions.alerts_read)
def alert_comment_get(alert_id, com_id):
    """
    Fetch a single comment of an alert

    args:
        alert_id (int): Identifier of the alert, taken from the URL
        com_id (int): Identifier of the comment, taken from the URL

    returns:
        Response: The serialized comment, or an error response when the alert
        or the comment cannot be found or the user lacks access
    """
    target_alert = get_alert_by_id(alert_id)
    if not target_alert:
        return response_error('Invalid alert ID')

    # The comment is only served to users entitled to the alert's customer
    entitled = user_has_client_access(current_user.id, target_alert.alert_customer_id)
    if not entitled:
        return response_error('User not entitled to read alerts for the client', status=403)

    found_comment = get_alert_comment(alert_id, com_id)
    if found_comment:
        serializer = CommentSchema()
        return response_success(data=serializer.dump(found_comment))

    return response_error("Invalid comment ID")
@alerts_rest_blueprint.route('/alerts/<int:alert_id>/comments/<int:com_id>/edit', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def alert_comment_edit(alert_id, com_id):
    """
    Edit a comment for an alert

    args:
        alert_id (int): The alert id, taken from the URL
        com_id (int): The comment id, taken from the URL

    returns:
        Response: The response of the comment update, or an error response
        when the alert is unknown, the user lacks access to the alert's
        customer, or the comment does not belong to the alert
    """
    alert = get_alert_by_id(alert_id)
    if not alert:
        return response_error('Invalid alert ID')

    # Check if the user has access to the client
    if not user_has_client_access(current_user.id, alert.alert_customer_id):
        return response_error('User not entitled to read alerts for the client', status=403)

    # The access check above only covers alert_id. Make sure the comment is
    # attached to that alert, otherwise any comment id could be edited through
    # an alert the user happens to have access to.
    if not get_alert_comment(alert_id, com_id):
        return response_error('Invalid comment ID')

    # NOTE(review): 'events' is passed as the object type although this is an
    # alert comment - confirm this is what case_comment_update expects.
    return case_comment_update(com_id, 'events', None)
@alerts_rest_blueprint.route('/alerts/<int:alert_id>/comments/add', methods=['POST'])
@ac_api_requires(Permissions.alerts_write)
def case_comment_add(alert_id):
    """
    Attach a new comment to an alert

    The comment is loaded from the JSON body of the request, stamped with the
    alert, the current user and the current time, then persisted.

    args:
        alert_id (int): Identifier of the alert, taken from the URL

    returns:
        Response: The serialized comment on success, or an error response when
        the alert is unknown, the user lacks access or the payload is invalid
    """
    try:
        target_alert = get_alert_by_id(alert_id=alert_id)
        if not target_alert:
            return response_error('Invalid alert ID')

        # Only users entitled to the alert's customer may comment
        entitled = user_has_client_access(current_user.id, target_alert.alert_customer_id)
        if not entitled:
            return response_error('User not entitled to read alerts for the client', status=403)

        schema = CommentSchema()
        payload = request.get_json()
        new_comment = schema.load(payload)

        new_comment.comment_alert_id = alert_id
        new_comment.comment_user_id = current_user.id
        new_comment.comment_date = datetime.now()
        new_comment.comment_update_date = datetime.now()

        db.session.add(new_comment)
        db.session.commit()

        # The history entry is committed separately from the comment itself
        add_obj_history_entry(target_alert, 'commented')
        db.session.commit()

        hook_payload = dict(
            comment=schema.dump(new_comment),
            alert=AlertSchema().dump(target_alert),
        )
        call_modules_hook('on_postload_alert_commented', data=hook_payload)

        track_activity(f"alert \"{target_alert.alert_id}\" commented", ctx_less=True)

        # Serialize again for the response rather than reusing the hook payload
        return response_success("Alert commented", data=schema.dump(new_comment))

    except marshmallow.exceptions.ValidationError as validation_error:
        return response_error(msg="Data error", data=validation_error.normalized_messages())
|