| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- # IRIS Source Code
- # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
- # ir@cyberactionlab.net
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- from flask_login import current_user
- from sqlalchemy import and_
- from app import db
- from app import app
- from app.datamgmt.states import update_ioc_state
- from app.datamgmt.conversions import convert_sort_direction
- from app.iris_engine.access_control.utils import ac_get_fast_user_cases_access
- from app.models.cases import Cases
- from app.models.models import Client
- from app.models.models import Comments
- from app.models.models import Ioc
- from app.models.models import IocComments
- from app.models.models import IocType
- from app.models.models import Tlp
- from app.models.authorization import User
- from app.models.authorization import UserCaseEffectiveAccess
- from app.models.authorization import CaseAccessLevel
- from app.models.pagination_parameters import PaginationParameters
def get_iocs(case_identifier) -> list[Ioc]:
    """Return every IOC attached to the given case.

    :param case_identifier: Value matched against ``Ioc.case_id``
    :return: List of matching Ioc rows (empty when none match)
    """
    case_iocs = Ioc.query.filter(Ioc.case_id == case_identifier)
    return case_iocs.all()
def get_ioc(ioc_id, caseid=None):
    """Fetch a single IOC by its ID, optionally restricted to one case.

    :param ioc_id: Value matched against ``Ioc.ioc_id``
    :param caseid: When truthy, the IOC must also belong to this case
    :return: The first matching Ioc row, or None when nothing matches
    """
    criteria = [Ioc.ioc_id == ioc_id]
    if caseid:
        # Only narrow by case when the caller supplied one
        criteria.append(Ioc.case_id == caseid)

    return Ioc.query.filter(*criteria).first()
def update_ioc(ioc_type, ioc_tags, ioc_value, ioc_description, ioc_tlp, userid, ioc_id):
    """Overwrite the editable fields of an existing IOC and commit.

    :param ioc_type: New value for ``ioc_type``
    :param ioc_tags: New value for ``ioc_tags``
    :param ioc_value: New value for ``ioc_value``
    :param ioc_description: New value for ``ioc_description``
    :param ioc_tlp: New value for ``ioc_tlp_id``
    :param userid: New value for ``user_id``
    :param ioc_id: ID of the IOC to update
    :return: False when the IOC does not exist; None after a successful update
    """
    target = get_ioc(ioc_id)
    if not target:
        return False

    target.ioc_type = ioc_type
    target.ioc_tags = ioc_tags
    target.ioc_value = ioc_value
    target.ioc_description = ioc_description
    target.ioc_tlp_id = ioc_tlp
    target.user_id = userid

    db.session.commit()
def delete_ioc(ioc: Ioc):
    """Delete an IOC together with its comments and comment links.

    This function does not call ``db.session.commit()`` itself.

    :param ioc: The Ioc row to delete
    """
    # Collect the IDs of every comment linked to this IOC
    linked_rows = (
        IocComments.query
        .with_entities(IocComments.comment_id)
        .filter(IocComments.comment_ioc_id == ioc.ioc_id)
        .all()
    )
    comment_ids = [row.comment_id for row in linked_rows]

    # Remove the IocComments link rows first, then the comments themselves
    IocComments.query.filter(IocComments.comment_id.in_(comment_ids)).delete()
    Comments.query.filter(Comments.comment_id.in_(comment_ids)).delete()

    db.session.delete(ioc)

    update_ioc_state(ioc.case_id)
def get_detailed_iocs(caseid):
    """List the IOCs of a case with their type and TLP details.

    :param caseid: Value matched against ``Ioc.case_id``
    :return: List of rows ordered by IOC type name
    """
    # NOTE(review): Ioc.ioc_type_id is selected twice; kept as-is so the
    # shape of the returned rows does not change.
    columns = (
        Ioc.ioc_id,
        Ioc.ioc_uuid,
        Ioc.ioc_value,
        Ioc.ioc_type_id,
        IocType.type_name.label('ioc_type'),
        Ioc.ioc_type_id,
        Ioc.ioc_description,
        Ioc.ioc_tags,
        Ioc.ioc_misp,
        Tlp.tlp_name,
        Tlp.tlp_bscolor,
        Ioc.ioc_tlp_id,
    )

    query = Ioc.query.with_entities(*columns).filter(Ioc.case_id == caseid)
    # The type is joined with an inner join, the TLP with an outer join
    query = query.join(Ioc.ioc_type).outerjoin(Ioc.tlp)

    return query.order_by(IocType.type_name).all()
