| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604 |
- # IRIS Source Code
- # Copyright (C) 2022 - DFIR IRIS Team
- # contact@dfir-iris.org
- # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
- # ir@cyberactionlab.net
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- import logging as log
- import os
- from datetime import datetime
- from docx_generator.docx_generator import DocxGenerator
- from docx_generator.exceptions import rendering_error
- from flask_login import current_user
- from sqlalchemy import desc
- from app import app
- from app.business.cases import cases_export_to_report_json
- from app.business.cases import cases_export_to_json
- from app.datamgmt.activities.activities_db import get_auto_activities
- from app.datamgmt.activities.activities_db import get_manual_activities
- from app.datamgmt.case.case_db import case_get_desc_crc
- from app.models.models import AssetsType
- from app.models.models import CaseAssets
- from app.models.models import CaseEventsAssets
- from app.models.models import CaseReceivedFile
- from app.models.models import CaseTemplateReport
- from app.models.cases import CasesEvent
- from app.models.models import Ioc
- from app.models.models import IocAssetLink
- from app.iris_engine.reporter.ImageHandler import ImageHandler
- from app.iris_engine.utils.common import IrisJinjaEnv
- LOG_FORMAT = '%(asctime)s :: %(levelname)s :: %(module)s :: %(funcName)s :: %(message)s'
- log.basicConfig(level=log.INFO, format=LOG_FORMAT)
- class IrisReportMaker(object):
- """
- IRIS generical report maker
- """
- def __init__(self, tmp_dir, report_id, caseid, safe_mode=False):
- self._tmp = tmp_dir
- self._report_id = report_id
- self._case_info = {}
- self._caseid = caseid
- self.safe_mode = safe_mode
- def get_case_info(self, doc_type):
- """Returns case information
- Args:
- doc_type (_type_): Investigation or Activities report
- Returns:
- _type_: case info
- """
- if doc_type == 'Investigation':
- case_info = self._get_case_info()
- elif doc_type == 'Activities':
- case_info = self._get_activity_info()
- else:
- log.error("Unknown report type")
- return None
- return case_info
- def _get_activity_info(self):
- auto_activities = get_auto_activities(self._caseid)
- manual_activities = get_manual_activities(self._caseid)
- case_info_in = self._get_case_info()
- # Format information and generate the activity report #
- doc_id = "{}".format(datetime.utcnow().strftime("%y%m%d_%H%M"))
- case_info = {
- 'auto_activities': auto_activities,
- 'manual_activities': manual_activities,
- 'date': datetime.utcnow(),
- 'gen_user': current_user.name,
- 'case': {'name': case_info_in['case'].get('name'),
- 'open_date': case_info_in['case'].get('open_date'),
- 'for_customer': case_info_in['case'].get('client').get('customer_name'),
- 'client': case_info_in['case'].get('client')
- },
- 'doc_id': doc_id
- }
- return case_info
- def _get_case_info(self):
- """
- Retrieve information of the case
- :return:
- """
- case_info = cases_export_to_json(self._caseid)
- # Get customer, user and case title
- case_info['doc_id'] = IrisReportMaker.get_docid()
- case_info['user'] = current_user.name
- # Set date
- case_info['date'] = datetime.utcnow().strftime("%Y-%m-%d")
- return case_info
- @staticmethod
- def get_case_summary(caseid):
- """
- Retrieve the case summary from thehive
- :return:
- """
- _crc32, descr = case_get_desc_crc(caseid)
- return descr
- @staticmethod
- def get_case_files(caseid):
- """
- Retrieve the list of files with their hashes
- :return:
- """
- files = CaseReceivedFile.query.filter(
- CaseReceivedFile.case_id == caseid
- ).with_entities(
- CaseReceivedFile.filename,
- CaseReceivedFile.date_added,
- CaseReceivedFile.file_hash,
- CaseReceivedFile.custom_attributes
- ).order_by(
- CaseReceivedFile.date_added
- ).all()
- if files:
- return [row._asdict() for row in files]
- else:
- return []
- @staticmethod
- def get_case_timeline(caseid):
- """
- Retrieve the case timeline
- :return:
- """
- timeline = CasesEvent.query.filter(
- CasesEvent.case_id == caseid
- ).order_by(
- CasesEvent.event_date
- ).all()
- cache_id = {}
- ras = {}
- tim = []
- for row in timeline:
- ras = row
- setattr(ras, 'asset', None)
- as_list = CaseEventsAssets.query.with_entities(
- CaseAssets.asset_id,
- CaseAssets.asset_name,
- AssetsType.asset_name.label('type')
- ).filter(
- CaseEventsAssets.event_id == row.event_id
- ).join(CaseEventsAssets.asset, CaseAssets.asset_type).all()
- alki = []
- for asset in as_list:
- alki.append("{} ({})".format(asset.asset_name, asset.type))
- setattr(ras, 'asset', "\r\n".join(alki))
- tim.append(ras)
- return tim
- @staticmethod
- def get_case_ioc(caseid):
- """
- Retrieve the list of IOC linked to the case
- :return:
- """
- res = Ioc.query.distinct().with_entities(
- Ioc.ioc_value,
- Ioc.ioc_type,
- Ioc.ioc_description,
- Ioc.ioc_tags,
- Ioc.custom_attributes
- ).filter(
- Ioc.case_id == caseid
- ).order_by(Ioc.ioc_type).all()
- if res:
- return [row._asdict() for row in res]
- else:
- return []
- @staticmethod
- def get_case_assets(caseid):
- """
- Retrieve the assets linked ot the case
- :return:
- """
- ret = []
- res = CaseAssets.query.distinct().with_entities(
- CaseAssets.asset_id,
- CaseAssets.asset_name,
- CaseAssets.asset_description,
- CaseAssets.asset_compromised.label('compromised'),
- AssetsType.asset_name.label("type"),
- CaseAssets.custom_attributes,
- CaseAssets.asset_tags
- ).filter(
- CaseAssets.case_id == caseid
- ).join(
- CaseAssets.asset_type
- ).order_by(desc(CaseAssets.asset_compromised)).all()
- for row in res:
- row = row._asdict()
- row['light_asset_description'] = row['asset_description']
- ial = IocAssetLink.query.with_entities(
- Ioc.ioc_value,
- Ioc.ioc_type,
- Ioc.ioc_description
- ).filter(
- IocAssetLink.asset_id == row['asset_id']
- ).join(
- IocAssetLink.ioc
- ).all()
- if ial:
- row['asset_ioc'] = [row._asdict() for row in ial]
- else:
- row['asset_ioc'] = []
- ret.append(row)
- return ret
- @staticmethod
- def get_docid():
- return "{}".format(
- datetime.utcnow().strftime("%y%m%d_%H%M"))
- @staticmethod
- def markdown_to_text(markdown_string):
- """
- Converts a markdown string to plaintext
- """
- return markdown_string.replace('\n', '</w:t></w:r><w:r/></w:p><w:p><w:r><w:t xml:space="preserve">').replace(
- '#', '')
- class IrisMakeDocReport(IrisReportMaker):
- """
- Generates a DOCX report for the case
- """
- def __init__(self, tmp_dir, report_id, caseid, safe_mode=False):
- self._tmp = tmp_dir
- self._report_id = report_id
- self._case_info = {}
- self._caseid = caseid
- self._safe_mode = safe_mode
- def generate_doc_report(self, doc_type):
- """
- Actually generates the report
- :return:
- """
- if doc_type == 'Investigation':
- case_info = self._get_case_info()
- elif doc_type == 'Activities':
- case_info = self._get_activity_info()
- else:
- log.error("Unknown report type")
- return None
- report = CaseTemplateReport.query.filter(CaseTemplateReport.id == self._report_id).first()
- name = "{}".format("{}.docx".format(report.naming_format))
- name = name.replace("%code_name%", case_info['doc_id'])
- name = name.replace('%customer%', case_info['case']['client']['customer_name'])
- name = name.replace('%case_name%', case_info['case'].get('name'))
- name = name.replace('%date%', datetime.utcnow().strftime("%Y-%m-%d"))
- output_file_path = os.path.join(self._tmp, name)
- try:
- if not self._safe_mode:
- image_handler = ImageHandler(template=None, base_path='/')
- else:
- image_handler = None
- generator = DocxGenerator(image_handler=image_handler)
- generator.generate_docx("/",
- os.path.join(app.config['TEMPLATES_PATH'], report.internal_reference),
- case_info,
- output_file_path
- )
- return output_file_path, ""
- except rendering_error.RenderingError as e:
- return None, e.__str__()
- def _get_activity_info(self):
- auto_activities = get_auto_activities(self._caseid)
- manual_activities = get_manual_activities(self._caseid)
- case_info_in = self._get_case_info()
- # Format information and generate the activity report #
- doc_id = "{}".format(datetime.utcnow().strftime("%y%m%d_%H%M"))
- case_info = {
- 'auto_activities': auto_activities,
- 'manual_activities': manual_activities,
- 'date': datetime.utcnow(),
- 'gen_user': current_user.name,
- 'case': {'name': case_info_in['case'].get('name'),
- 'open_date': case_info_in['case'].get('open_date'),
- 'for_customer': case_info_in['case'].get('for_customer'),
- 'client': case_info_in['case'].get('client')
- },
- 'doc_id': doc_id
- }
- return case_info
- def _get_case_info(self):
- """
- Retrieve information of the case
- :return:
- """
- case_info = cases_export_to_report_json(self._caseid)
- # Get customer, user and case title
- case_info['doc_id'] = IrisMakeDocReport.get_docid()
- case_info['user'] = current_user.name
- # Set date
- case_info['date'] = datetime.utcnow().strftime("%Y-%m-%d")
- return case_info
- @staticmethod
- def get_case_summary(caseid):
- """
- Retrieve the case summary from thehive
- :return:
- """
- _crc32, descr = case_get_desc_crc(caseid)
- return descr
- @staticmethod
- def get_case_files(caseid):
- """
- Retrieve the list of files with their hashes
- :return:
- """
- files = CaseReceivedFile.query.filter(
- CaseReceivedFile.case_id == caseid
- ).with_entities(
- CaseReceivedFile.filename,
- CaseReceivedFile.date_added,
- CaseReceivedFile.file_hash,
- CaseReceivedFile.custom_attributes
- ).order_by(
- CaseReceivedFile.date_added
- ).all()
- if files:
- return [row._asdict() for row in files]
- else:
- return []
- @staticmethod
- def get_case_timeline(caseid):
- """
- Retrieve the case timeline
- :return:
- """
- timeline = CasesEvent.query.filter(
- CasesEvent.case_id == caseid
- ).order_by(
- CasesEvent.event_date
- ).all()
- cache_id = {}
- ras = {}
- tim = []
- for row in timeline:
- ras = row
- setattr(ras, 'asset', None)
- as_list = CaseEventsAssets.query.with_entities(
- CaseAssets.asset_id,
- CaseAssets.asset_name,
- AssetsType.asset_name.label('type')
- ).filter(
- CaseEventsAssets.event_id == row.event_id
- ).join(CaseEventsAssets.asset, CaseAssets.asset_type).all()
- alki = []
- for asset in as_list:
- alki.append("{} ({})".format(asset.asset_name, asset.type))
- setattr(ras, 'asset', "\r\n".join(alki))
- tim.append(ras)
- return tim
- @staticmethod
- def get_case_ioc(caseid):
- """
- Retrieve the list of IOC linked to the case
- :return:
- """
- res = Ioc.query.distinct().with_entities(
- Ioc.ioc_value,
- Ioc.ioc_type,
- Ioc.ioc_description,
- Ioc.ioc_tags,
- Ioc.custom_attributes
- ).filter(
- Ioc.case_id == caseid
- ).order_by(Ioc.ioc_type).all()
- if res:
- return [row._asdict() for row in res]
- else:
- return []
- @staticmethod
- def get_case_assets(caseid):
- """
- Retrieve the assets linked ot the case
- :return:
- """
- ret = []
- res = CaseAssets.query.distinct().with_entities(
- CaseAssets.asset_id,
- CaseAssets.asset_name,
- CaseAssets.asset_description,
- CaseAssets.asset_compromise_status_id.label('compromise_status'),
- AssetsType.asset_name.label("type"),
- CaseAssets.custom_attributes,
- CaseAssets.asset_tags
- ).filter(
- CaseAssets.case_id == caseid
- ).join(
- CaseAssets.asset_type
- ).order_by(desc(CaseAssets.asset_compromise_status_id)).all()
- for row in res:
- row = row._asdict()
- row['light_asset_description'] = row['asset_description']
- ial = IocAssetLink.query.with_entities(
- Ioc.ioc_value,
- Ioc.ioc_type,
- Ioc.ioc_description
- ).filter(
- IocAssetLink.asset_id == row['asset_id']
- ).join(
- IocAssetLink.ioc
- ).all()
- if ial:
- row['asset_ioc'] = [row._asdict() for row in ial]
- else:
- row['asset_ioc'] = []
- ret.append(row)
- return ret
- @staticmethod
- def get_docid():
- return "{}".format(
- datetime.utcnow().strftime("%y%m%d_%H%M"))
- @staticmethod
- def markdown_to_text(markdown_string):
- """
- Converts a markdown string to plaintext
- """
- return markdown_string.replace('\n', '</w:t></w:r><w:r/></w:p><w:p><w:r><w:t xml:space="preserve">').replace(
- '#', '')
- class IrisMakeMdReport(IrisReportMaker):
- """
- Generates a MD report for the case
- """
- def __init__(self, tmp_dir, report_id, caseid, safe_mode=False):
- self._tmp = tmp_dir
- self._report_id = report_id
- self._case_info = {}
- self._caseid = caseid
- self.safe_mode = safe_mode
- def generate_md_report(self, doc_type):
- """
- Generate report file
- """
- case_info = self.get_case_info(doc_type)
- if case_info is None:
- return None
- # Get file extension
- report = CaseTemplateReport.query.filter(
- CaseTemplateReport.id == self._report_id).first()
- _, report_format = os.path.splitext(report.internal_reference)
- case_info['case']['for_customer'] = f"{case_info['case'].get('client').get('customer_name')} (legacy::use client.customer_name)"
- # Prepare report name
- name = "{}".format(("{}" + str(report_format)).format(report.naming_format))
- name = name.replace("%code_name%", case_info['doc_id'])
- name = name.replace(
- '%customer%', case_info['case'].get('client').get('customer_name'))
- name = name.replace('%case_name%', case_info['case'].get('name'))
- name = name.replace('%date%', datetime.utcnow().strftime("%Y-%m-%d"))
- # Build output file
- output_file_path = os.path.join(self._tmp, name)
- try:
- env = IrisJinjaEnv()
- env.filters = app.jinja_env.filters
- template_path = os.path.join(app.config['TEMPLATES_PATH'], report.internal_reference)
- with open(template_path, 'r', encoding="utf-8") as template_file:
- template = env.from_string(template_file.read())
- output_text = template.render(case_info)
- with open(output_file_path, 'w', encoding="utf-8") as html_file:
- html_file.write(output_text)
- except Exception as e:
- log.exception("Error while generating report: {}".format(e))
- return None, e.__str__()
- return output_file_path, 'Report generated'
- class QueuingHandler(log.Handler):
- """A thread safe logging.Handler that writes messages into a queue object.
- Designed to work with LoggingWidget so log messages from multiple
- threads can be shown together in a single ttk.Frame.
- The standard logging.QueueHandler/logging.QueueListener can not be used
- for this because the QueueListener runs in a private thread, not the
- main thread.
- Warning: If multiple threads are writing into this Handler, all threads
- must be joined before calling logging.shutdown() or any other log
- destinations will be corrupted.
- """
- def __init__(self, *args, task_self, message_queue, **kwargs):
- """Initialize by copying the queue and sending everything else to superclass."""
- log.Handler.__init__(self, *args, **kwargs)
- self.message_queue = message_queue
- self.task_self = task_self
- def emit(self, record):
- """Add the formatted log message (sans newlines) to the queue."""
- self.message_queue.append(self.format(record).rstrip('\n'))
- self.task_self.update_state(state='PROGRESS',
- meta=list(self.message_queue))
|