Ei kuvausta

tasks.py 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. # IRIS Source Code
  2. # Copyright (C) 2024 - DFIR-IRIS
  3. # contact@dfir-iris.org
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU Lesser General Public
  7. # License as published by the Free Software Foundation; either
  8. # version 3 of the License, or (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. # Lesser General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU Lesser General Public License
  16. # along with this program; if not, write to the Free Software Foundation,
  17. # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. from datetime import datetime
  19. from flask_sqlalchemy.pagination import Pagination
  20. from flask_login import current_user
  21. from app import db
  22. from app.datamgmt.case.case_tasks_db import delete_task
  23. from app.datamgmt.case.case_tasks_db import add_task
  24. from app.datamgmt.case.case_tasks_db import update_task_assignees
  25. from app.datamgmt.case.case_tasks_db import get_task
  26. from app.datamgmt.case.case_tasks_db import get_filtered_tasks
  27. from app.datamgmt.states import update_tasks_state
  28. from app.iris_engine.module_handler.module_handler import call_modules_hook
  29. from app.iris_engine.utils.tracker import track_activity
  30. from app.models.models import CaseTasks
  31. from app.models.pagination_parameters import PaginationParameters
  32. from app.schema.marshables import CaseTaskSchema
  33. from app.business.errors import BusinessProcessingError
  34. from app.business.errors import ObjectNotFoundError
  35. from marshmallow.exceptions import ValidationError
  36. def _load(request_data, **kwargs):
  37. try:
  38. add_task_schema = CaseTaskSchema()
  39. return add_task_schema.load(request_data, **kwargs)
  40. except ValidationError as e:
  41. raise BusinessProcessingError('Data error', e.messages)
  42. def tasks_delete(task: CaseTasks):
  43. call_modules_hook('on_preload_task_delete', data=task.id)
  44. delete_task(task.id)
  45. update_tasks_state(caseid=task.task_case_id)
  46. call_modules_hook('on_postload_task_delete', data=task.id, caseid=task.task_case_id)
  47. track_activity(f'deleted task "{task.task_title}"')
  48. def tasks_create(case_identifier: int, request_json: dict) -> (str, CaseTasks):
  49. request_data = call_modules_hook('on_preload_task_create', data=request_json, caseid=case_identifier)
  50. if 'task_assignee_id' in request_data or 'task_assignees_id' not in request_data:
  51. raise BusinessProcessingError('task_assignee_id is not valid anymore since v1.5.0')
  52. task_assignee_list = request_data['task_assignees_id']
  53. del request_data['task_assignees_id']
  54. task = _load(request_data)
  55. ctask = add_task(task=task,
  56. assignee_id_list=task_assignee_list,
  57. user_id=current_user.id,
  58. caseid=case_identifier
  59. )
  60. ctask = call_modules_hook('on_postload_task_create', data=ctask, caseid=case_identifier)
  61. if ctask:
  62. track_activity(f'added task "{ctask.task_title}"', caseid=case_identifier)
  63. return f'Task "{ctask.task_title}" added', ctask
  64. raise BusinessProcessingError("Unable to create task for internal reasons")
  65. def tasks_get(identifier) -> CaseTasks:
  66. task = get_task(identifier)
  67. if not task:
  68. raise ObjectNotFoundError()
  69. return task
  70. def tasks_filter(case_identifier, pagination_parameters: PaginationParameters) -> Pagination:
  71. return get_filtered_tasks(case_identifier, pagination_parameters)
  72. def tasks_update(task: CaseTasks, request_json):
  73. case_identifier = task.task_case_id
  74. request_data = call_modules_hook('on_preload_task_update', data=request_json, caseid=case_identifier)
  75. if 'task_assignee_id' in request_data or 'task_assignees_id' not in request_data:
  76. raise BusinessProcessingError('task_assignee_id is not valid anymore since v1.5.0')
  77. task_assignee_list = request_data['task_assignees_id']
  78. del request_data['task_assignees_id']
  79. request_data['id'] = task.id
  80. task = _load(request_data, instance=task)
  81. task.task_userid_update = current_user.id
  82. task.task_last_update = datetime.utcnow()
  83. update_task_assignees(task.id, task_assignee_list, case_identifier)
  84. update_tasks_state(caseid=case_identifier)
  85. db.session.commit()
  86. task = call_modules_hook('on_postload_task_update', data=task, caseid=case_identifier)
  87. if not task:
  88. raise BusinessProcessingError('Unable to update task for internal reasons')
  89. track_activity(f'updated task "{task.task_title}" (status {task.status.status_name})', caseid=case_identifier)
  90. return task