| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- # IRIS Source Code
- # contact@dfir-iris.org
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- import marshmallow
- from datetime import datetime
- from typing import List, Optional, Union
- from app import db
- from app.datamgmt.case.case_tasks_db import add_task
- from app.datamgmt.manage.manage_case_classifications_db import get_case_classification_by_name
- from app.iris_engine.module_handler.module_handler import call_modules_hook
- from app.models.cases import Cases
- from app.models.models import CaseTemplate
- from app.models.models import Tags
- from app.models.models import NoteDirectory
- from app.models.authorization import User
- from app.schema.marshables import CaseSchema
- from app.schema.marshables import CaseTaskSchema
- from app.schema.marshables import CaseNoteDirectorySchema
- from app.schema.marshables import CaseNoteSchema
def get_case_templates_list() -> List[dict]:
    """List the case templates as plain dictionaries.

    Only a fixed subset of the template columns is selected. The name of the
    user reached through ``CaseTemplate.created_by_user`` is exposed under the
    ``added_by`` key.

    Returns:
        List[dict]: One dictionary per row returned by the query
    """
    selected_columns = (
        CaseTemplate.id,
        CaseTemplate.name,
        CaseTemplate.display_name,
        CaseTemplate.description,
        CaseTemplate.title_prefix,
        CaseTemplate.author,
        CaseTemplate.created_at,
        CaseTemplate.classification,
        CaseTemplate.updated_at,
        User.name.label('added_by'),
    )

    rows = (
        CaseTemplate.query
        .with_entities(*selected_columns)
        .join(CaseTemplate.created_by_user)
        .all()
    )

    # Each row is a named tuple-like result; convert it to a regular dict
    return [row._asdict() for row in rows]
def get_case_template_by_id(cur_id: int) -> CaseTemplate:
    """Look up a single case template from its identifier

    Args:
        cur_id (int): Identifier of the case template to fetch

    Returns:
        CaseTemplate: First template matching the identifier, as returned by
            the query's ``first()`` call
    """
    return CaseTemplate.query.filter(CaseTemplate.id == cur_id).first()
def delete_case_template_by_id(case_template_id: int):
    """Issue a delete for the case template with the given identifier

    No commit is performed in this function.

    Args:
        case_template_id (int): Identifier of the case template to delete
    """
    matching_templates = CaseTemplate.query.filter(CaseTemplate.id == case_template_id)
    matching_templates.delete()
def validate_case_template(data: dict, update: bool = False) -> Optional[str]:
    """Validate the content of a case template

    The checks run in order and the first failure is reported. On creation
    (``update`` is False) ``data`` is modified in place: ``display_name`` is
    set to ``name`` when it is missing or blank.

    Args:
        data (dict): Case template content to validate
        update (bool): True when validating an update, in which case ``name``
            is not required and ``display_name`` is left untouched

    Returns:
        Optional[str]: An error message describing the first problem found,
            or None when every check passed. Any exception raised while
            checking (e.g. a field of an unexpected type) is returned as its
            string representation rather than propagated.
    """
    try:
        if not update:
            # If it's not an update, we check the required fields
            if "name" not in data:
                return "Name is required."

            # Default the display name to the name when none usable is given
            if "display_name" not in data or not data["display_name"].strip():
                data["display_name"] = data["name"]

        # We check that name is not empty
        if "name" in data and not data["name"].strip():
            return "Name cannot be empty."

        # We check that author length is not above 128 chars
        if "author" in data and len(data["author"]) > 128:
            return "Author cannot be longer than 128 characters."

        # We check that prefix length is not above 32 chars
        if "title_prefix" in data and len(data["title_prefix"]) > 32:
            return "Prefix cannot be longer than 32 characters."

        # We check that tags, if any, are a list of strings
        if "tags" in data:
            if not isinstance(data["tags"], list):
                return "Tags must be a list."
            for tag in data["tags"]:
                if not isinstance(tag, str):
                    return "Each tag must be a string."

        # We check that tasks, if any, are a list of dictionaries with mandatory keys
        if "tasks" in data:
            if not isinstance(data["tasks"], list):
                return "Tasks must be a list."
            for task in data["tasks"]:
                if not isinstance(task, dict):
                    return "Each task must be a dictionary."
                if "title" not in task:
                    return "Each task must have a 'title' field."
                if "tags" in task:
                    if not isinstance(task["tags"], list):
                        return "Task tags must be a list."
                    for tag in task["tags"]:
                        if not isinstance(tag, str):
                            return "Each tag must be a string."

        # The legacy note_groups key is rejected outright
        if "note_groups" in data:
            return "Note groups has been replaced by note_directories."

        # We check that note directories, if any, are a list of dictionaries with mandatory keys
        if "note_directories" in data:
            if not isinstance(data["note_directories"], list):
                return "Note directories must be a list."
            for note_dir in data["note_directories"]:
                if not isinstance(note_dir, dict):
                    return "Each note directory must be a dictionary."
                if "title" not in note_dir:
                    return "Each note directory must have a 'title' field."
                if "notes" in note_dir:
                    if not isinstance(note_dir["notes"], list):
                        return "Notes must be a list."
                    for note in note_dir["notes"]:
                        if not isinstance(note, dict):
                            return "Each note must be a dictionary."
                        if "title" not in note:
                            return "Each note must have a 'title' field."

        # If all checks succeeded, we return None to indicate everything has been validated
        return None
    except Exception as e:
        # Malformed input (e.g. a non-string name) is reported, not raised
        return str(e)
