| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569 |
- # IRIS Source Code
- # Copyright (C) 2023 - DFIR-IRIS
- # contact@dfir-iris.org
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- from copy import deepcopy
- import json
- from datetime import datetime, timedelta
- from flask_login import current_user
- from sqlalchemy import desc, asc, func, tuple_, or_, not_, and_
- from sqlalchemy.orm import aliased, make_transient, selectinload
- from typing import List, Tuple
- import app
- from app import db
- from app.datamgmt.case.case_assets_db import create_asset
- from app.datamgmt.case.case_assets_db import set_ioc_links
- from app.datamgmt.case.case_assets_db import get_unspecified_analysis_status_id
- from app.datamgmt.case.case_events_db import update_event_assets
- from app.datamgmt.case.case_events_db import update_event_iocs
- from app.datamgmt.case.case_iocs_db import add_ioc
- from app.datamgmt.manage.manage_access_control_db import get_user_clients_id
- from app.datamgmt.manage.manage_case_state_db import get_case_state_by_name
- from app.datamgmt.manage.manage_case_templates_db import get_case_template_by_id
- from app.datamgmt.manage.manage_case_templates_db import case_template_post_modifier
- from app.datamgmt.states import update_timeline_state
- from app.iris_engine.access_control.utils import ac_current_user_has_permission
- from app.iris_engine.utils.common import parse_bf_date_format
- from app.models.cases import Cases
- from app.models.models import EventCategory
- from app.models.models import Tags
- from app.models.models import AssetsType
- from app.models.models import Comments
- from app.models.models import CaseAssets
- from app.models.models import alert_assets_association
- from app.models.models import alert_iocs_association
- from app.models.models import Ioc
- from app.models.models import Client
- from app.models.alerts import Alert
- from app.models.alerts import AlertStatus
- from app.models.alerts import AlertCaseAssociation
- from app.models.alerts import SimilarAlertsCache
- from app.models.alerts import AlertResolutionStatus
- from app.models.alerts import AlertSimilarity
- from app.models.alerts import Severity
- from app.models.authorization import Permissions
- from app.models.authorization import User
- from app.schema.marshables import EventSchema
- from app.util import add_obj_history_entry
- relationship_model_map = {
- 'owner': User,
- 'severity': Severity,
- 'status': AlertStatus,
- 'customer': Client,
- 'resolution_status': AlertResolutionStatus,
- 'cases': Cases,
- 'comments': Comments,
- 'assets': CaseAssets,
- 'iocs': Ioc
- }
- RESTRICTED_USER_FIELDS = {
- 'password',
- 'mfa_secrets',
- 'webauthn_credentials',
- 'api_key',
- 'external_id',
- 'ctx_case',
- 'ctx_human_case',
- 'is_service_account'
- }
- def db_list_all_alerts():
- """
- List all alerts in the database
- """
- return db.session.query(Alert).all()
- def build_condition(column, operator, value):
- if hasattr(column, 'property') and hasattr(column.property, 'local_columns'):
- # It's a relationship attribute
- fk_cols = list(column.property.local_columns)
- if operator in ['in', 'not_in']:
- if len(fk_cols) == 1:
- # Use the single FK column for the condition
- fk_col = fk_cols[0]
- if operator == 'in':
- return fk_col.in_(value)
- else:
- return ~fk_col.in_(value)
- else:
- raise NotImplementedError(
- "in_() on a relationship with multiple FK columns not supported. Specify a direct column.")
- else:
- raise ValueError(
- "Non-in operators on relationships require specifying a related model column, e.g., owner.id or assets.asset_name.")
- # If we get here, 'column' should be an actual column, not a relationship.
- if operator == 'not':
- return column != value
- elif operator == 'in':
- return column.in_(value)
- elif operator == 'not_in':
- return ~column.in_(value)
- elif operator == 'eq':
- return column == value
- elif operator == 'like':
- return column.ilike(f"%{value}%")
- else:
- raise ValueError(f"Unsupported operator: {operator}")
- def combine_conditions(conditions, logical_operator):
- if len(conditions) > 1:
- if logical_operator == 'or':
- return or_(*conditions)
- elif logical_operator == 'not':
- return not_(and_(*conditions))
- else: # Default to 'and'
- return and_(*conditions)
- elif conditions:
- return conditions[0]
- else:
- return None
- def get_filtered_alerts(
- start_date: str = None,
- end_date: str = None,
- source_start_date: str = None,
- source_end_date: str = None,
- title: str = None,
- description: str = None,
- status: int = None,
- severity: int = None,
- owner: int = None,
- source: str = None,
- tags: str = None,
- case_id: int = None,
- client: int = None,
- classification: int = None,
- alert_ids: List[int] = None,
- assets: List[str] = None,
- iocs: List[str] = None,
- resolution_status: List[int] = None,
- logical_operator: str = 'and', # Logical operator: 'and', 'or', 'not'
- page: int = 1,
- per_page: int = 10,
- sort: str = 'desc',
- current_user_id: int = None,
- source_reference=None,
- custom_conditions: List[dict] = None):
- """
- Get a list of alerts that match the given filter conditions
- args:
- start_date (datetime): The start date of the alert creation time
- end_date (datetime): The end date of the alert creation time
- title (str): The title of the alert
- description (str): The description of the alert
- status (str): The status of the alert
- severity (str): The severity of the alert
- owner (str): The owner of the alert
- source (str): The source of the alert
- tags (str): The tags of the alert
- case_id (int): The case id of the alert
- client (int): The client id of the alert
- classification (int): The classification id of the alert
- alert_ids (int): The alert ids
- assets (list): The assets of the alert
- iocs (list): The iocs of the alert
- resolution_status (list): The resolution status of the alert
- logical_operator (str): Logical operator to combine conditions ('and', 'or', 'not')
- page (int): The page number
- per_page (int): The number of alerts per page
- sort (str): The sort order
- current_user_id (int): The ID of the current user
- source_reference (str): Alert source reference
- custom_conditions (list): Custom conditions to be applied (e.g., NOT client AND owner_id in [1,2,3])
- returns:
- list: A list of alerts that match the given filter conditions
- ...
- fields (List[str]): The list of fields to include in the output
- returns:
- dict: Dictionary with pagination info and list of serialized alerts
- """
- conditions = []
- if start_date is not None and end_date is not None:
- start_date = parse_bf_date_format(start_date)
- end_date = parse_bf_date_format(end_date)
- if start_date and end_date:
- conditions.append(Alert.alert_creation_time.between(start_date, end_date))
- if source_start_date is not None and source_end_date is not None:
- source_start_date = parse_bf_date_format(source_start_date)
- source_end_date = parse_bf_date_format(source_end_date)
- if source_start_date and source_end_date:
- conditions.append(Alert.alert_source_event_time.between(source_start_date, source_end_date))
- if title is not None:
- conditions.append(Alert.alert_title.ilike(f'%{title}%'))
- if description is not None:
- conditions.append(Alert.alert_description.ilike(f'%{description}%'))
- if status is not None:
- conditions.append(Alert.alert_status_id == status)
- if severity is not None:
- conditions.append(Alert.alert_severity_id == severity)
- if resolution_status is not None:
- if isinstance(resolution_status, list):
- conditions.append(not_(Alert.alert_resolution_status_id.in_(resolution_status)))
- else:
- conditions.append(Alert.alert_resolution_status_id == resolution_status)
- if source_reference is not None:
- conditions.append(Alert.alert_source_ref.like(f'%{source_reference}%'))
- if owner is not None:
- if owner == -1:
- conditions.append(Alert.alert_owner_id.is_(None))
- else:
- conditions.append(Alert.alert_owner_id == owner)
- if source is not None:
- conditions.append(Alert.alert_source.ilike(f'%{source}%'))
- if tags is not None:
- conditions.append(Alert.alert_tags.ilike(f"%{tags}%"))
- if client is not None:
- conditions.append(Alert.alert_customer_id == client)
- if alert_ids is not None:
- if isinstance(alert_ids, list):
- conditions.append(Alert.alert_id.in_(alert_ids))
- if classification is not None:
- conditions.append(Alert.alert_classification_id == classification)
- if case_id is not None:
- conditions.append(Alert.cases.any(AlertCaseAssociation.case_id == case_id))
- if assets is not None:
- if isinstance(assets, list):
- conditions.append(Alert.assets.any(CaseAssets.asset_name.in_(assets)))
- if iocs is not None:
- if isinstance(iocs, list):
- conditions.append(Alert.iocs.any(Ioc.ioc_value.in_(iocs)))
- if current_user_id is not None and not ac_current_user_has_permission(Permissions.server_administrator):
- clients_filters = get_user_clients_id(current_user_id)
- if clients_filters is not None:
- conditions.append(Alert.alert_customer_id.in_(clients_filters))
- query = db.session.query(
- Alert
- ).options(
- selectinload(Alert.severity),
- selectinload(Alert.status),
- selectinload(Alert.customer),
- selectinload(Alert.cases),
- selectinload(Alert.iocs),
- selectinload(Alert.assets)
- )
- # Apply custom conditions if provided
- if custom_conditions:
- if isinstance(custom_conditions, str):
- try:
- custom_conditions = json.loads(custom_conditions)
- except:
- app.app.logger.exception(f"Error parsing custom_conditions: {custom_conditions}")
- return
- # Keep track of which relationships we've already joined
- joined_relationships = set()
- for custom_condition in custom_conditions:
- field_path = custom_condition['field']
- operator = custom_condition['operator']
- value = custom_condition['value']
- # Check if we need to handle a related field
- if '.' in field_path:
- relationship_name, related_field_name = field_path.split('.', 1)
- # Ensure the relationship name is known
- if relationship_name not in relationship_model_map:
- raise ValueError(f"Unknown relationship: {relationship_name}")
- if related_field_name in RESTRICTED_USER_FIELDS:
- app.logger.error(f"Access to the field '{related_field_name}' is restricted.")
- app.logger.error(f"Suspicious behavior detected for user {current_user.id} - {current_user.user}.")
- continue
- related_model = relationship_model_map[relationship_name]
- # Join the relationship if not already joined
- if relationship_name not in joined_relationships:
- query = query.join(getattr(Alert, relationship_name))
- joined_relationships.add(relationship_name)
- related_field = getattr(related_model, related_field_name, None)
- if related_field is None:
- raise ValueError(
- f"Field '{related_field_name}' not found in related model '{related_model.__name__}'")
- # Build the condition
- condition = build_condition(related_field, operator, value)
- conditions.append(condition)
- else:
- # Field belongs to Alert model
- field = getattr(Alert, field_path, None)
- if field is None:
- raise ValueError(f"Field '{field_path}' not found in Alert model")
- condition = build_condition(field, operator, value)
- conditions.append(condition)
- # Combine conditions
- combined_conditions = combine_conditions(conditions, logical_operator)
- order_func = desc if sort == "desc" else asc
- try:
- # Query the alerts using the filter conditions
- if combined_conditions is not None:
- query = query.filter(combined_conditions)
- filtered_alerts = query.order_by(
- order_func(Alert.alert_source_event_time)
- ).paginate(page=page, per_page=per_page, error_out=False)
- return filtered_alerts
- except Exception as e:
- app.app.logger.exception(f"Error getting alerts: {str(e)}")
- return None
- def add_alert(
- title,
- description,
- source,
- status,
- severity,
- owner
- ):
- """
- Add an alert to the database
- args:
- title (str): The title of the alert
- description (str): The description of the alert
- source (str): The source of the alert
- status (str): The status of the alert
- severity (str): The severity of the alert
- owner (str): The owner of the alert
- returns:
- Alert: The alert that was added to the database
- """
- # Create the alert
- alert = Alert()
- alert.alert_title = title
- alert.alert_description = description
- alert.alert_source = source
- alert.alert_status = status
- alert.alert_severity = severity
- alert.alert_owner_id = owner
- # Add the alert to the database
- db.session.add(alert)
- db.session.commit()
- return alert
- def get_alert_by_id(alert_id: int) -> Alert:
- """
- Get an alert from the database
- args:
- alert_id (int): The ID of the alert
- returns:
- Alert: The alert that was retrieved from the database
- """
- return (
- db.session.query(Alert)
- .options(selectinload(Alert.iocs), selectinload(Alert.assets))
- .filter(Alert.alert_id == alert_id)
- .first()
- )
- def get_unspecified_event_category():
- """
- Get the id of the 'Unspecified' event category
- """
- event_cat = EventCategory.query.filter(
- EventCategory.name == 'Unspecified'
- ).first()
- return event_cat
- def create_case_from_alerts(alerts: List[Alert], iocs_list: List[str], assets_list: List[str], case_title: str,
- note: str, import_as_event: bool, case_tags: str, template_id: int) -> Cases:
- """
- Create a case from multiple alerts
- args:
- alerts (Alert): The Alerts
- iocs_list (list): The list of IOCs
- assets_list (list): The list of assets
- note (str): The note to add to the case
- import_as_event (bool): Whether to import the alert as an event
- case_tags (str): The tags to add to the case
- case_title (str): The title of the case
- template_id (int): The ID of the template to use
- returns:
- Cases: The case that was created from the alert
- """
- escalation_note = ""
- if note:
- escalation_note = f"\n\n### Escalation note\n\n{note}\n\n"
- if template_id is not None and template_id != 0 and template_id != '':
- case_template = get_case_template_by_id(template_id)
- if case_template:
- case_template_title_prefix = case_template.title_prefix
- # Create the case
- case = Cases(
- name=f"[ALERT]{case_template_title_prefix} "
- f"Merge of alerts {', '.join([str(alert.alert_id) for alert in alerts])}" if not case_title else
- f"{case_template_title_prefix} {case_title}",
- description=f"*Alerts escalated by {current_user.name}*\n\n{escalation_note}"
- f"[Alerts link](/alerts?alert_ids={','.join([str(alert.alert_id) for alert in alerts])})",
- soc_id='',
- client_id=alerts[0].alert_customer_id,
- user=current_user,
- classification_id=alerts[0].alert_classification_id,
- state_id=get_case_state_by_name('Open').state_id
- )
- case.save()
- for tag in case_tags.split(','):
- tag = Tags(tag_title=tag)
- tag = tag.save()
- case.tags.append(tag)
- db.session.commit()
- # Link the alert to the case
- for alert in alerts:
- alert.cases.append(case)
- ioc_links = []
- asset_links = []
- # Add the IOCs to the case
- for ioc_uuid in iocs_list:
- for alert_ioc in alert.iocs:
- if str(alert_ioc.ioc_uuid) == ioc_uuid:
- add_ioc(alert_ioc, current_user.id, case.case_id)
- ioc_links.append(alert_ioc.ioc_id)
- # Add the assets to the case
- for asset_uuid in assets_list:
- for alert_asset in alert.assets:
- if str(alert_asset.asset_uuid) == asset_uuid:
- alert_asset.analysis_status_id = get_unspecified_analysis_status_id()
- asset = create_asset(asset=alert_asset,
- caseid=case.case_id,
- user_id=current_user.id
- )
- asset.asset_uuid = alert_asset.asset_uuid
- set_ioc_links(ioc_links, asset.asset_id)
- asset_links.append(asset.asset_id)
- # Add event to timeline
- if import_as_event:
- unspecified_cat = get_unspecified_event_category()
- event_schema = EventSchema()
- event = event_schema.load({
- 'event_title': f"[ALERT] {alert.alert_title}",
- 'event_content': alert.alert_description,
- 'event_source': alert.alert_source,
- 'event_raw': json.dumps(alert.alert_source_content, indent=4),
- 'event_date': alert.alert_source_event_time.strftime("%Y-%m-%dT%H:%M:%S.%f"),
- 'event_date_wtz': alert.alert_source_event_time.strftime("%Y-%m-%dT%H:%M:%S.%f"),
- 'event_iocs': ioc_links,
- 'event_assets': asset_links,
- 'event_tags': alert.alert_tags,
- 'event_tz': '+00:00',
- 'event_category_id': unspecified_cat.id,
- }, session=db.session)
- event.case_id = case.case_id
- event.user_id = current_user.id
- event.event_added = datetime.utcnow()
- add_obj_history_entry(event, 'created')
- db.session.add(event)
- update_timeline_state(caseid=case.case_id)
- event.category = [unspecified_cat]
- update_event_assets(event_id=event.event_id,
- caseid=case.case_id,
- assets_list=asset_links,
- iocs_list=ioc_links,
- sync_iocs_assets=False)
- update_event_iocs(event_id=event.event_id,
- caseid=case.case_id,
- iocs_list=ioc_links)
- if template_id is not None and template_id != 0 and template_id != '':
- case, logs = case_template_post_modifier(case, template_id)
- db.session.commit()
- return case
- def create_case_from_alert(alert: Alert, iocs_list: List[str], assets_list: List[str], case_title: str,
- note: str, import_as_event: bool, case_tags: str, template_id: int) -> Cases:
- """
- Create a case from an alert
- args:
- alert (Alert): The Alert
- iocs_list (list): The list of IOCs
- assets_list (list): The list of assets
- note (str): The note to add to the case
- import_as_event (bool): Whether to import the alert as an event
- case_tags (str): The tags to add to the case
- case_title (str): The title of the case
- template_id (int): The template to use for the case
- returns:
- Cases: The case that was created from the alert
- """
- escalation_note = ""
- if note:
- escalation_note = f"\n\n### Escalation note\n\n{note}\n\n"
- case_template_title_prefix = ""
- if template_id is not None and template_id != 0 and template_id != '':
- case_template = get_case_template_by_id(template_id)
- if case_template:
- case_template_title_prefix = case_template.title_prefix
- # Create the case
- case = Cases(
- name=f"[ALERT]{case_template_title_prefix} {alert.alert_title}" if not case_title else f"{case_template_title_prefix} {case_title}",
- description=f"*Alert escalated by {current_user.name}*\n\n{escalation_note}"
- f"### Alert description\n\n{alert.alert_description}"
- f"\n\n### IRIS alert link\n\n"
- f"[<i class='fa-solid fa-bell'></i> #{alert.alert_id}](/alerts?alert_ids={alert.alert_id})",
- soc_id=alert.alert_id,
- client_id=alert.alert_customer_id,
- user=current_user,
- classification_id=alert.alert_classification_id,
- state_id=get_case_state_by_name('Open').state_id
- )
- case.save()
- for tag in case_tags.split(','):
- tag = Tags(tag_title=tag)
- tag = tag.save()
- case.tags.append(tag)
- case.severity_id = alert.alert_severity_id
- db.session.commit()
- # Link the alert to the case
- alert.cases.append(case)
- ioc_links = []
- asset_links = []
- # Add the IOCs to the case
- for ioc_uuid in iocs_list:
- for alert_ioc in alert.iocs:
- if str(alert_ioc.ioc_uuid) == ioc_uuid:
- # Make sure we don't have an existing IOC already
- tmp_ioc = Ioc.query.filter(
- Ioc.case_id == case.case_id,
- Ioc.ioc_value == alert_ioc.ioc_value,
- Ioc.ioc_type_id == alert_ioc.ioc_type_id
- ).first()
- if tmp_ioc:
- # Skip as we already have it in the case
- ioc_links.append(tmp_ioc.ioc_id)
- continue
- if alert_ioc.case_id is not None:
- # Make a deep copy of the ioc
- # prevent the ioc to conflict with the existing ioc
- new_alert_ioc = deepcopy(alert_ioc)
- make_transient(new_alert_ioc)
- new_alert_ioc.ioc_id = None
- new_alert_ioc.ioc_uuid = ioc_uuid
- new_alert_ioc.user_id = current_user.id
- new_alert_ioc.case_id = case.case_id
- db.session.add(new_alert_ioc)
- db.session.commit()
- alert_ioc = new_alert_ioc
- add_ioc(alert_ioc, current_user.id, case.case_id)
- ioc_links.append(alert_ioc.ioc_id)
- # Add the assets to the case
- for asset_uuid in assets_list:
- for alert_asset in alert.assets:
- if str(alert_asset.asset_uuid) == asset_uuid:
- alert_asset.analysis_status_id = get_unspecified_analysis_status_id()
- if alert_asset.case_id is not None:
- # Make a deep copy of the asset
- # prevent the asset to conflict with the existing asset
- new_alert_asset = deepcopy(alert_asset)
- make_transient(new_alert_asset)
- new_alert_asset.asset_id = None
- new_alert_asset.asset_uuid = asset_uuid
- db.session.add(new_alert_asset)
- db.session.commit()
- alert_asset = new_alert_asset
- asset = create_asset(asset=alert_asset,
- caseid=case.case_id,
- user_id=current_user.id
- )
- asset.asset_uuid = alert_asset.asset_uuid
- set_ioc_links(ioc_links, asset.asset_id)
- asset_links.append(asset.asset_id)
- # Add event to timeline
- if import_as_event:
- unspecified_cat = get_unspecified_event_category()
- event_schema = EventSchema()
- event = event_schema.load({
- 'event_title': f"[ALERT] {alert.alert_title}",
- 'event_content': alert.alert_description if alert.alert_description else "",
- 'event_source': alert.alert_source if alert.alert_source else "",
- 'event_raw': json.dumps(alert.alert_source_content, indent=4) if alert.alert_source_content else "",
- 'event_date': alert.alert_source_event_time.strftime("%Y-%m-%dT%H:%M:%S.%f") if alert.alert_source_event_time else datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f"),
- 'event_date_wtz': alert.alert_source_event_time.strftime("%Y-%m-%dT%H:%M:%S.%f") if alert.alert_source_event_time else datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f"),
- 'event_iocs': ioc_links,
- 'event_assets': asset_links,
- 'event_tags': alert.alert_tags + ',alert' if alert.alert_tags else "alert",
- 'event_tz': '+00:00',
- 'event_category_id': unspecified_cat.id,
- 'event_in_graph': True,
- 'event_in_summary': True
- }, session=db.session)
- event.case_id = case.case_id
- event.user_id = current_user.id
- event.event_added = datetime.utcnow()
- add_obj_history_entry(event, 'created')
- db.session.add(event)
- update_timeline_state(caseid=case.case_id)
- event.category = [unspecified_cat]
- update_event_assets(event_id=event.event_id,
- caseid=case.case_id,
- assets_list=asset_links,
- iocs_list=ioc_links,
- sync_iocs_assets=False)
- update_event_iocs(event_id=event.event_id,
- caseid=case.case_id,
- iocs_list=ioc_links)
- if template_id is not None and template_id != 0 and template_id != '':
- case, logs = case_template_post_modifier(case, template_id)
- db.session.commit()
- return case
- def merge_alert_in_case(alert: Alert, case: Cases, iocs_list: List[str],
- assets_list: List[str], note: str, import_as_event: bool, case_tags: str):
- """
- Merge an alert in a case
- args:
- alert (Alert): The Alert
- case (Cases): The Case
- iocs_list (list): The list of IOCs
- case_title (str): The title of the case
- assets_list (list): The list of assets
- note (str): The note to add to the case
- import_as_event (bool): Whether to import the alert as an event
- case_tags (str): The tags to add to the case
- """
- if case in alert.cases:
- return case
- escalation_note = ""
- if note:
- escalation_note = f"\n\n### Escalation note\n\n{note}\n\n"
- case.description += f"\n\n*Alert [#{alert.alert_id}](/alerts?alert_ids={alert.alert_id}) escalated by {current_user.name}*\n\n{escalation_note}"
- for tag in case_tags.split(',') if case_tags else []:
- tag = Tags(tag_title=tag).save()
- case.tags.append(tag)
- # Link the alert to the case
- alert.cases.append(case)
- ioc_links = []
- asset_links = []
- # Add the IOCs to the case
- for ioc_uuid in iocs_list:
- for alert_ioc in alert.iocs:
- if str(alert_ioc.ioc_uuid) == ioc_uuid:
- tmp_ioc = Ioc.query.filter(
- Ioc.case_id == case.case_id,
- Ioc.ioc_value == alert_ioc.ioc_value,
- Ioc.ioc_type_id == alert_ioc.ioc_type_id
- ).first()
- if tmp_ioc:
- alert_ioc = tmp_ioc
- add_ioc(alert_ioc, current_user.id, case.case_id)
- ioc_links.append(alert_ioc.ioc_id)
- # Add the assets to the case
- for asset_uuid in assets_list:
- for alert_asset in alert.assets:
- # Filter selected assets by the user
- if str(alert_asset.asset_uuid) == asset_uuid:
- alert_asset.analysis_status_id = get_unspecified_analysis_status_id()
- # Check if the asset exists already in the case
- tmp_asset = CaseAssets.query.filter(and_(
- CaseAssets.asset_name == alert_asset.asset_name,
- CaseAssets.asset_type_id == alert_asset.asset_type_id,
- CaseAssets.case_id == case.case_id
- )).first()
- if tmp_asset:
- asset = tmp_asset
- else:
- asset = create_asset(asset=alert_asset,
- caseid=case.case_id,
- user_id=current_user.id
- )
- set_ioc_links(ioc_links, asset.asset_id)
- asset_links.append(asset.asset_id)
- # Add event to timeline
- if import_as_event:
- unspecified_cat = get_unspecified_event_category()
- event_schema = EventSchema()
- event = event_schema.load({
- 'event_title': f"[ALERT] {alert.alert_title}",
- 'event_content': alert.alert_description,
- 'event_source': alert.alert_source,
- 'event_raw': json.dumps(alert.alert_source_content, indent=4),
- 'event_date': alert.alert_source_event_time.strftime("%Y-%m-%dT%H:%M:%S.%f"),
- 'event_date_wtz': alert.alert_source_event_time.strftime("%Y-%m-%dT%H:%M:%S.%f"),
- 'event_iocs': ioc_links,
- 'event_assets': asset_links,
- 'event_tags': alert.alert_tags,
- 'event_tz': '+00:00',
- 'event_category_id': unspecified_cat.id,
- 'event_in_graph': True,
- 'event_in_summary': True
- }, session=db.session)
- event.case_id = case.case_id
- event.user_id = current_user.id
- event.event_added = datetime.utcnow()
- add_obj_history_entry(event, 'created')
- db.session.add(event)
- update_timeline_state(caseid=case.case_id)
- event.category = [unspecified_cat]
- update_event_assets(event_id=event.event_id,
- caseid=case.case_id,
- assets_list=asset_links,
- iocs_list=ioc_links,
- sync_iocs_assets=False)
- update_event_iocs(event_id=event.event_id,
- caseid=case.case_id,
- iocs_list=ioc_links)
- db.session.commit()
- def unmerge_alert_from_case(alert: Alert, case: Cases):
- """
- Unmerge an alert from a case
- args:
- alert (Alert): The Alert
- case (Cases): The Case
- """
- # Check if the case is in the alert.cases list
- if case in alert.cases:
- # Unlink the alert from the case
- alert.cases.remove(case)
- db.session.commit()
- else:
- return False, f"Case {case.case_id} not linked with alert {alert.alert_id}"
- return True, f"Alert {alert.alert_id} unlinked from case {case.case_id}"
- def get_alert_status_list():
- """
- Get a list of alert statuses
- returns:
- list: A list of alert statuses
- """
- return db.session.query(AlertStatus).distinct().all()
- def get_alert_status_by_id(status_id: int) -> AlertStatus:
- """
- Get an alert status from the database
- args:
- status_id (int): The ID of the alert status
- returns:
- AlertStatus: The alert status that was retrieved from the database
- """
- return db.session.query(AlertStatus).filter(AlertStatus.status_id == status_id).first()
- def search_alert_status_by_name(status_name: str, exact_match: False) -> AlertStatus:
- """
- Get an alert status from the database from its name
- args:
- status_name (str): The name of the alert status
- exact_match (bool): Whether to perform an exact match or not
- returns:
- AlertStatus: The alert status that was retrieved from the database
- """
- if exact_match:
- return db.session.query(AlertStatus).filter(func.lower(AlertStatus.status_name) == status_name.lower()).first()
- return db.session.query(AlertStatus).filter(AlertStatus.status_name.ilike(f"%{status_name}%")).all()
- def get_alert_resolution_list():
- """
- Get a list of alert resolutions
- returns:
- list: A list of alert resolutions
- """
- return db.session.query(AlertResolutionStatus).distinct().all()
- def get_alert_resolution_by_id(resolution_id: int) -> AlertResolutionStatus:
- """
- Get an alert resolution from the database
- args:
- resolution_id (int): The ID of the alert resolution
- returns:
- Alertresolution: The alert resolution that was retrieved from the database
- """
- return db.session.query(AlertResolutionStatus).filter(AlertResolutionStatus.resolution_status_id == resolution_id).first()
- def search_alert_resolution_by_name(resolution_status_name: str, exact_match: False) -> AlertResolutionStatus:
- """
- Get an alert resolution from the database from its name
- args:
- resolution_name (str): The name of the alert resolution
- exact_match (bool): Whether to perform an exact match or not
- returns:
- Alertresolution: The alert resolution that was retrieved from the database
- """
- if exact_match:
- return db.session.query(AlertResolutionStatus).filter(func.lower(
- AlertResolutionStatus.resolution_status_name) == resolution_status_name.lower()).first()
- return db.session.query(AlertResolutionStatus).filter(
- AlertResolutionStatus.resolution_status_name.ilike(f"%{resolution_status_name}%")).all()
- def cache_similar_alert(customer_id, assets, iocs, alert_id, creation_date):
- """
- Cache similar alerts
- args:
- customer_id (int): The ID of the customer
- assets (list): The list of assets
- iocs (list): The list of IOCs
- alert_id (int): The ID of the alert
- creation_date (datetime): The creation date of the alert
- returns:
- None
- """
- for asset in assets:
- cache_entry = SimilarAlertsCache(customer_id=customer_id, asset_name=asset['asset_name'],
- asset_type_id=asset["asset_type_id"], alert_id=alert_id,
- created_at=creation_date)
- db.session.add(cache_entry)
- for ioc in iocs:
- cache_entry = SimilarAlertsCache(customer_id=customer_id, ioc_value=ioc['ioc_value'],
- ioc_type_id=ioc['ioc_type_id'], alert_id=alert_id,
- created_at=creation_date)
- db.session.add(cache_entry)
- db.session.commit()
- def register_related_alerts(new_alert=None, assets_list=None, iocs_list=None):
- """
- Register related alerts
- """
- # Step 1: Identify similar alerts based on title, assets, and IOCs
- similar_alerts = db.session.query(Alert).filter(
- Alert.alert_customer_id == new_alert.alert_customer_id,
- Alert.alert_id != new_alert.alert_id,
- or_(
- Alert.alert_title == new_alert.alert_title,
- Alert.assets.any(CaseAssets.asset_name.in_([asset.asset_name for asset in new_alert.assets])),
- Alert.iocs.any(Ioc.ioc_value.in_([ioc.ioc_value for ioc in new_alert.iocs]))
- )
- ).all()
- # Step 2: Create relationships in the AlertSimilarity table
- for similar_alert in similar_alerts:
- # Matching on title
- if new_alert.alert_title == similar_alert.alert_title:
- alert_similarity = AlertSimilarity(
- alert_id=new_alert.alert_id,
- similar_alert_id=similar_alert.alert_id,
- similarity_type="title_match"
- )
- db.session.add(alert_similarity)
- # Matching on assets
- for asset in new_alert.assets:
- if asset in similar_alert.assets:
- alert_similarity = AlertSimilarity(
- alert_id=new_alert.alert_id,
- similar_alert_id=similar_alert.alert_id,
- similarity_type="asset_match",
- matching_asset_id=asset.asset_id
- )
- db.session.add(alert_similarity)
- # Matching on IOCs
- for ioc in new_alert.iocs:
- if ioc in similar_alert.iocs:
- alert_similarity = AlertSimilarity(
- alert_id=new_alert.alert_id,
- similar_alert_id=similar_alert.alert_id,
- similarity_type="ioc_match",
- matching_ioc_id=ioc.ioc_id
- )
- db.session.add(alert_similarity)
- def delete_similar_alert_cache(alert_id):
- """
- Delete the similar alert cache
- args:
- alert_id (int): The ID of the alert
- returns:
- None
- """
- SimilarAlertsCache.query.filter(SimilarAlertsCache.alert_id == alert_id).delete()
- db.session.commit()
- def delete_related_alert_cache(alert_id):
- """
- Delete the related alerts cache
- args:
- alert_id (int): The ID of the alert
- returns:
- None
- """
- AlertSimilarity.query.filter(
- or_(
- AlertSimilarity.alert_id == alert_id,
- AlertSimilarity.similar_alert_id == alert_id
- )
- ).delete()
- db.session.commit()
- def delete_similar_alerts_cache(alert_ids: List[int]):
- """
- Delete the similar alerts cache
- args:
- alert_ids (List(int)): The ID of the alert
- returns:
- None
- """
- SimilarAlertsCache.query.filter(SimilarAlertsCache.alert_id.in_(alert_ids)).delete()
- db.session.commit()
- def delete_related_alerts_cache(alert_ids: List[int]):
- """
- Delete the related alerts cache
- args:
- alert_ids (List(int)): The ID of the alert
- returns:
- None
- """
- AlertSimilarity.query.filter(
- or_(
- AlertSimilarity.alert_id.in_(alert_ids),
- AlertSimilarity.similar_alert_id.in_(alert_ids)
- )
- ).delete()
- db.session.commit()
- def get_related_alerts(customer_id, assets, iocs, details=False):
- """
- Check if an alert is related to another alert
- args:
- customer_id (int): The ID of the customer
- assets (list): The list of assets
- iocs (list): The list of IOCs
- details (bool): Whether to return the details of the related alerts
- returns:
- bool: True if the alert is related to another alert, False otherwise
- """
- asset_names = [asset.asset_name for asset in assets]
- ioc_values = [ioc.ioc_value for ioc in iocs]
- similar_assets = SimilarAlertsCache.query.filter(
- SimilarAlertsCache.customer_id == customer_id,
- SimilarAlertsCache.asset_name.in_(asset_names)
- ).all()
- similar_iocs = SimilarAlertsCache.query.filter(
- SimilarAlertsCache.customer_id == customer_id,
- SimilarAlertsCache.ioc_value.in_(ioc_values)
- ).all()
- similarities = {
- 'assets': [asset.alert_id for asset in similar_assets],
- 'iocs': [ioc.alert_id for ioc in similar_iocs]
- }
- return similarities
- def get_related_alerts_details(customer_id, assets, iocs, open_alerts, closed_alerts, open_cases, closed_cases,
- days_back=30, number_of_results=200):
- """
- Get the details of the related alerts
- args:
- customer_id (int): The ID of the customer
- assets (list): The list of assets
- iocs (list): The list of IOCs
- open_alerts (bool): Include open alerts
- closed_alerts (bool): Include closed alerts
- open_cases (bool): Include open cases
- closed_cases (bool): Include closed cases
- days_back (int): The number of days to look back
- number_of_results (int): The maximum number of alerts to return
- returns:
- dict: The details of the related alerts with matched assets and/or IOCs
- """
- if not assets and not iocs:
- return {
- 'nodes': [],
- 'edges': []
- }
- asset_names = [(asset.asset_name, asset.asset_type_id) for asset in assets]
- ioc_values = [(ioc.ioc_value, ioc.ioc_type_id) for ioc in iocs]
- asset_type_alias = aliased(AssetsType)
- alert_status_filter = []
- if open_alerts:
- open_alert_status_ids = AlertStatus.query.with_entities(
- AlertStatus.status_id
- ).filter(AlertStatus.status_name.in_(['New', 'Assigned', 'In progress', 'Pending', 'Unspecified'])).all()
- alert_status_filter += [status_id[0] for status_id in open_alert_status_ids]
- if closed_alerts:
- closed_alert_status_ids = AlertStatus.query.with_entities(
- AlertStatus.status_id
- ).filter(AlertStatus.status_name.in_(['Closed', 'Merged', 'Escalated'])).all()
- alert_status_filter += [status_id[0] for status_id in closed_alert_status_ids]
- conditions = and_(
- SimilarAlertsCache.customer_id == customer_id,
- and_(or_(
- tuple_(SimilarAlertsCache.asset_name, SimilarAlertsCache.asset_type_id).in_(asset_names),
- tuple_(SimilarAlertsCache.ioc_value, SimilarAlertsCache.ioc_type_id).in_(ioc_values)
- ),
- SimilarAlertsCache.created_at >= (func.now() - timedelta(days=days_back))
- )
- )
- if alert_status_filter:
- conditions = and_(conditions, Alert.alert_status_id.in_(alert_status_filter))
- related_alerts = (
- db.session.query(Alert, SimilarAlertsCache.asset_name, SimilarAlertsCache.ioc_value,
- asset_type_alias.asset_icon_not_compromised)
- .join(SimilarAlertsCache, Alert.alert_id == SimilarAlertsCache.alert_id)
- .outerjoin(Alert.resolution_status)
- .outerjoin(asset_type_alias, SimilarAlertsCache.asset_type_id == asset_type_alias.asset_id)
- .filter(conditions)
- .limit(number_of_results)
- .all()
- )
- alerts_dict = {}
- for alert, asset_name, ioc_value, asset_icon_not_compromised in related_alerts:
- if alert.alert_id not in alerts_dict:
- alerts_dict[alert.alert_id] = {'alert': alert, 'assets': [], 'iocs': []}
- if any(name == asset_name for name, _ in asset_names):
- asset_info = {'asset_name': asset_name, 'icon': asset_icon_not_compromised}
- alerts_dict[alert.alert_id]['assets'].append(asset_info)
- if any(value == ioc_value for value, _ in ioc_values):
- alerts_dict[alert.alert_id]['iocs'].append(ioc_value)
- nodes = []
- edges = []
- added_assets = set()
- added_iocs = set()
- added_cases = set()
- for alert_id, alert_info in alerts_dict.items():
- alert_color = '#c95029' if alert_info['alert'].status.status_name in ['Closed', 'Merged', 'Escalated'] else ''
- alert_resolution_title = f'[{alert_info["alert"].resolution_status.resolution_status_name}]\n' if alert_info["alert"].resolution_status else ""
- nodes.append({
- 'id': f'alert_{alert_id}',
- 'label': f'[Closed]{alert_resolution_title} {alert_info["alert"].alert_title}' if alert_color != '' else f'{alert_resolution_title}{alert_info["alert"].alert_title}',
- 'title': f'{alert_info["alert"].alert_description}',
- 'group': 'alert',
- 'shape': 'icon',
- 'icon': {
- 'face': 'FontAwesome',
- 'code': '\uf0f3',
- 'color': alert_color,
- 'weight': "bold"
- },
- 'font': "12px verdana white" if current_user.in_dark_mode else ''
- })
- for asset_info in alert_info['assets']:
- asset_id = asset_info['asset_name']
- if asset_id not in added_assets:
- nodes.append({
- 'id': f'asset_{asset_id}',
- 'label': asset_id,
- 'group': 'asset',
- 'shape': 'image',
- 'image': '/static/assets/img/graph/' + asset_info['icon'],
- 'font': "12px verdana white" if current_user.in_dark_mode else ''
- })
- added_assets.add(asset_id)
- edges.append({
- 'from': f'alert_{alert_id}',
- 'to': f'asset_{asset_id}'
- })
- for ioc_value in alert_info['iocs']:
- if ioc_value not in added_iocs:
- nodes.append({
- 'id': f'ioc_{ioc_value}',
- 'label': ioc_value,
- 'group': 'ioc',
- 'shape': 'icon',
- 'icon': {
- 'face': 'FontAwesome',
- 'code': '\ue4a8',
- 'color': 'white' if current_user.in_dark_mode else '',
- 'weight': "bold"
- },
- 'font': "12px verdana white" if current_user.in_dark_mode else ''
- })
- added_iocs.add(ioc_value)
- edges.append({
- 'from': f'alert_{alert_id}',
- 'to': f'ioc_{ioc_value}',
- 'dashes': True
- })
- if open_cases or closed_cases:
- close_condition = None
- if open_cases and not closed_cases:
- close_condition = Cases.close_date.is_(None)
- if closed_cases and not open_cases:
- close_condition = Cases.close_date.isnot(None)
- if open_cases and closed_cases:
- close_condition = Cases.close_date.isnot(None) | Cases.close_date.is_(None)
- matching_ioc_cases = (
- db.session.query(Ioc)
- .with_entities(Ioc.case_id, Ioc.ioc_value, Cases.name, Cases.close_date, Cases.description)
- .join(Ioc.case)
- .filter(
- and_(
- and_(
- Ioc.ioc_value.in_(added_iocs),
- close_condition,
- ),
- Cases.client_id == customer_id
- )
- )
- .distinct()
- .all()
- )
- matching_asset_cases = (
- db.session.query(CaseAssets)
- .with_entities(CaseAssets.case_id, CaseAssets.asset_name, Cases.name, Cases.close_date, Cases.description)
- .join(CaseAssets.case)
- .filter(
- and_(
- and_(
- CaseAssets.asset_name.in_(added_assets),
- close_condition
- ),
- Cases.client_id == customer_id
- )
- )
- .distinct(CaseAssets.case_id)
- .all()
- )
- cases_data = {}
- for case_id, ioc_value, case_name, close_date, case_desc in matching_ioc_cases:
- if case_id not in cases_data:
- cases_data[case_id] = {'name': case_name, 'matching_ioc': [], 'matching_assets': [],
- 'close_date': close_date, 'description': case_desc}
- cases_data[case_id]['matching_ioc'].append(ioc_value)
- for case_id, asset_name, case_name, close_date, case_desc in matching_asset_cases:
- if case_id not in cases_data:
- cases_data[case_id] = {'name': case_name, 'matching_ioc': [], 'matching_assets': [],
- 'close_date': close_date, 'description': case_desc}
- cases_data[case_id]['matching_assets'].append(asset_name)
- for case_id in cases_data:
- if case_id not in added_cases:
- nodes.append({
- 'id': f'case_{case_id}',
- 'label': f'[Closed] Case #{case_id}' if cases_data[case_id].get('close_date') else f'Case #{case_id}',
- 'title': cases_data[case_id].get("description"),
- 'group': 'case',
- 'shape': 'icon',
- 'icon': {
- 'face': 'FontAwesome',
- 'code': '\uf0b1',
- 'color': '#c95029' if cases_data[case_id].get('close_date') else '#4cba4f'
- },
- 'font': "12px verdana white" if current_user.in_dark_mode else ''
- })
- added_cases.add(case_id)
- for ioc_value in cases_data[case_id]['matching_ioc']:
- edges.append({
- 'from': f'ioc_{ioc_value}',
- 'to': f'case_{case_id}',
- 'dashes': True
- })
- for asset_name in cases_data[case_id]['matching_assets']:
- edges.append({
- 'from': f'asset_{asset_name}',
- 'to': f'case_{case_id}',
- 'dashes': True
- })
- return {
- 'nodes': nodes,
- 'edges': edges
- }
- def get_alert_comments(alert_id: int) -> List[Comments]:
- """
- Get the comments of an alert
- args:
- alert_id (int): The ID of the alert
- returns:
- list: The list of comments
- """
- return Comments.query.filter(Comments.comment_alert_id == alert_id).all()
- def get_alert_comment(alert_id: int, comment_id: int) -> Comments:
- """
- Get a comment of an alert
- args:
- alert_id (int): The ID of the alert
- comment_id (int): The ID of the comment
- returns:
- Comments: The comment
- """
- return Comments.query.filter(
- Comments.comment_alert_id == alert_id,
- Comments.comment_id == comment_id
- ).first()
- def delete_alert_comment(comment_id: int, alert_id: int) -> Tuple[bool, str]:
- """
- Delete a comment of an alert
- args:
- comment_id (int): The ID of the comment
- """
- comment = Comments.query.filter(
- Comments.comment_id == comment_id,
- Comments.comment_user_id == current_user.id,
- Comments.comment_alert_id == alert_id
- ).first()
- if not comment:
- return False, "You are not allowed to delete this comment"
- db.session.delete(comment)
- db.session.commit()
- return True, "Comment deleted successfully"
- def remove_alerts_from_assets_by_ids(alert_ids: List[int]) -> None:
- """
- Remove the alerts from the CaseAssets based on the alert_ids
- args:
- alert_ids (List[int]): list of alerts to remove
- returns:
- None
- """
- # Query the affected CaseAssets based on the alert_ids
- affected_case_assets = (
- db.session.query(CaseAssets)
- .join(alert_assets_association)
- .join(Alert, alert_assets_association.c.alert_id == Alert.alert_id)
- .filter(Alert.alert_id.in_(alert_ids))
- .all()
- )
- # Remove the alerts and delete the CaseAssets if not related to a case
- for case_asset in affected_case_assets:
- # Remove the alerts based on alert_ids
- case_asset.alerts = [alert for alert in case_asset.alerts if alert.alert_id not in alert_ids]
- # Delete the CaseAsset if it's not related to a case
- if case_asset.case_id is None:
- db.session.delete(case_asset)
- # Commit the changes
- db.session.commit()
- def remove_alerts_from_iocs_by_ids(alert_ids: List[int]) -> None:
- """
- Remove the alerts from the Ioc based on the alert_ids
- args:
- alert_ids (List[int]): list of alerts to remove
- returns:
- None
- """
- # Query the affected CaseAssets based on the alert_ids
- affected_case_iocs = (
- db.session.query(Ioc)
- .join(alert_iocs_association)
- .join(Alert, alert_iocs_association.c.alert_id == Alert.alert_id)
- .filter(Alert.alert_id.in_(alert_ids))
- .all()
- )
- # Remove the alerts and delete the Ioc if not related to a case
- for ioc in affected_case_iocs:
- # Remove the alerts based on alert_ids
- ioc.alerts = [alert for alert in ioc.alerts if alert.alert_id not in alert_ids]
- # Commit the changes
- db.session.commit()
- def remove_case_alerts_by_ids(alert_ids: List[int]) -> None:
- """
- Remove the alerts from the Case based on the alert_ids
- args:
- alert_ids (List[int]): list of alerts to remove
- returns:
- None
- """
- affected_cases = (
- db.session.query(Cases)
- .join(AlertCaseAssociation)
- .join(Alert, AlertCaseAssociation.alert_id == Alert.alert_id)
- .filter(Alert.alert_id.in_(alert_ids))
- .all()
- )
- for case in affected_cases:
- # Remove the alerts based on alert_ids
- case.alerts = [alert for alert in case.alerts if alert.alert_id not in alert_ids]
- db.session.query(AlertCaseAssociation).filter(
- AlertCaseAssociation.alert_id.in_(alert_ids)
- ).delete(synchronize_session='fetch')
- db.session.commit()
- def delete_alerts(alert_ids: List[int]) -> tuple[bool, str]:
- """
- Delete multiples alerts from the database
- args:
- alert_ids (List[int]): list of alerts to delete
- returns:
- True if deleted successfully
- """
- try:
- delete_similar_alerts_cache(alert_ids)
- delete_related_alerts_cache(alert_ids)
- remove_alerts_from_assets_by_ids(alert_ids)
- remove_alerts_from_iocs_by_ids(alert_ids)
- remove_case_alerts_by_ids(alert_ids)
- Comments.query.filter(Comments.comment_alert_id.in_(alert_ids)).delete()
- Alert.query.filter(Alert.alert_id.in_(alert_ids)).delete()
- except Exception as e:
- db.session.rollback()
- app.logger.exception(str(e))
- return False, "Server side error"
- return True, ""
- def get_alert_status_by_name(name: str) -> AlertStatus:
- """
- Get the alert status by name
- args:
- name (str): The name of the alert status
- returns:
- AlertStatus: The alert status
- """
- return AlertStatus.query.filter(AlertStatus.status_name == name).first()
|