def get_ioc_links(ioc_id):
    """Find other cases holding an IOC with the same value and type.

    Results are limited to the cases returned by
    ``ac_get_fast_user_cases_access`` for the current user.

    :param ioc_id: ID of the reference IOC
    :return: List of rows (case_id, case_name, client_name); empty when the
             reference IOC does not exist or nothing matches
    """
    ioc = Ioc.query.filter(Ioc.ioc_id == ioc_id).first()
    if ioc is None:
        # Unknown IOC: nothing to link to (previously raised AttributeError)
        return []

    # With no accessible case the IN clause is empty and matches nothing
    user_search_limitations = ac_get_fast_user_cases_access(current_user.id)
    search_condition = and_(Cases.case_id.in_(user_search_limitations or []))

    # Search related iocs based on value and type
    related_iocs = (
        Ioc.query.with_entities(
            Cases.case_id,
            Cases.name.label('case_name'),
            Client.name.label('client_name')
        ).filter(and_(
            Ioc.ioc_value == ioc.ioc_value,
            Ioc.ioc_type_id == ioc.ioc_type_id,
            Ioc.ioc_id != ioc_id,
            search_condition
        )).join(Ioc.case)
        .join(Cases.client)
        .all()
    )

    return related_iocs
def add_ioc(ioc: Ioc, user_id, caseid):
    """Attach an IOC to a case and persist it.

    :param ioc: The Ioc instance to store
    :param user_id: Value written to ``ioc.user_id``
    :param caseid: Value written to ``ioc.case_id``; also passed to
                   ``update_ioc_state``
    """
    # Stamp ownership before the object enters the session
    ioc.user_id, ioc.case_id = user_id, caseid

    db.session.add(ioc)

    # The state update is issued before the commit
    update_ioc_state(caseid=caseid)
    db.session.commit()
def case_iocs_db_exists(ioc: Ioc):
    """Tell whether the case already holds an IOC with this value and type.

    :param ioc: Ioc whose ``case_id``, ``ioc_value`` and ``ioc_type_id`` are
                used as the lookup key
    :return: True when at least one matching row exists, False otherwise
    """
    same_case = Ioc.case_id == ioc.case_id
    same_value = Ioc.ioc_value == ioc.ioc_value
    same_type = Ioc.ioc_type_id == ioc.ioc_type_id

    match = Ioc.query.filter(same_case, same_value, same_type).first()
    return match is not None
def get_ioc_types_list():
    """Return all IOC types as a list of dictionaries.

    :return: One dict per IocType row, keyed by the selected column names
    """
    columns = (
        IocType.type_id,
        IocType.type_name,
        IocType.type_description,
        IocType.type_taxonomy,
        IocType.type_validation_regex,
        IocType.type_validation_expect,
    )
    rows = IocType.query.with_entities(*columns).all()

    return [row._asdict() for row in rows]
def add_ioc_type(name:str, description:str, taxonomy:str):
    """Create a new IOC type and commit it.

    :param name: Stored as ``type_name``
    :param description: Stored as ``type_description``
    :param taxonomy: Stored as ``type_taxonomy``
    :return: The newly created IocType instance
    """
    new_type = IocType(
        type_name=name,
        type_description=description,
        type_taxonomy=taxonomy,
    )

    db.session.add(new_type)
    db.session.commit()

    return new_type
def check_ioc_type_id(type_id: int):
    """Look up an IOC type by its ID.

    :param type_id: Value matched against ``IocType.type_id``
    :return: The matching IocType row (not just its ID), or None
    """
    by_id = IocType.type_id == type_id
    return IocType.query.filter(by_id).first()
def get_ioc_type_id(type_name: str):
    """Look up an IOC type by its name.

    :param type_name: Value matched against ``IocType.type_name``
    :return: The matching IocType row (not just its ID), or None
    """
    matched = IocType.query.filter(IocType.type_name == type_name).first()
    if not matched:
        return None

    return matched
def get_tlps():
    """Return all TLP entries as ``(tlp_id, tlp_name)`` tuples.

    :return: List of 2-tuples, one per Tlp row
    """
    all_tlps = Tlp.query.all()
    return [(entry.tlp_id, entry.tlp_name) for entry in all_tlps]
