No Description

context_db.py 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. # IRIS Source Code
  2. # contact@dfir-iris.org
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU Lesser General Public
  6. # License as published by the Free Software Foundation; either
  7. # version 3 of the License, or (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. # Lesser General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Lesser General Public License
  15. # along with this program; if not, write to the Free Software Foundation,
  16. # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  17. from sqlalchemy import and_
  18. from sqlalchemy import case
  19. from sqlalchemy import or_
  20. from sqlalchemy import asc
  21. from sqlalchemy import desc
  22. from app.models.cases import Cases
  23. from app.models.models import Client
  24. from app.models.authorization import CaseAccessLevel
  25. from app.models.authorization import UserCaseEffectiveAccess
  26. from app.datamgmt.authorization import has_deny_all_access_level
  27. def ctx_get_user_cases(user_id, max_results: int = 100):
  28. user_priority_sort = case(
  29. [(Cases.owner_id == user_id, 0)],
  30. else_=1
  31. )
  32. uceas = UserCaseEffectiveAccess.query.with_entities(
  33. Cases.case_id,
  34. Cases.name,
  35. Client.name.label('customer_name'),
  36. Cases.close_date,
  37. Cases.owner_id,
  38. UserCaseEffectiveAccess.access_level
  39. ).join(
  40. UserCaseEffectiveAccess.case
  41. ).join(
  42. Cases.client
  43. ).order_by(
  44. asc(user_priority_sort),
  45. desc(Cases.case_id)
  46. ).filter(
  47. UserCaseEffectiveAccess.user_id == user_id
  48. ).limit(max_results).all()
  49. results = []
  50. for ucea in uceas:
  51. if has_deny_all_access_level(ucea):
  52. continue
  53. row = ucea._asdict()
  54. if ucea.access_level == CaseAccessLevel.read_only.value:
  55. row['access'] = '[Read-only]'
  56. else:
  57. row['access'] = ''
  58. results.append(row)
  59. return results
  60. def ctx_search_user_cases(search, user_id, max_results: int = 100):
  61. user_priority_sort = case(
  62. (Cases.owner_id == user_id, 0),
  63. else_=1
  64. ).label("user_priority")
  65. conditions = []
  66. if not search:
  67. conditions.append(UserCaseEffectiveAccess.user_id == user_id)
  68. else:
  69. conditions.append(and_(
  70. UserCaseEffectiveAccess.user_id == user_id,
  71. or_(
  72. Cases.name.ilike('%{}%'.format(search)),
  73. Client.name.ilike('%{}%'.format(search))
  74. )))
  75. uceas = UserCaseEffectiveAccess.query.with_entities(
  76. Cases.case_id,
  77. Cases.name,
  78. Cases.owner_id,
  79. Client.name.label('customer_name'),
  80. Cases.close_date,
  81. UserCaseEffectiveAccess.access_level
  82. ).join(
  83. UserCaseEffectiveAccess.case
  84. ).join(
  85. Cases.client
  86. ).order_by(
  87. user_priority_sort,
  88. desc(Cases.case_id)
  89. ).filter(
  90. *conditions
  91. ).limit(max_results).all()
  92. results = []
  93. for ucea in uceas:
  94. if has_deny_all_access_level(ucea):
  95. continue
  96. row = ucea._asdict()
  97. if ucea.access_level == CaseAccessLevel.read_only.value:
  98. row['access'] = '[Read-only]'
  99. else:
  100. row['access'] = ''
  101. results.append(row)
  102. return results