Sin descripción

manage_case_templates_db.py 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. # IRIS Source Code
  2. # contact@dfir-iris.org
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU Lesser General Public
  6. # License as published by the Free Software Foundation; either
  7. # version 3 of the License, or (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. # Lesser General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Lesser General Public License
  15. # along with this program; if not, write to the Free Software Foundation,
  16. # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  17. import marshmallow
  18. from datetime import datetime
  19. from typing import List, Optional, Union
  20. from app import db
  21. from app.datamgmt.case.case_tasks_db import add_task
  22. from app.datamgmt.manage.manage_case_classifications_db import get_case_classification_by_name
  23. from app.iris_engine.module_handler.module_handler import call_modules_hook
  24. from app.models.cases import Cases
  25. from app.models.models import CaseTemplate
  26. from app.models.models import Tags
  27. from app.models.models import NoteDirectory
  28. from app.models.authorization import User
  29. from app.schema.marshables import CaseSchema
  30. from app.schema.marshables import CaseTaskSchema
  31. from app.schema.marshables import CaseNoteDirectorySchema
  32. from app.schema.marshables import CaseNoteSchema
  33. def get_case_templates_list() -> List[dict]:
  34. """Get a list of case templates
  35. Returns:
  36. List[dict]: List of case templates
  37. """
  38. case_templates = CaseTemplate.query.with_entities(
  39. CaseTemplate.id,
  40. CaseTemplate.name,
  41. CaseTemplate.display_name,
  42. CaseTemplate.description,
  43. CaseTemplate.title_prefix,
  44. CaseTemplate.author,
  45. CaseTemplate.created_at,
  46. CaseTemplate.classification,
  47. CaseTemplate.updated_at,
  48. User.name.label('added_by')
  49. ).join(
  50. CaseTemplate.created_by_user
  51. ).all()
  52. c_cl = [row._asdict() for row in case_templates]
  53. return c_cl
  54. def get_case_template_by_id(cur_id: int) -> CaseTemplate:
  55. """Get a case template
  56. Args:
  57. cur_id (int): case template id
  58. Returns:
  59. CaseTemplate: Case template
  60. """
  61. case_template = CaseTemplate.query.filter_by(id=cur_id).first()
  62. return case_template
  63. def delete_case_template_by_id(case_template_id: int):
  64. """Delete a case template
  65. Args:
  66. case_template_id (int): case template id
  67. """
  68. CaseTemplate.query.filter_by(id=case_template_id).delete()
  69. def validate_case_template(data: dict, update: bool = False) -> Optional[str]:
  70. try:
  71. if not update:
  72. # If it's not an update, we check the required fields
  73. if "name" not in data:
  74. return "Name is required."
  75. if "display_name" not in data or not data["display_name"].strip():
  76. data["display_name"] = data["name"]
  77. # We check that name is not empty
  78. if "name" in data and not data["name"].strip():
  79. return "Name cannot be empty."
  80. # We check that author length is not above 128 chars
  81. if "author" in data and len(data["author"]) > 128:
  82. return "Author cannot be longer than 128 characters."
  83. # We check that author length is not above 128 chars
  84. if "author" in data and len(data["author"]) > 128:
  85. return "Author cannot be longer than 128 characters."
  86. # We check that prefix length is not above 32 chars
  87. if "title_prefix" in data and len(data["title_prefix"]) > 32:
  88. return "Prefix cannot be longer than 32 characters."
  89. # We check that tags, if any, are a list of strings
  90. if "tags" in data:
  91. if not isinstance(data["tags"], list):
  92. return "Tags must be a list."
  93. for tag in data["tags"]:
  94. if not isinstance(tag, str):
  95. return "Each tag must be a string."
  96. # We check that tasks, if any, are a list of dictionaries with mandatory keys
  97. if "tasks" in data:
  98. if not isinstance(data["tasks"], list):
  99. return "Tasks must be a list."
  100. for task in data["tasks"]:
  101. if not isinstance(task, dict):
  102. return "Each task must be a dictionary."
  103. if "title" not in task:
  104. return "Each task must have a 'title' field."
  105. if "tags" in task:
  106. if not isinstance(task["tags"], list):
  107. return "Task tags must be a list."
  108. for tag in task["tags"]:
  109. if not isinstance(tag, str):
  110. return "Each tag must be a string."
  111. # We check that note groups, if any, are a list of dictionaries with mandatory keys
  112. if "note_groups" in data:
  113. return "Note groups has been replaced by note_directories."
  114. if "note_directories" in data:
  115. if not isinstance(data["note_directories"], list):
  116. return "Note directories must be a list."
  117. for note_dir in data["note_directories"]:
  118. if not isinstance(note_dir, dict):
  119. return "Each note directory must be a dictionary."
  120. if "title" not in note_dir:
  121. return "Each note directory must have a 'title' field."
  122. if "notes" in note_dir:
  123. if not isinstance(note_dir["notes"], list):
  124. return "Notes must be a list."
  125. for note in note_dir["notes"]:
  126. if not isinstance(note, dict):
  127. return "Each note must be a dictionary."
  128. if "title" not in note:
  129. return "Each note must have a 'title' field."
  130. # If all checks succeeded, we return None to indicate everything is has been validated
  131. return None
  132. except Exception as e:
  133. return str(e)
  134. def case_template_pre_modifier(case_schema: CaseSchema, case_template_id: str):
  135. case_template = get_case_template_by_id(int(case_template_id))
  136. if not case_template:
  137. return None
  138. if case_template.title_prefix:
  139. case_schema.name = case_template.title_prefix + " " + case_schema.name[0]
  140. case_classification = get_case_classification_by_name(case_template.classification)
  141. if case_classification:
  142. case_schema.classification_id = case_classification.id
  143. return case_schema
  144. def case_template_populate_tasks(case: Cases, case_template: CaseTemplate):
  145. logs = []
  146. # Update case tasks
  147. for task_template in case_template.tasks:
  148. try:
  149. # validate before saving
  150. task_schema = CaseTaskSchema()
  151. # Remap case task template fields
  152. # Set status to "To Do" which is ID 1
  153. mapped_task_template = {
  154. "task_title": task_template['title'],
  155. "task_description": task_template['description'] if task_template.get('description') else "",
  156. "task_tags": ",".join(tag for tag in task_template["tags"]) if task_template.get('tags') else "",
  157. "task_status_id": 1
  158. }
  159. mapped_task_template = call_modules_hook('on_preload_task_create', data=mapped_task_template, caseid=case.case_id)
  160. task = task_schema.load(mapped_task_template)
  161. assignee_id_list = []
  162. ctask = add_task(task=task,
  163. assignee_id_list=assignee_id_list,
  164. user_id=case.user_id,
  165. caseid=case.case_id
  166. )
  167. ctask = call_modules_hook('on_postload_task_create', data=ctask, caseid=case.case_id)
  168. if not ctask:
  169. logs.append("Unable to create task for internal reasons")
  170. except marshmallow.exceptions.ValidationError as e:
  171. logs.append(e.messages)
  172. return logs
  173. def case_template_populate_notes(case: Cases, note_dir_template: dict, ng: NoteDirectory):
  174. logs = []
  175. if note_dir_template.get("notes"):
  176. for note_template in note_dir_template["notes"]:
  177. # validate before saving
  178. note_schema = CaseNoteSchema()
  179. mapped_note_template = {
  180. "directory_id": ng.id,
  181. "note_title": note_template["title"],
  182. "note_content": note_template["content"] if note_template.get("content") else ""
  183. }
  184. mapped_note_template = call_modules_hook('on_preload_note_create',
  185. data=mapped_note_template,
  186. caseid=case.case_id)
  187. note_schema.verify_directory_id(mapped_note_template, caseid=ng.case_id)
  188. note = note_schema.load(mapped_note_template)
  189. note.note_creationdate = datetime.utcnow()
  190. note.note_lastupdate = datetime.utcnow()
  191. note.note_user = case.user_id
  192. note.note_case_id = case.case_id
  193. db.session.add(note)
  194. note = call_modules_hook('on_postload_note_create', data=note, caseid=case.case_id)
  195. if not note:
  196. logs.append("Unable to add note for internal reasons")
  197. break
  198. return logs
  199. def case_template_populate_note_groups(case: Cases, case_template: CaseTemplate):
  200. logs = []
  201. # Update case tasks
  202. if case_template.note_directories:
  203. case_template.note_directories = case_template.note_directories
  204. for note_dir_template in case_template.note_directories:
  205. try:
  206. # validate before saving
  207. note_dir_schema = CaseNoteDirectorySchema()
  208. # Remap case task template fields
  209. # Set status to "To Do" which is ID 1
  210. mapped_note_dir_template = {
  211. "name": note_dir_template['title'],
  212. "parent_id": None,
  213. "case_id": case.case_id
  214. }
  215. note_dir = note_dir_schema.load(mapped_note_dir_template)
  216. db.session.add(note_dir)
  217. db.session.commit()
  218. if not note_dir:
  219. logs.append("Unable to add note group for internal reasons")
  220. break
  221. logs = case_template_populate_notes(case, note_dir_template, note_dir)
  222. except marshmallow.exceptions.ValidationError as e:
  223. logs.append(e.messages)
  224. return logs
  225. def case_template_post_modifier(case: Cases, case_template_id: Union[str, int]):
  226. case_template = get_case_template_by_id(int(case_template_id))
  227. logs = []
  228. if not case_template:
  229. logs.append(f"Case template {case_template_id} not found")
  230. return None, logs
  231. # Update summary, we want to append in order not to skip the initial case description
  232. case.description += "\n" + case_template.summary
  233. # Update case tags
  234. for tag_str in case_template.tags:
  235. tag = Tags(tag_title=tag_str)
  236. tag = tag.save()
  237. case.tags.append(tag)
  238. # Update case tasks
  239. logs = case_template_populate_tasks(case, case_template)
  240. if logs:
  241. return case, logs
  242. # Update case note groups
  243. logs = case_template_populate_note_groups(case, case_template)
  244. if logs:
  245. return case, logs
  246. db.session.commit()
  247. return case, logs