def get_tlps_dict():
    """Return a mapping of TLP name to TLP ID.

    :return: Dict keyed by ``tlp_name`` with ``tlp_id`` values
    """
    return {entry.tlp_name: entry.tlp_id for entry in Tlp.query.all()}
def get_case_ioc_comments(ioc_id):
    """Return the comments linked to an IOC, oldest first.

    :param ioc_id: Value matched against ``IocComments.comment_ioc_id``
    :return: List of Comments rows ordered by ascending ``comment_date``
    """
    for_this_ioc = IocComments.comment_ioc_id == ioc_id
    link_clause = Comments.comment_id == IocComments.comment_id

    query = Comments.query.filter(for_this_ioc).with_entities(Comments)
    query = query.join(IocComments, link_clause)
    query = query.order_by(Comments.comment_date.asc())

    return query.all()
def add_comment_to_ioc(ioc_id, comment_id):
    """Link an existing comment to an IOC and commit.

    :param ioc_id: Stored as ``comment_ioc_id`` on the link row
    :param comment_id: Stored as ``comment_id`` on the link row
    """
    link = IocComments()
    link.comment_ioc_id = ioc_id
    link.comment_id = comment_id

    db.session.add(link)
    db.session.commit()
def get_case_iocs_comments_count(iocs_list):
    """Return the (IOC ID, comment ID) pairs for a set of IOCs.

    Despite the name, no count is computed here: the rows are grouped by both
    columns, so each distinct pair is returned once.

    :param iocs_list: IOC IDs matched against ``IocComments.comment_ioc_id``
    :return: List of rows with ``comment_ioc_id`` and ``comment_id``
    """
    pair_columns = (IocComments.comment_ioc_id, IocComments.comment_id)

    query = IocComments.query.filter(IocComments.comment_ioc_id.in_(iocs_list))
    query = query.with_entities(*pair_columns).group_by(*pair_columns)

    return query.all()
def get_case_ioc_comment(ioc_id, comment_id):
    """Fetch one comment of an IOC along with its author.

    :param ioc_id: Value matched against ``IocComments.comment_ioc_id``
    :param comment_id: Value matched against ``IocComments.comment_id``
    :return: A row with the comment fields plus ``User.name`` and
             ``User.user``, or None when no such link exists
    """
    selected = (
        Comments.comment_id,
        Comments.comment_text,
        Comments.comment_date,
        Comments.comment_update_date,
        Comments.comment_uuid,
        User.name,
        User.user,
    )

    query = IocComments.query.filter(
        IocComments.comment_ioc_id == ioc_id,
        IocComments.comment_id == comment_id
    )
    query = query.with_entities(*selected)
    # Link row -> comment -> author
    query = query.join(IocComments.comment).join(Comments.user)

    return query.first()
def delete_ioc_comment(ioc_id, comment_id):
    """Delete a comment of an IOC, provided the current user wrote it.

    :param ioc_id: ID of the IOC the comment is linked to
    :param comment_id: ID of the comment to delete
    :return: Tuple ``(success, message)``
    """
    # The lookup is scoped to the current user, so someone else's comment
    # is treated the same as a missing one
    owned_comment = Comments.query.filter(
        Comments.comment_id == comment_id,
        Comments.comment_user_id == current_user.id
    ).first()

    if not owned_comment:
        return False, "You are not allowed to delete this comment"

    # Drop the link row before the comment itself
    link_query = IocComments.query.filter(
        IocComments.comment_ioc_id == ioc_id,
        IocComments.comment_id == comment_id
    )
    link_query.delete()

    db.session.delete(owned_comment)
    db.session.commit()

    return True, "Comment deleted"
def get_ioc_by_value(ioc_value, caseid=None):
    """Fetch the first IOC with the given value, optionally within one case.

    :param ioc_value: Value matched against ``Ioc.ioc_value``
    :param caseid: When truthy, the IOC must also belong to this case
    :return: The first matching Ioc row, or None when nothing matches
    """
    query = Ioc.query.filter(Ioc.ioc_value == ioc_value)
    if caseid:
        # The case-scoped result used to be computed and then discarded
        # (missing ``return``), so an IOC from any case could be returned.
        query = query.filter(Ioc.case_id == caseid)

    return query.first()
