| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- # IRIS Source Code
- # Copyright (C) 2024 - DFIR-IRIS
- # contact@dfir-iris.org
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- from datetime import datetime
- from flask_sqlalchemy.pagination import Pagination
- from flask_login import current_user
- from app import db
- from app.datamgmt.case.case_tasks_db import delete_task
- from app.datamgmt.case.case_tasks_db import add_task
- from app.datamgmt.case.case_tasks_db import update_task_assignees
- from app.datamgmt.case.case_tasks_db import get_task
- from app.datamgmt.case.case_tasks_db import get_filtered_tasks
- from app.datamgmt.states import update_tasks_state
- from app.iris_engine.module_handler.module_handler import call_modules_hook
- from app.iris_engine.utils.tracker import track_activity
- from app.models.models import CaseTasks
- from app.models.pagination_parameters import PaginationParameters
- from app.schema.marshables import CaseTaskSchema
- from app.business.errors import BusinessProcessingError
- from app.business.errors import ObjectNotFoundError
- from marshmallow.exceptions import ValidationError
- def _load(request_data, **kwargs):
- try:
- add_task_schema = CaseTaskSchema()
- return add_task_schema.load(request_data, **kwargs)
- except ValidationError as e:
- raise BusinessProcessingError('Data error', e.messages)
- def tasks_delete(task: CaseTasks):
- call_modules_hook('on_preload_task_delete', data=task.id)
- delete_task(task.id)
- update_tasks_state(caseid=task.task_case_id)
- call_modules_hook('on_postload_task_delete', data=task.id, caseid=task.task_case_id)
- track_activity(f'deleted task "{task.task_title}"')
- def tasks_create(case_identifier: int, request_json: dict) -> (str, CaseTasks):
- request_data = call_modules_hook('on_preload_task_create', data=request_json, caseid=case_identifier)
- if 'task_assignee_id' in request_data or 'task_assignees_id' not in request_data:
- raise BusinessProcessingError('task_assignee_id is not valid anymore since v1.5.0')
- task_assignee_list = request_data['task_assignees_id']
- del request_data['task_assignees_id']
- task = _load(request_data)
- ctask = add_task(task=task,
- assignee_id_list=task_assignee_list,
- user_id=current_user.id,
- caseid=case_identifier
- )
- ctask = call_modules_hook('on_postload_task_create', data=ctask, caseid=case_identifier)
- if ctask:
- track_activity(f'added task "{ctask.task_title}"', caseid=case_identifier)
- return f'Task "{ctask.task_title}" added', ctask
- raise BusinessProcessingError("Unable to create task for internal reasons")
- def tasks_get(identifier) -> CaseTasks:
- task = get_task(identifier)
- if not task:
- raise ObjectNotFoundError()
- return task
- def tasks_filter(case_identifier, pagination_parameters: PaginationParameters) -> Pagination:
- return get_filtered_tasks(case_identifier, pagination_parameters)
- def tasks_update(task: CaseTasks, request_json):
- case_identifier = task.task_case_id
- request_data = call_modules_hook('on_preload_task_update', data=request_json, caseid=case_identifier)
- if 'task_assignee_id' in request_data or 'task_assignees_id' not in request_data:
- raise BusinessProcessingError('task_assignee_id is not valid anymore since v1.5.0')
- task_assignee_list = request_data['task_assignees_id']
- del request_data['task_assignees_id']
- request_data['id'] = task.id
- task = _load(request_data, instance=task)
- task.task_userid_update = current_user.id
- task.task_last_update = datetime.utcnow()
- update_task_assignees(task.id, task_assignee_list, case_identifier)
- update_tasks_state(caseid=case_identifier)
- db.session.commit()
- task = call_modules_hook('on_postload_task_update', data=task, caseid=case_identifier)
- if not task:
- raise BusinessProcessingError('Unable to update task for internal reasons')
- track_activity(f'updated task "{task.task_title}" (status {task.status.status_name})', caseid=case_identifier)
- return task
|