| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- # IRIS Source Code
- # contact@dfir-iris.org
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 3 of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License
- # along with this program; if not, write to the Free Software Foundation,
- # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- from flask_login import current_user
- from sqlalchemy import and_
- from app import db
- from app.datamgmt.case.case_db import get_case
- from app.datamgmt.manage.manage_cases_db import list_cases_id
- from app.iris_engine.access_control.utils import ac_access_level_mask_from_val_list, ac_ldp_group_removal
- from app.iris_engine.access_control.utils import ac_access_level_to_list
- from app.iris_engine.access_control.utils import ac_auto_update_user_effective_access
- from app.iris_engine.access_control.utils import ac_permission_to_list
- from app.models.cases import Cases
- from app.models.authorization import Group
- from app.models.authorization import GroupCaseAccess
- from app.models.authorization import User
- from app.models.authorization import UserGroup
- from app.schema.marshables import AuthorizationGroupSchema
- def get_groups_list():
- groups = Group.query.all()
- return groups
- def get_groups_list_hr_perms():
- groups = get_groups_list()
- get_membership_list = UserGroup.query.with_entities(
- UserGroup.group_id,
- User.user,
- User.id,
- User.name
- ).join(UserGroup.user).all()
- membership_list = {}
- for member in get_membership_list:
- if member.group_id not in membership_list:
- membership_list[member.group_id] = [{
- 'user': member.user,
- 'name': member.name,
- 'id': member.id
- }]
- else:
- membership_list[member.group_id].append({
- 'user': member.user,
- 'name': member.name,
- 'id': member.id
- })
- groups = AuthorizationGroupSchema().dump(groups, many=True)
- for group in groups:
- perms = ac_permission_to_list(group['group_permissions'])
- group['group_permissions_list'] = perms
- group['group_members'] = membership_list.get(group['group_id'], [])
- return groups
- def get_group(group_id):
- group = Group.query.filter(Group.group_id == group_id).first()
- return group
- def get_group_by_name(group_name):
- groups = Group.query.filter(Group.group_name == group_name)
- return groups.first()
- def get_group_with_members(group_id):
- group = get_group(group_id)
- if not group:
- return None
- get_membership_list = UserGroup.query.with_entities(
- UserGroup.group_id,
- User.user,
- User.id,
- User.name
- ).join(
- UserGroup.user
- ).filter(
- UserGroup.group_id == group_id
- ).all()
- membership_list = {}
- for member in get_membership_list:
- if member.group_id not in membership_list:
- membership_list[member.group_id] = [{
- 'user': member.user,
- 'name': member.name,
- 'id': member.id
- }]
- else:
- membership_list[member.group_id].append({
- 'user': member.user,
- 'name': member.name,
- 'id': member.id
- })
- perms = ac_permission_to_list(group.group_permissions)
- setattr(group, 'group_permissions_list', perms)
- setattr(group, 'group_members', membership_list.get(group.group_id, []))
- return group
- def get_group_details(group_id):
- group = get_group_with_members(group_id)
- if not group:
- return group
- group_accesses = GroupCaseAccess.query.with_entities(
- GroupCaseAccess.access_level,
- GroupCaseAccess.case_id,
- Cases.name.label('case_name')
- ).join(
- GroupCaseAccess.case
- ).filter(
- GroupCaseAccess.group_id == group_id
- ).all()
- group_cases_access = []
- for kgroup in group_accesses:
- group_cases_access.append({
- "access_level": kgroup.access_level,
- "access_level_list": ac_access_level_to_list(kgroup.access_level),
- "case_id": kgroup.case_id,
- "case_name": kgroup.case_name
- })
- setattr(group, 'group_cases_access', group_cases_access)
- return group
- def update_group_members(group, members):
- if not group:
- return None
- cur_groups = UserGroup.query.with_entities(
- UserGroup.user_id
- ).filter(UserGroup.group_id == group.group_id).all()
- set_cur_groups = set([grp[0] for grp in cur_groups])
- set_members = set(int(mber) for mber in members)
- users_to_add = set_members - set_cur_groups
- users_to_remove = set_cur_groups - set_members
- for uid in users_to_add:
- user = User.query.filter(User.id == uid).first()
- if user:
- ug = UserGroup()
- ug.group_id = group.group_id
- ug.user_id = user.id
- db.session.add(ug)
- db.session.commit()
- ac_auto_update_user_effective_access(uid)
- for uid in users_to_remove:
- if current_user.id == uid and ac_ldp_group_removal(uid, group.group_id):
- continue
- UserGroup.query.filter(
- and_(UserGroup.group_id == group.group_id,
- UserGroup.user_id == uid)
- ).delete()
- db.session.commit()
- ac_auto_update_user_effective_access(uid)
- return group
- def remove_user_from_group(group, member):
- if not group:
- return None
- UserGroup.query.filter(
- and_(UserGroup.group_id == group.group_id,
- UserGroup.user_id == member.id)
- ).delete()
- db.session.commit()
- ac_auto_update_user_effective_access(member.id)
- return group
- def delete_group(group):
- if not group:
- return None
- UserGroup.query.filter(UserGroup.group_id == group.group_id).delete()
- GroupCaseAccess.query.filter(GroupCaseAccess.group_id == group.group_id).delete()
- db.session.delete(group)
- db.session.commit()
- def add_case_access_to_group(group, cases_list, access_level):
- if not group:
- return None, "Invalid group"
- for case_id in cases_list:
- case = get_case(case_id)
- if not case:
- return None, "Invalid case ID"
- access_level_mask = ac_access_level_mask_from_val_list([access_level])
- ocas = GroupCaseAccess.query.filter(
- and_(
- GroupCaseAccess.case_id == case_id,
- GroupCaseAccess.group_id == group.group_id
- )).all()
- if ocas:
- for oca in ocas:
- db.session.delete(oca)
- oca = GroupCaseAccess()
- oca.group_id = group.group_id
- oca.access_level = access_level_mask
- oca.case_id = case_id
- db.session.add(oca)
- db.session.commit()
- return group, "Updated"
- def add_all_cases_access_to_group(group, access_level):
- if not group:
- return None, "Invalid group"
- for case_id in list_cases_id():
- access_level_mask = ac_access_level_mask_from_val_list([access_level])
- ocas = GroupCaseAccess.query.filter(
- and_(
- GroupCaseAccess.case_id == case_id,
- GroupCaseAccess.group_id == group.group_id
- )).all()
- if ocas:
- for oca in ocas:
- db.session.delete(oca)
- oca = GroupCaseAccess()
- oca.group_id = group.group_id
- oca.access_level = access_level_mask
- oca.case_id = case_id
- db.session.add(oca)
- db.session.commit()
- return group, "Updated"
- def remove_case_access_from_group(group_id, case_id):
- if not group_id or type(group_id) is not int:
- return
- if not case_id or type(case_id) is not int:
- return
- GroupCaseAccess.query.filter(
- and_(
- GroupCaseAccess.case_id == case_id,
- GroupCaseAccess.group_id == group_id
- )).delete()
- db.session.commit()
- return
- def remove_cases_access_from_group(group_id, cases_list):
- if not group_id or type(group_id) is not int:
- return False, "Invalid group"
- if not cases_list or type(cases_list[0]) is not int:
- return False, "Invalid cases list"
- GroupCaseAccess.query.filter(
- and_(
- GroupCaseAccess.case_id.in_(cases_list),
- GroupCaseAccess.group_id == group_id
- )).delete()
- db.session.commit()
- return True, "Updated"
|