| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- # IRIS Source Code
- # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
- # ir@cyberactionlab.net
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- import datetime
- import re
- from sqlalchemy import desc
- from app.datamgmt.case.case_notes_db import get_notes_from_group
- from app.datamgmt.case.case_notes_db import get_case_note_comments
- from app.models.models import AnalysisStatus
- from app.models.models import CompromiseStatus
- from app.models.models import TaskAssignee
- from app.models.models import AssetsType
- from app.models.models import CaseAssets
- from app.models.models import CaseEventsAssets
- from app.models.models import CaseEventsIoc
- from app.models.models import CaseReceivedFile
- from app.models.models import CaseTasks
- from app.models.cases import Cases
- from app.models.cases import CasesEvent
- from app.models.models import Comments
- from app.models.models import EventCategory
- from app.models.models import Ioc
- from app.models.models import IocAssetLink
- from app.models.models import IocType
- from app.models.models import Notes
- from app.models.models import NotesGroup
- from app.models.models import TaskStatus
- from app.models.models import Tlp
- from app.models.authorization import User
- from app.schema.marshables import CaseDetailsSchema
- from app.schema.marshables import CommentSchema
- from app.schema.marshables import CaseNoteSchema
def export_case_json_extended(case_id):
    """Export a case as an extended JSON-serializable dict.

    Returns ``{'errors': [...]}`` when the case does not exist; otherwise a
    dict with the case itself, all related artifacts, and the export date.

    :param case_id: ID of the case to export
    :return: dict of the full case export
    """
    case_info = export_caseinfo_json_extended(case_id)
    if not case_info:
        return {'errors': ["Invalid case number"]}

    return {
        'case': case_info,
        'evidences': export_case_evidences_json_extended(case_id),
        'timeline': export_case_tm_json_extended(case_id),
        'iocs': export_case_iocs_json_extended(case_id),
        'assets': export_case_assets_json_extended(case_id),
        'tasks': export_case_tasks_json_extended(case_id),
        'notes': export_case_notes_json_extended(case_id),
        # NOTE(review): utcnow() is naive and deprecated in 3.12 — kept for
        # serialization compatibility; confirm before switching to aware UTC.
        'export_date': datetime.datetime.utcnow(),
    }
def process_md_images_links_for_report(markdown_text):
    """Rewrite datastore image links in markdown for the report generator.

    Turns relative ``/datastore/file/view/<id>?cid=<id> =WxH`` image links
    into absolute URLs and strips the trailing scale specifier.

    :param markdown_text: raw markdown text of a note
    :return: markdown with datastore links rewritten
    """
    # Fixes: the previous replacement emitted 'http://127.0.0.1:8000://datastore/...'
    # (a stray ':' after the port plus a duplicated slash); the group already
    # starts with '/'. Also '[A-z]' was an accidental range including [\]^_` —
    # narrowed to the intended alphanumerics.
    # NOTE(review): the base URL is hard-coded; it should probably come from
    # the instance configuration (server FQDN) — confirm with callers.
    markdown = re.sub(r'(/datastore/file/view/\d+\?cid=\d+)( =[\dA-Za-z%]*)\)',
                      r"http://127.0.0.1:8000\1)", markdown_text)
    return markdown
def export_caseinfo_json_extended(case_id):
    """Return the Cases ORM object for *case_id*, or None when not found."""
    return Cases.query.filter(Cases.case_id == case_id).first()
def export_case_evidences_json_extended(case_id):
    """Return all CaseReceivedFile ORM rows of the case, with case and user joined."""
    query = CaseReceivedFile.query.filter(CaseReceivedFile.case_id == case_id)
    query = query.join(CaseReceivedFile.case).join(CaseReceivedFile.user)
    return query.all()
def export_case_tm_json_extended(case_id):
    """Return every CasesEvent ORM row belonging to the case."""
    return CasesEvent.query.filter(CasesEvent.case_id == case_id).all()
def export_case_iocs_json_extended(case_id):
    """Return every Ioc ORM row belonging to the case."""
    return Ioc.query.filter(Ioc.case_id == case_id).all()
def export_case_assets_json_extended(case_id):
    """Return every CaseAssets ORM row belonging to the case."""
    return CaseAssets.query.filter(CaseAssets.case_id == case_id).all()
def export_case_tasks_json_extended(case_id):
    """Return every CaseTasks ORM row belonging to the case."""
    return CaseTasks.query.filter(CaseTasks.task_case_id == case_id).all()
