| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- # IRIS Source Code
- # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
- # ir@cyberactionlab.net
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- from datetime import datetime
- from flask_login import current_user
- from sqlalchemy import desc
- from sqlalchemy import and_
- from app import db
- from app.datamgmt.conversions import convert_sort_direction
- from app.datamgmt.manage.manage_attribute_db import get_default_custom_attributes
- from app.datamgmt.manage.manage_users_db import get_users_list_restricted_from_case
- from app.datamgmt.states import update_tasks_state
- from app.models.models import CaseTasks
- from app.models.models import TaskAssignee
- from app.models.cases import Cases
- from app.models.models import Comments
- from app.models.models import TaskComments
- from app.models.models import TaskStatus
- from app.models.authorization import User
- from app.models.pagination_parameters import PaginationParameters
- def get_tasks_status():
- return TaskStatus.query.all()
- def get_filtered_tasks(case_identifier, pagination_parameters: PaginationParameters):
- query = CaseTasks.query.filter(
- CaseTasks.task_case_id == case_identifier
- ).join(
- CaseTasks.status
- ).order_by(
- desc(TaskStatus.status_name)
- )
- sort_by = pagination_parameters.get_order_by()
- if sort_by is not None:
- order_func = convert_sort_direction(pagination_parameters.get_direction())
- if hasattr(CaseTasks, sort_by):
- query = query.order_by(order_func(getattr(CaseTasks, sort_by)))
- return query.paginate(page=pagination_parameters.get_page(), per_page=pagination_parameters.get_per_page(), error_out=False)
- def get_tasks_with_assignees(caseid):
- tasks = CaseTasks.query.with_entities(
- CaseTasks.id.label('task_id'),
- CaseTasks.task_uuid,
- CaseTasks.task_title,
- CaseTasks.task_description,
- CaseTasks.task_open_date,
- CaseTasks.task_tags,
- CaseTasks.task_status_id,
- TaskStatus.status_name,
- TaskStatus.status_bscolor
- ).filter(
- CaseTasks.task_case_id == caseid
- ).join(
- CaseTasks.status
- ).order_by(
- desc(TaskStatus.status_name)
- ).all()
- if not tasks:
- return None
- tasks = [c._asdict() for c in tasks]
- task_with_assignees = []
- for task in tasks:
- task_id = task['task_id']
- get_assignee_list = TaskAssignee.query.with_entities(
- TaskAssignee.task_id,
- User.user,
- User.id,
- User.name
- ).join(
- TaskAssignee.user
- ).filter(
- TaskAssignee.task_id == task_id
- ).all()
- assignee_list = {}
- for member in get_assignee_list:
- if member.task_id not in assignee_list:
- assignee_list[member.task_id] = [{
- 'user': member.user,
- 'name': member.name,
- 'id': member.id
- }]
- else:
- assignee_list[member.task_id].append({
- 'user': member.user,
- 'name': member.name,
- 'id': member.id
- })
- task['task_assignees'] = assignee_list.get(task['task_id'], [])
- task_with_assignees.append(task)
- return task_with_assignees
- def get_task(task_id: int) -> CaseTasks:
- return CaseTasks.query.filter(CaseTasks.id == task_id).first()
- def get_task_assignees(task_identifier: int):
- get_assignee_list = TaskAssignee.query.with_entities(
- TaskAssignee.task_id,
- User.user,
- User.id,
- User.name
- ).join(
- TaskAssignee.user
- ).filter(
- TaskAssignee.task_id == task_identifier
- ).all()
- assignee_list = {}
- for member in get_assignee_list:
- if member.task_id not in assignee_list:
- assignee_list[member.task_id] = [{
- 'user': member.user,
- 'name': member.name,
- 'id': member.id
- }]
- else:
- assignee_list[member.task_id].append({
- 'user': member.user,
- 'name': member.name,
- 'id': member.id
- })
- return assignee_list.get(task_identifier, [])
- def update_task_status(task_status, task_id, caseid):
- task = get_task(task_id)
- if task:
- try:
- task.task_status_id = task_status
- update_tasks_state(caseid=caseid)
- db.session.commit()
- return True
- except:
- return False
- else:
- return False
- def update_task_assignees(task_identifier, task_assignee_list, caseid):
- cur_assignee_list = TaskAssignee.query.with_entities(
- TaskAssignee.user_id
- ).filter(TaskAssignee.task_id == task_identifier).all()
- # Some formatting
- set_cur_assignees = set([assignee[0] for assignee in cur_assignee_list])
- set_assignees = set(int(assignee) for assignee in task_assignee_list)
- assignees_to_add = set_assignees - set_cur_assignees
- assignees_to_remove = set_cur_assignees - set_assignees
- allowed_users = [u.get('user_id') for u in get_users_list_restricted_from_case(caseid)]
- for uid in assignees_to_add:
- if uid not in allowed_users:
- continue
- user = User.query.filter(User.id == uid).first()
- if user:
- ta = TaskAssignee()
- ta.task_id = task_identifier
- ta.user_id = user.id
- db.session.add(ta)
- for uid in assignees_to_remove:
- TaskAssignee.query.filter(
- and_(TaskAssignee.task_id == task_identifier,
- TaskAssignee.user_id == uid)
- ).delete()
- db.session.commit()
- def add_task(task, assignee_id_list, user_id, caseid):
- now = datetime.now()
- task.task_case_id = caseid
- task.task_userid_open = user_id
- task.task_userid_update = user_id
- task.task_open_date = now
- task.task_last_update = now
- task.custom_attributes = task.custom_attributes if task.custom_attributes else get_default_custom_attributes('task')
- db.session.add(task)
- update_tasks_state(caseid=caseid)
- db.session.commit()
- update_task_status(task.task_status_id, task.id, caseid)
- update_task_assignees(task.id, assignee_id_list, caseid)
- return task
- def get_case_task_comments(task_id):
- return Comments.query.filter(
- TaskComments.comment_task_id == task_id
- ).join(
- TaskComments,
- Comments.comment_id == TaskComments.comment_id
- ).order_by(
- Comments.comment_date.asc()
- ).all()
- def add_comment_to_task(task_id, comment_id):
- ec = TaskComments()
- ec.comment_task_id = task_id
- ec.comment_id = comment_id
- db.session.add(ec)
- db.session.commit()
- def get_case_tasks_comments_count(tasks_list):
- return TaskComments.query.filter(
- TaskComments.comment_task_id.in_(tasks_list)
- ).with_entities(
- TaskComments.comment_task_id,
- TaskComments.comment_id
- ).group_by(
- TaskComments.comment_task_id,
- TaskComments.comment_id
- ).all()
- def get_case_task_comment(task_id, comment_id):
- return TaskComments.query.filter(
- TaskComments.comment_task_id == task_id,
- TaskComments.comment_id == comment_id
- ).with_entities(
- Comments.comment_id,
- Comments.comment_text,
- Comments.comment_date,
- Comments.comment_update_date,
- Comments.comment_uuid,
- User.name,
- User.user
- ).join(
- TaskComments.comment
- ).join(
- Comments.user
- ).first()
- def delete_task(task_id):
- with db.session.begin_nested():
- TaskAssignee.query.filter(
- TaskAssignee.task_id == task_id
- ).delete()
- com_ids = TaskComments.query.with_entities(
- TaskComments.comment_id
- ).filter(
- TaskComments.comment_task_id == task_id
- ).all()
- com_ids = [c.comment_id for c in com_ids]
- TaskComments.query.filter(TaskComments.comment_id.in_(com_ids)).delete()
- Comments.query.filter(Comments.comment_id.in_(com_ids)).delete()
- CaseTasks.query.filter(
- CaseTasks.id == task_id
- ).delete()
- def delete_task_comment(task_id, comment_id):
- comment = Comments.query.filter(
- Comments.comment_id == comment_id,
- Comments.comment_user_id == current_user.id
- ).first()
- if not comment:
- return False, "You are not allowed to delete this comment"
- TaskComments.query.filter(
- TaskComments.comment_task_id == task_id,
- TaskComments.comment_id == comment_id
- ).delete()
- db.session.delete(comment)
- db.session.commit()
- return True, "Comment deleted"
- def get_tasks_cases_mapping(open_cases_only=False):
- condition = Cases.close_date == None if open_cases_only else True
- return CaseTasks.query.filter(
- condition
- ).with_entities(
- CaseTasks.task_case_id,
- CaseTasks.task_status_id
- ).join(
- CaseTasks.case
- ).all()
|