def case_template_pre_modifier(case_schema: CaseSchema, case_template_id: str):
    """Apply a case template to a case before the case is saved

    The template title prefix, when set, is prepended to the case name, and
    the case classification is set from the template classification name when
    that name resolves to a known classification.

    Args:
        case_schema (CaseSchema): Case object to modify in place
        case_template_id (str): Identifier of the case template, converted
            with ``int()``

    Returns:
        The modified ``case_schema``, or None when the template is not found
    """
    template = get_case_template_by_id(int(case_template_id))
    if not template:
        return None

    prefix = template.title_prefix
    if prefix:
        # NOTE(review): only element 0 of case_schema.name is used here;
        # presumably name is a sequence wrapping the title - confirm against caller
        case_schema.name = prefix + " " + case_schema.name[0]

    classification = get_case_classification_by_name(template.classification)
    if classification:
        case_schema.classification_id = classification.id

    return case_schema
def case_template_populate_tasks(case: Cases, case_template: CaseTemplate):
    """Create the tasks described by a case template in a case

    Each task template is remapped to the task schema fields, passed through
    the ``on_preload_task_create`` hook, loaded, added to the case without
    assignees, then passed through the ``on_postload_task_create`` hook.
    A validation error on one task is recorded and the next task is processed.

    Args:
        case (Cases): Case receiving the tasks
        case_template (CaseTemplate): Template whose ``tasks`` are created

    Returns:
        list: Collected error messages, empty when every task was created
    """
    messages = []

    for template_entry in case_template.tasks:
        try:
            # validate before saving
            schema = CaseTaskSchema()

            if template_entry.get('tags'):
                joined_tags = ",".join(template_entry["tags"])
            else:
                joined_tags = ""

            # Remap case task template fields
            # Set status to "To Do" which is ID 1
            payload = {
                "task_title": template_entry['title'],
                "task_description": template_entry.get('description') or "",
                "task_tags": joined_tags,
                "task_status_id": 1
            }

            payload = call_modules_hook('on_preload_task_create', data=payload, caseid=case.case_id)

            loaded_task = schema.load(payload)

            created_task = add_task(
                task=loaded_task,
                assignee_id_list=[],
                user_id=case.user_id,
                caseid=case.case_id
            )

            created_task = call_modules_hook('on_postload_task_create', data=created_task, caseid=case.case_id)

            if not created_task:
                messages.append("Unable to create task for internal reasons")

        except marshmallow.exceptions.ValidationError as error:
            messages.append(error.messages)

    return messages
