Нет описания

test_burst_db_interaction.py 6.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # IRIS Source Code
  2. # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
  3. # ir@cyberactionlab.net
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU Lesser General Public
  7. # License as published by the Free Software Foundation; either
  8. # version 3 of the License, or (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. # Lesser General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU Lesser General Public License
  16. # along with this program; if not, write to the Free Software Foundation,
  17. # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. from unittest import TestCase
  19. import logging
  20. import random
  21. from datetime import datetime
  22. from datetime import timedelta
  23. from app import db
  24. from app.datamgmt.case.case_assets_db import create_asset
  25. from app.datamgmt.case.case_notes_db import add_note
  26. from app.datamgmt.case.case_notes_db import add_note_group
  27. from app.datamgmt.manage.manage_users_db import create_user
  28. from app.models.cases import Cases
  29. from app.models.cases import CasesEvent
  30. from app.models.cases import Client
  31. from app.models.models import CaseEventsAssets
  32. from app.models.authorization import User
  33. from app.post_init import run_post_init
  34. from tests.clean_database import clean_db
  35. class TestBurstDBInteraction(TestCase):
  36. def setUp(self) -> None:
  37. logging.info('SetUp called')
  38. clean_db()
  39. run_post_init()
  40. def tearDown(self) -> None:
  41. logging.info('Teardown called')
  42. clean_db()
  43. @staticmethod
  44. def _create_burst_users(random_nb: int):
  45. user_name = "User "
  46. user_login = "user_"
  47. user_password = "user_"
  48. user_email = "user_"
  49. for i in range(0, random_nb):
  50. create_user(
  51. user_name=f"{user_name}{str(i)}",
  52. user_login=f"{user_login}{str(i)}",
  53. user_password=f"{user_password}{str(i)}",
  54. user_email=f"{user_email}{str(i)}",
  55. user_isadmin=(i % 2 == 0)
  56. )
  57. @staticmethod
  58. def _create_burst_clients(clients_nb: int):
  59. for i in range(clients_nb):
  60. client = Client(f"client_{str(i)}")
  61. db.session.add(client)
  62. db.session.commit()
  63. @staticmethod
  64. def _create_burst_cases(users_nb: int, client_nb: int, cases_nb: int):
  65. for i in range(cases_nb):
  66. logging.info(f"Creating case #{str(i)}")
  67. asset_l = []
  68. case = Cases(
  69. name=f"Test {str(i)}",
  70. description=f"Testing case number {str(i)}",
  71. soc_id=f"SOC{str(i)}",
  72. gen_report=False,
  73. user=(User.query.filter(User.id == random.randrange(1, users_nb)).first()),
  74. client_name=f"client_{str(random.randrange(1, client_nb))}"
  75. )
  76. case.validate_on_build()
  77. case.save()
  78. for ii in range(random.randrange(5, 10)):
  79. ng = add_note_group(
  80. group_title=f"Group #{str(ii)}",
  81. caseid=case.case_id,
  82. userid=random.randrange(1, users_nb),
  83. creationdate=datetime.utcnow()
  84. )
  85. for iii in range(random.randrange(2, 8)):
  86. add_note(
  87. note_title=f"Note #{str(ii)}",
  88. creation_date=datetime.utcnow(),
  89. user_id=random.randrange(1, users_nb),
  90. caseid=case.case_id,
  91. group_id=ng.group_id
  92. )
  93. for ii in range(random.randrange(6, 140)):
  94. asset = create_asset(
  95. asset_name=f"asset_{str(ii)}",
  96. asset_description=f"My asset {str(i)}",
  97. asset_ip='',
  98. asset_info='',
  99. asset_compromised=(ii % 2 == 0),
  100. asset_type=random.randrange(1, 19),
  101. asset_domain='',
  102. date_added=datetime.utcnow(),
  103. date_update=datetime.utcnow(),
  104. caseid=case.case_id,
  105. user_id=random.randrange(1, users_nb),
  106. analysis_status=random.randrange(1, 5)
  107. )
  108. asset_l.append(asset.asset_id)
  109. for ii in range(random.randrange(10, 350)):
  110. event = CasesEvent()
  111. event.case_id = case.case_id
  112. event.user_id = random.randrange(1, users_nb)
  113. event.event_raw = ""
  114. event.event_content = f"My event content @{str(ii)}"
  115. event.event_title = f"My event title @{str(ii)}"
  116. event.event_tags = ''
  117. event.event_color = ''
  118. event.event_date = datetime.utcnow()
  119. event.event_added = datetime.utcnow()
  120. db.session.add(event)
  121. db.session.commit()
  122. for iii in range(random.randrange(0, 5)):
  123. cea = CaseEventsAssets()
  124. cea.asset_id = asset_l[random.randrange(len(asset_l))]
  125. cea.event_id = event.event_id
  126. cea.case_id = case.case_id
  127. db.session.add(cea)
  128. db.session.commit()
  129. @staticmethod
  130. def random_date(start, end):
  131. delta = end - start
  132. int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
  133. random_second = random.randrange(int_delta)
  134. return start + timedelta(seconds=random_second)
  135. @staticmethod
  136. def update_dates():
  137. d1 = datetime.strptime('1/1/2008 1:30 PM', '%m/%d/%Y %I:%M %p')
  138. d2 = datetime.strptime('12/12/2021 4:50 AM', '%m/%d/%Y %I:%M %p')
  139. events = CasesEvent.query.all()
  140. for event in events:
  141. event.event_date = datetime.utcnow()
  142. logging.info(f"Updating event {event.event_title}")
  143. db.session.commit()
  144. def test_burst_creation(self):
  145. start_time = datetime.utcnow()
  146. logging.info(f"Test started at: {start_time.__str__()}")
  147. logging.info('Creating random users')
  148. self._create_burst_users(154)
  149. logging.info('Creating random clients')
  150. self._create_burst_clients(68)
  151. logging.info('Creating random cases')
  152. self._create_burst_cases(154, 68, 1489)
  153. end_time = datetime.utcnow()
  154. logging.info(f"Test ended at: {end_time.__str__()}")
  155. logging.info(f"Elapsed time: {(end_time - start_time).__str__()}")