def user_list_cases_view(user_id):
    """List the case IDs a user has any non-denied access to.

    :param user_id: Value matched against ``UserCaseEffectiveAccess.user_id``
    :return: List of case IDs whose access level is not ``deny_all``
    """
    is_user = UserCaseEffectiveAccess.user_id == user_id
    not_denied = UserCaseEffectiveAccess.access_level != CaseAccessLevel.deny_all.value

    rows = (
        UserCaseEffectiveAccess.query
        .with_entities(UserCaseEffectiveAccess.case_id)
        .filter(is_user, not_denied)
        .all()
    )

    return [row.case_id for row in rows]
def _build_filter_ioc_query(
        caseid: int = None,
        ioc_type_id: int = None,
        ioc_type: str = None,
        ioc_tlp_id: int = None,
        ioc_value: str = None,
        ioc_description: str = None,
        ioc_tags: str = None,
        sort_by=None,
        sort_dir='asc'):
    """
    Build (without executing) a query over iocs, filtered by the given parameters

    Each filter that is not None adds an exact-match condition; the
    conditions are combined with AND.

    :param caseid: Restrict to this case ID
    :param ioc_type_id: Restrict to this IOC type ID
    :param ioc_type: Restrict to the IOC type with this name
    :param ioc_tlp_id: Restrict to this TLP ID
    :param ioc_value: Restrict to this exact value
    :param ioc_description: Restrict to this exact description
    :param ioc_tags: Restrict to this exact tags string
    :param sort_by: ``'opened_by'`` (sorts on the owning user's name) or the
                    name of an Ioc attribute; other values are ignored
    :param sort_dir: Direction passed to ``convert_sort_direction``
    :return: The query object, ready to be paginated or executed
    """
    column_filters = (
        (Ioc.ioc_type_id, ioc_type_id),
        (Ioc.ioc_tlp_id, ioc_tlp_id),
        (Ioc.ioc_value, ioc_value),
        (Ioc.ioc_description, ioc_description),
        (Ioc.ioc_tags, ioc_tags),
        (Ioc.case_id, caseid),
    )
    conditions = [column == wanted for column, wanted in column_filters if wanted is not None]

    if ioc_type is not None:
        # Ioc.ioc_type is a relationship to IocType, so it cannot be compared
        # to a plain string; match on the related type's name instead.
        conditions.append(Ioc.ioc_type.has(IocType.type_name == ioc_type))

    query = Ioc.query.filter(*conditions)

    if sort_by is not None:
        order_func = convert_sort_direction(sort_dir)

        if sort_by == 'opened_by':
            query = query.join(User, Ioc.user_id == User.id).order_by(order_func(User.name))

        elif hasattr(Ioc, sort_by):
            query = query.order_by(order_func(getattr(Ioc, sort_by)))

    return query
def get_filtered_iocs(
        pagination_parameters: PaginationParameters,
        caseid: int = None,
        ioc_type_id: int = None,
        ioc_type: str = None,
        ioc_tlp_id: int = None,
        ioc_value: str = None,
        ioc_description: str = None,
        ioc_tags: str = None
        ):
    """Return one page of iocs matching the given filters.

    Filter arguments are forwarded unchanged to ``_build_filter_ioc_query``;
    ordering, page number and page size come from ``pagination_parameters``.

    :param pagination_parameters: Source of page, per-page, order-by and
                                  direction values
    :return: The pagination object produced by ``query.paginate``, or None
             when pagination raises (the error is logged)
    """
    query = _build_filter_ioc_query(
        caseid=caseid,
        ioc_type_id=ioc_type_id,
        ioc_type=ioc_type,
        ioc_tlp_id=ioc_tlp_id,
        ioc_value=ioc_value,
        ioc_description=ioc_description,
        ioc_tags=ioc_tags,
        sort_by=pagination_parameters.get_order_by(),
        sort_dir=pagination_parameters.get_direction()
    )

    try:
        filtered_iocs = query.paginate(
            page=pagination_parameters.get_page(),
            per_page=pagination_parameters.get_per_page(),
            error_out=False
        )
    except Exception as e:
        # Best-effort: log and signal failure with None rather than propagate.
        # The message previously said "cases" although this function lists iocs.
        app.logger.exception(f"Error getting iocs: {str(e)}")
        return None

    return filtered_iocs
|