def case_template_populate_notes(case: Cases, note_dir_template: dict, ng: NoteDirectory):
    """Create the notes of one note directory template

    Each note template is remapped to the note schema fields, passed through
    the ``on_preload_note_create`` hook, checked against the directory, loaded
    and added to the database session, then passed through the
    ``on_postload_note_create`` hook. Processing stops at the first note for
    which the post-load hook returns a falsy value. Validation errors are not
    caught here and propagate to the caller.

    Args:
        case (Cases): Case receiving the notes
        note_dir_template (dict): Directory template; its optional ``notes``
            entry lists the note templates
        ng (NoteDirectory): Directory the notes are attached to

    Returns:
        list: Collected error messages, empty when every note was added
    """
    messages = []

    # A missing, None or empty "notes" entry means there is nothing to create
    for template_entry in note_dir_template.get("notes") or []:
        # validate before saving
        schema = CaseNoteSchema()

        payload = {
            "directory_id": ng.id,
            "note_title": template_entry["title"],
            "note_content": template_entry.get("content") or ""
        }

        payload = call_modules_hook('on_preload_note_create',
                                    data=payload,
                                    caseid=case.case_id)

        schema.verify_directory_id(payload, caseid=ng.case_id)
        new_note = schema.load(payload)

        new_note.note_creationdate = datetime.utcnow()
        new_note.note_lastupdate = datetime.utcnow()
        new_note.note_user = case.user_id
        new_note.note_case_id = case.case_id

        db.session.add(new_note)

        new_note = call_modules_hook('on_postload_note_create', data=new_note, caseid=case.case_id)

        if not new_note:
            messages.append("Unable to add note for internal reasons")
            break

    return messages
def case_template_populate_note_groups(case: Cases, case_template: CaseTemplate):
    """Create the note directories, and their notes, described by a case template

    Each directory template is loaded through the note directory schema as a
    top-level directory (no parent) of the case, added to the session and
    committed, then its notes are created. A validation error on one
    directory is recorded and the next directory is processed.

    Args:
        case (Cases): Case receiving the note directories
        case_template (CaseTemplate): Template whose ``note_directories`` are
            created; a None or empty value creates nothing

    Returns:
        list: Error messages accumulated over all directories, empty when
            everything was created
    """
    logs = []

    # Tolerate templates without note directories instead of failing on None
    note_directories = case_template.note_directories or []

    for note_dir_template in note_directories:
        try:
            # validate before saving
            note_dir_schema = CaseNoteDirectorySchema()

            # Remap note directory template fields
            mapped_note_dir_template = {
                "name": note_dir_template['title'],
                "parent_id": None,
                "case_id": case.case_id
            }

            note_dir = note_dir_schema.load(mapped_note_dir_template)
            db.session.add(note_dir)
            # Committed immediately: the notes created below read note_dir.id
            db.session.commit()

            if not note_dir:
                logs.append("Unable to add note group for internal reasons")
                break

            # Accumulate rather than overwrite, so that messages recorded for
            # earlier directories are not lost when a later one succeeds
            logs.extend(case_template_populate_notes(case, note_dir_template, note_dir))

        except marshmallow.exceptions.ValidationError as e:
            logs.append(e.messages)

    return logs
def case_template_post_modifier(case: Cases, case_template_id: Union[str, int]):
    """Apply a case template to a case after the case has been created

    The template summary is appended to the case description, the template
    tags are saved and attached to the case, then the template tasks and note
    directories are created. The session is committed only when neither the
    task step nor the note directory step reported messages.

    Args:
        case (Cases): Case to populate
        case_template_id (Union[str, int]): Identifier of the case template,
            converted with ``int()``

    Returns:
        tuple: ``(None, logs)`` when the template is not found, otherwise
            ``(case, logs)`` where ``logs`` holds the messages of the first
            step that reported any (empty on success)
    """
    template = get_case_template_by_id(int(case_template_id))
    if not template:
        return None, [f"Case template {case_template_id} not found"]

    # Update summary, we want to append in order not to skip the initial case description
    case.description += "\n" + template.summary

    # Update case tags
    for tag_title in template.tags:
        saved_tag = Tags(tag_title=tag_title).save()
        case.tags.append(saved_tag)

    # Populate tasks first, then note directories; stop at the first step
    # that reports messages, without committing
    logs = []
    for populate_step in (case_template_populate_tasks, case_template_populate_note_groups):
        logs = populate_step(case, template)
        if logs:
            return case, logs

    db.session.commit()

    return case, logs
|