def export_case_notes_json_extended(case_id):
    """Return the case's NotesGroup ORM rows, each augmented with its notes.

    The notes are written straight into each instance's ``__dict__`` so the
    downstream serializer picks them up alongside the mapped columns.
    """
    groups = NotesGroup.query.filter(NotesGroup.group_case_id == case_id).all()

    for group in groups:
        # Direct __dict__ write (not setattr) to bypass ORM instrumentation,
        # matching how the rest of the export treats these objects.
        group.__dict__['notes'] = get_notes_from_group(group.group_id, case_id)

    return groups
def export_caseinfo_json(case_id):
    """Return the case serialized with CaseDetailsSchema, or None when unknown."""
    case = Cases.query.filter(Cases.case_id == case_id).first()
    if case is None:
        return None

    return CaseDetailsSchema().dump(case)
def export_case_evidences_json(case_id):
    """Export the case's evidences as a list of plain dicts (possibly empty).

    Rows are ordered by the date the evidence was added; the uploading user's
    name is exposed as ``added_by``.
    """
    rows = CaseReceivedFile.query.filter(
        CaseReceivedFile.case_id == case_id
    ).with_entities(
        CaseReceivedFile.filename,
        CaseReceivedFile.date_added,
        CaseReceivedFile.file_hash,
        User.name.label('added_by'),
        CaseReceivedFile.custom_attributes,
        CaseReceivedFile.file_uuid,
        CaseReceivedFile.id,
        CaseReceivedFile.file_size,
    ).order_by(
        CaseReceivedFile.date_added
    ).join(
        CaseReceivedFile.user
    ).all()

    # A comprehension over an empty result naturally yields [].
    return [row._asdict() for row in rows]
def export_case_notes_json(case_id):
    """Serialize every note of the case, including its comments.

    Each note's content is post-processed so datastore image links can be
    handled by the report generator.
    """
    note_schema = CaseNoteSchema()
    comment_schema = CommentSchema(many=True)

    serialized = []
    for note in Notes.query.filter(Notes.note_case_id == case_id).all():
        dumped = note_schema.dump(note)
        dumped['comments'] = comment_schema.dump(get_case_note_comments(note.note_id))
        dumped["note_content"] = process_md_images_links_for_report(dumped["note_content"])
        serialized.append(dumped)

    return serialized
def _export_event_assets(event_id):
    """Return the assets linked to *event_id* as 'name (type)' display strings."""
    links = CaseEventsAssets.query.with_entities(
        CaseAssets.asset_id,
        CaseAssets.asset_name,
        AssetsType.asset_name.label('type')
    ).filter(
        CaseEventsAssets.event_id == event_id
    ).join(
        CaseEventsAssets.asset
    ).join(
        CaseAssets.asset_type
    ).all()

    return ["{} ({})".format(link.asset_name, link.type) for link in links]


def _export_event_iocs(event_id):
    """Return the IOCs linked to *event_id* as a list of plain dicts."""
    links = CaseEventsIoc.query.with_entities(
        CaseEventsIoc.ioc_id,
        Ioc.ioc_value,
        Ioc.ioc_description,
        Tlp.tlp_name,
        IocType.type_name.label('type')
    ).filter(
        CaseEventsIoc.event_id == event_id
    ).join(
        CaseEventsIoc.ioc
    ).join(
        Ioc.ioc_type
    ).join(
        Ioc.tlp
    ).all()

    return [link._asdict() for link in links]


def export_case_tm_json(case_id):
    """Export the case timeline as a list of dicts, ordered by event date.

    Each event dict carries its linked assets (as display strings under
    'assets') and its linked IOCs (as dicts under 'iocs'). The category is
    outer-joined so uncategorized events are still included.
    """
    timeline = CasesEvent.query.with_entities(
        CasesEvent.event_id,
        CasesEvent.event_title,
        CasesEvent.event_in_summary,  # fixed: this column was listed twice
        CasesEvent.event_date,
        CasesEvent.event_tz,
        CasesEvent.event_date_wtz,
        CasesEvent.event_content,
        CasesEvent.event_tags,
        CasesEvent.event_source,
        CasesEvent.event_raw,
        CasesEvent.custom_attributes,
        EventCategory.name.label('category'),
        User.name.label('last_edited_by'),
        CasesEvent.event_uuid,
        CasesEvent.event_in_graph,
        CasesEvent.event_color,
        CasesEvent.event_is_flagged
    ).filter(
        CasesEvent.case_id == case_id
    ).order_by(
        CasesEvent.event_date
    ).join(
        CasesEvent.user
    ).outerjoin(
        CasesEvent.category
    ).all()

    tim = []
    for row in timeline:
        event = row._asdict()
        event['assets'] = _export_event_assets(row.event_id)
        event['iocs'] = _export_event_iocs(row.event_id)
        tim.append(event)

    return tim
