Brak opisu

case_db.py 5.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. # IRIS Source Code
  2. # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
  3. # ir@cyberactionlab.net
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU Lesser General Public
  7. # License as published by the Free Software Foundation; either
  8. # version 3 of the License, or (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. # Lesser General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU Lesser General Public License
  16. # along with this program; if not, write to the Free Software Foundation,
  17. # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. import binascii
  19. from sqlalchemy import and_
  20. from sqlalchemy import exists
  21. from sqlalchemy import select
  22. from app import db
  23. from app.datamgmt.manage.manage_tags_db import add_db_tag
  24. from app.models.authorization import User
  25. from app.models.cases import CaseProtagonist
  26. from app.models.cases import Cases
  27. from app.models.models import CaseTemplateReport, ReviewStatus
  28. from app.models.models import Client
  29. from app.models.models import Languages
  30. from app.models.models import ReportType
  31. def get_case_summary(caseid):
  32. case_summary = Cases.query.filter(
  33. Cases.case_id == caseid
  34. ).with_entities(
  35. Cases.name.label('case_name'),
  36. Cases.open_date.label('case_open'),
  37. User.name.label('user'),
  38. Client.name.label('customer')
  39. ).join(
  40. Cases.user
  41. ).join(
  42. Cases.client
  43. ).first()
  44. return case_summary
  45. def get_case(caseid) -> Cases:
  46. return Cases.query.filter(Cases.case_id == caseid).first()
  47. def case_db_exists(identifier):
  48. stmt = select(exists().where(Cases.case_id == identifier))
  49. return db.session.scalar(stmt)
  50. def get_case_client_id(caseid):
  51. customer = Cases.query.with_entities(
  52. Client.client_id
  53. ).filter(
  54. Cases.case_id == caseid
  55. ).join(Cases.client).first()
  56. return customer.client_id
  57. def case_get_desc(caseid):
  58. case_desc = Cases.query.with_entities(
  59. Cases.description
  60. ).filter(
  61. Cases.case_id == caseid
  62. ).first()
  63. return case_desc
  64. def case_get_desc_crc(caseid):
  65. partial_case = case_get_desc(caseid)
  66. if partial_case:
  67. desc = partial_case.description
  68. if not desc:
  69. desc = ""
  70. desc_crc32 = binascii.crc32(desc.encode('utf-8'))
  71. else:
  72. desc = None
  73. desc_crc32 = None
  74. return desc_crc32, desc
  75. def case_set_desc_crc(desc, caseid):
  76. lcase = get_case(caseid)
  77. if lcase:
  78. if not desc:
  79. desc = ""
  80. lcase.description = desc
  81. db.session.commit()
  82. return True
  83. return False
  84. def get_case_report_template():
  85. reports = CaseTemplateReport.query.with_entities(
  86. CaseTemplateReport.id,
  87. CaseTemplateReport.name,
  88. Languages.name,
  89. CaseTemplateReport.description
  90. ).filter(and_(
  91. Languages.id == CaseTemplateReport.language_id,
  92. ReportType.name == "Investigation"
  93. )).join(
  94. CaseTemplateReport.report_type
  95. ).all()
  96. return reports
  97. def save_case_tags(tags, case):
  98. if tags is None:
  99. return
  100. case.tags.clear()
  101. for tag in tags.split(','):
  102. tag = tag.strip()
  103. if tag:
  104. tg = add_db_tag(tag)
  105. case.tags.append(tg)
  106. db.session.commit()
  107. def get_case_tags(case_id):
  108. case = Cases.query.get(case_id)
  109. if case:
  110. return [tag.tag_title for tag in case.tags]
  111. return []
  112. def get_activities_report_template():
  113. reports = CaseTemplateReport.query.with_entities(
  114. CaseTemplateReport.id,
  115. CaseTemplateReport.name,
  116. Languages.name,
  117. CaseTemplateReport.description
  118. ).filter(and_(
  119. ReportType.name == "Activities",
  120. Languages.id == CaseTemplateReport.language_id
  121. )).outerjoin(
  122. CaseTemplateReport.report_type
  123. ).outerjoin(
  124. CaseTemplateReport.language
  125. ).all()
  126. return reports
  127. def case_name_exists(case_name, client_name):
  128. res = Cases.query.with_entities(
  129. Cases.name, Client.name
  130. ).filter(and_(
  131. Cases.name == case_name,
  132. Client.name == client_name
  133. )).join(
  134. Cases.client
  135. ).first()
  136. return True if res else False
  137. def register_case_protagonists(case_id, protagonists):
  138. if protagonists is None:
  139. return
  140. CaseProtagonist.query.filter(
  141. CaseProtagonist.case_id == case_id
  142. ).delete()
  143. for protagonist in protagonists:
  144. for key in ['role', 'name']:
  145. if not protagonist.get(key):
  146. continue
  147. cp = CaseProtagonist()
  148. cp.case_id = case_id
  149. cp.role = protagonist.get('role')
  150. cp.name = protagonist.get('name')
  151. cp.contact = protagonist.get('contact')
  152. db.session.add(cp)
  153. db.session.commit()
  154. def get_review_id_from_name(review_name):
  155. status = ReviewStatus.query.filter(ReviewStatus.status_name == review_name).first()
  156. if status:
  157. return status.id
  158. return None