| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- # IRIS Source Code
- # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
- # ir@cyberactionlab.net
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- from flask_login import current_user
- from sqlalchemy import and_
- from sqlalchemy import desc
- from app import db
- from app.models.models import CaseTasks
- from app.models.models import TaskAssignee
- from app.models.models import ReviewStatus
- from app.models.cases import Cases
- from app.models.models import GlobalTasks
- from app.models.models import TaskStatus
- from app.models.authorization import User
- def list_global_tasks():
- ct = GlobalTasks.query.with_entities(
- GlobalTasks.id.label("task_id"),
- GlobalTasks.task_uuid,
- GlobalTasks.task_title,
- GlobalTasks.task_description,
- GlobalTasks.task_last_update,
- GlobalTasks.task_tags,
- User.name.label('user_name'),
- GlobalTasks.task_assignee_id,
- GlobalTasks.task_status_id,
- TaskStatus.status_name,
- TaskStatus.status_bscolor
- ).join(
- GlobalTasks.user_assigned
- ).order_by(
- desc(TaskStatus.status_name)
- ).join(
- GlobalTasks.status
- ).all()
- return ct
- def get_global_task(task_id):
- ct = GlobalTasks.query.with_entities(
- GlobalTasks.id.label("task_id"),
- GlobalTasks.task_uuid,
- GlobalTasks.task_title,
- GlobalTasks.task_description,
- GlobalTasks.task_last_update,
- GlobalTasks.task_tags,
- User.name.label('user_name'),
- GlobalTasks.task_assignee_id,
- GlobalTasks.task_status_id,
- TaskStatus.status_name,
- TaskStatus.status_bscolor
- ).filter(
- GlobalTasks.id == task_id
- ).join(
- GlobalTasks.user_assigned
- ).join(
- GlobalTasks.status
- ).order_by(
- desc(TaskStatus.status_name)
- ).first()
- return ct
- def get_tasks_status():
- return TaskStatus.query.all()
- def list_user_reviews():
- ct = Cases.query.with_entities(
- Cases.case_id,
- Cases.name,
- ReviewStatus.status_name,
- ReviewStatus.id.label('status_id')
- ).join(
- Cases.review_status
- ).filter(
- Cases.reviewer_id == current_user.id,
- ReviewStatus.status_name != 'Reviewed',
- ReviewStatus.status_name != 'Not reviewed'
- ).all()
- return ct
- def list_user_tasks():
- ct = CaseTasks.query.with_entities(
- CaseTasks.id.label("task_id"),
- CaseTasks.task_title,
- CaseTasks.task_description,
- CaseTasks.task_last_update,
- CaseTasks.task_tags,
- Cases.name.label('task_case'),
- CaseTasks.task_case_id.label('case_id'),
- CaseTasks.task_status_id,
- TaskStatus.status_name,
- TaskStatus.status_bscolor
- ).join(
- CaseTasks.case
- ).order_by(
- desc(TaskStatus.status_name)
- ).filter(and_(
- TaskStatus.status_name != 'Done',
- TaskStatus.status_name != 'Canceled'
- )).join(
- CaseTasks.status,
- ).filter(and_(
- TaskAssignee.task_id == CaseTasks.id,
- TaskAssignee.user_id == current_user.id
- )).all()
- return ct
- def update_gtask_status(task_id, status):
- if task_id != 0:
- task = GlobalTasks.query.filter(
- GlobalTasks.id == task_id
- ).first()
- try:
- task.task_status_id = status
- db.session.commit()
- return task
- except:
- pass
- return None
- def update_utask_status(task_id, status, case_id):
- if task_id != 0:
- task = CaseTasks.query.filter(
- CaseTasks.id == task_id,
- CaseTasks.task_case_id == case_id
- ).first()
- if task:
- try:
- task.task_status_id = status
- db.session.commit()
- return True
- except:
- pass
- return False
- def get_task_status(task_status_id):
- ret = TaskStatus.query.filter(
- TaskStatus.id == task_status_id
- ).first()
- return ret
- def list_user_cases(show_all=False):
- if show_all:
- return Cases.query.filter(
- Cases.owner_id == current_user.id
- ).all()
- return Cases.query.filter(
- Cases.owner_id == current_user.id,
- Cases.close_date == None
- ).all()
|