def export_case_tasks_json(case_id):
    """Export the case tasks as a list of dicts, each with its assignees.

    Assignees are attached under 'task_assignees' as
    ``[{'user': ..., 'name': ..., 'id': ...}, ...]`` (empty list when none).
    """
    rows = CaseTasks.query.with_entities(
        CaseTasks.task_title,
        TaskStatus.status_name.label('task_status'),
        CaseTasks.task_tags,
        CaseTasks.task_open_date,
        CaseTasks.task_close_date,
        CaseTasks.task_last_update,
        CaseTasks.task_description,
        CaseTasks.custom_attributes,
        CaseTasks.task_uuid,
        CaseTasks.id
    ).filter(
        CaseTasks.task_case_id == case_id
    ).join(
        CaseTasks.status
    ).all()

    tasks = [row._asdict() for row in rows]

    # Fixed N+1: fetch the assignees of all tasks in a single query instead
    # of one query per task.
    task_ids = [task['id'] for task in tasks]
    members = []
    if task_ids:
        members = TaskAssignee.query.with_entities(
            TaskAssignee.task_id,
            User.user,
            User.id,
            User.name
        ).join(
            TaskAssignee.user
        ).filter(
            TaskAssignee.task_id.in_(task_ids)
        ).all()

    assignees_by_task = {}
    for member in members:
        assignees_by_task.setdefault(member.task_id, []).append({
            'user': member.user,
            'name': member.name,
            'id': member.id
        })

    for task in tasks:
        task['task_assignees'] = assignees_by_task.get(task['id'], [])

    return tasks
def export_case_assets_json(case_id):
    """Export the case assets as dicts, most-compromised first.

    Each asset carries its linked IOCs under 'asset_ioc' and a human-readable
    compromise status under 'asset_compromise_status'.
    """
    assets = []

    rows = CaseAssets.query.with_entities(
        CaseAssets.asset_id,
        CaseAssets.asset_uuid,
        CaseAssets.asset_name,
        CaseAssets.asset_description,
        CaseAssets.asset_compromise_status_id,
        AssetsType.asset_name.label("type"),
        AnalysisStatus.name.label('analysis_status'),
        CaseAssets.date_added,
        CaseAssets.asset_domain,
        CaseAssets.asset_ip,
        CaseAssets.asset_info,
        CaseAssets.asset_tags,
        CaseAssets.custom_attributes
    ).filter(
        CaseAssets.case_id == case_id
    ).join(
        CaseAssets.asset_type
    ).join(
        CaseAssets.analysis_status
    ).order_by(
        desc(CaseAssets.asset_compromise_status_id)
    ).all()

    for row in rows:
        asset = row._asdict()
        asset['light_asset_description'] = asset['asset_description']

        ioc_links = IocAssetLink.query.with_entities(
            Ioc.ioc_value,
            IocType.type_name,
            Ioc.ioc_description
        ).filter(
            IocAssetLink.asset_id == asset['asset_id']
        ).join(
            IocAssetLink.ioc
        ).join(
            Ioc.ioc_type
        ).all()

        asset['asset_ioc'] = [link._asdict() for link in ioc_links]

        # Missing status defaults to 'unknown'; the display text is derived
        # from the enum name either way.
        status_id = asset['asset_compromise_status_id']
        if status_id is None:
            status_id = CompromiseStatus.unknown.value
            asset['asset_compromise_status_id'] = status_id
        asset['asset_compromise_status'] = CompromiseStatus(status_id).name.replace('_', ' ').title()

        assets.append(asset)

    return assets
def export_case_comments_json(case_id):
    """Export the case comments as a chronologically ordered list of dicts."""
    rows = Comments.query.with_entities(
        Comments.comment_id,
        Comments.comment_uuid,
        Comments.comment_text,
        User.name.label('comment_by'),
        Comments.comment_date,
    ).filter(
        Comments.comment_case_id == case_id
    ).join(
        Comments.user
    ).order_by(
        Comments.comment_date
    ).all()

    return [row._asdict() for row in rows]
|