Нет описания

manage_attribute_db.py 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. # IRIS Source Code
  2. # Copyright (C) 2021 - Airbus CyberSecurity (SAS)
  3. # ir@cyberactionlab.net
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU Lesser General Public
  7. # License as published by the Free Software Foundation; either
  8. # version 3 of the License, or (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. # Lesser General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU Lesser General Public License
  16. # along with this program; if not, write to the Free Software Foundation,
  17. # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. import json
  19. import logging as logger
  20. from sqlalchemy.orm.attributes import flag_modified
  21. from app import db, app
  22. from app.models.models import CaseAssets
  23. from app.models.models import CaseReceivedFile
  24. from app.models.models import CaseTasks
  25. from app.models.cases import Cases
  26. from app.models.cases import CasesEvent
  27. from app.models.models import Client
  28. from app.models.models import CustomAttribute
  29. from app.models.models import Ioc
  30. from app.models.models import Notes
  31. log = logger.getLogger(__name__)
  32. def update_all_attributes(object_type, previous_attribute, partial_overwrite=False, complete_overwrite=False):
  33. obj_list = []
  34. if object_type == 'ioc':
  35. obj_list = Ioc.query.all()
  36. elif object_type == 'event':
  37. obj_list = CasesEvent.query.all()
  38. elif object_type == 'asset':
  39. obj_list = CaseAssets.query.all()
  40. elif object_type == 'task':
  41. obj_list = CaseTasks.query.all()
  42. elif object_type == 'note':
  43. obj_list = Notes.query.all()
  44. elif object_type == 'evidence':
  45. obj_list = CaseReceivedFile.query.all()
  46. elif object_type == 'case':
  47. obj_list = Cases.query.all()
  48. elif object_type == 'client':
  49. obj_list = Client.query.all()
  50. target_attr = get_default_custom_attributes(object_type)
  51. app.logger.info(f'Migrating {len(obj_list)} objects of type {object_type}')
  52. for obj in obj_list:
  53. if complete_overwrite or obj.custom_attributes is None:
  54. app.logger.info('Achieving complete overwrite')
  55. obj.custom_attributes = target_attr
  56. flag_modified(obj, "custom_attributes")
  57. db.session.commit()
  58. continue
  59. for tab in target_attr:
  60. if obj.custom_attributes.get(tab) is None or partial_overwrite:
  61. app.logger.info(f'Migrating {tab}')
  62. flag_modified(obj, "custom_attributes")
  63. obj.custom_attributes[tab] = target_attr[tab]
  64. else:
  65. for element in target_attr[tab]:
  66. if element not in obj.custom_attributes[tab]:
  67. app.logger.info(f'Migrating {element}')
  68. flag_modified(obj, "custom_attributes")
  69. obj.custom_attributes[tab][element] = target_attr[tab][element]
  70. else:
  71. if obj.custom_attributes[tab][element]['type'] != target_attr[tab][element]['type']:
  72. if (obj.custom_attributes[tab][element]['value'] == target_attr[tab][element]['value']) or \
  73. (obj.custom_attributes[tab][element]['type'] in ('input_string', 'input_text_field') and
  74. target_attr[tab][element]['type'] in ('input_string', 'input_text_field')):
  75. flag_modified(obj, "custom_attributes")
  76. obj.custom_attributes[tab][element]['type'] = target_attr[tab][element]['type']
  77. if 'mandatory' in target_attr[tab][element] \
  78. and obj.custom_attributes[tab][element]['mandatory'] != target_attr[tab][element]['mandatory']:
  79. flag_modified(obj, "custom_attributes")
  80. obj.custom_attributes[tab][element]['mandatory'] = target_attr[tab][element]['mandatory']
  81. if partial_overwrite:
  82. for tab in previous_attribute:
  83. if not target_attr.get(tab):
  84. if obj.custom_attributes.get(tab):
  85. flag_modified(obj, "custom_attributes")
  86. obj.custom_attributes.pop(tab)
  87. for element in previous_attribute[tab]:
  88. if target_attr.get(tab):
  89. if not target_attr[tab].get(element):
  90. if obj.custom_attributes[tab].get(element):
  91. flag_modified(obj, "custom_attributes")
  92. obj.custom_attributes[tab].pop(element)
  93. # Commit will only be effective if we flagged a modification, reducing load on the DB
  94. db.session.commit()
  95. def get_default_custom_attributes(object_type):
  96. ca = CustomAttribute.query.filter(CustomAttribute.attribute_for == object_type).first()
  97. return ca.attribute_content
  98. def add_tab_attribute(obj, tab_name):
  99. """
  100. Add a new custom tab to an object ID
  101. """
  102. if not obj:
  103. return False
  104. attribute = obj.custom_attributes
  105. if tab_name in attribute:
  106. return True
  107. else:
  108. attribute[tab_name] = {}
  109. flag_modified(obj, "custom_attributes")
  110. db.session.commit()
  111. return True
  112. def add_tab_attribute_field(obj, tab_name, field_name, field_type, field_value, mandatory=None, field_options=None):
  113. if not obj:
  114. return False
  115. attribute = obj.custom_attributes
  116. if attribute is None:
  117. attribute = {}
  118. if tab_name not in attribute:
  119. attribute[tab_name] = {}
  120. attr = {
  121. field_name: {
  122. "mandatory": mandatory if mandatory is not None else False,
  123. "type": field_type,
  124. "value": field_value
  125. }
  126. }
  127. if field_options:
  128. attr[field_name]['options'] = field_options
  129. attribute[tab_name][field_name] = attr[field_name]
  130. obj.custom_attributes = attribute
  131. flag_modified(obj, "custom_attributes")
  132. db.session.commit()
  133. return True
  134. def merge_custom_attributes(data, obj_id, object_type, overwrite=False):
  135. obj = None
  136. if obj_id:
  137. if object_type == 'ioc':
  138. obj = Ioc.query.filter(Ioc.ioc_id == obj_id).first()
  139. elif object_type == 'event':
  140. obj = CasesEvent.query.filter(CasesEvent.event_id == obj_id).first()
  141. elif object_type == 'asset':
  142. obj = CaseAssets.query.filter(CaseAssets.asset_id == obj_id).first()
  143. elif object_type == 'task':
  144. obj = CaseTasks.query.filter(CaseTasks.id == obj_id).first()
  145. elif object_type == 'note':
  146. obj = Notes.query.filter(Notes.note_id == obj_id).first()
  147. elif object_type == 'evidence':
  148. obj = CaseReceivedFile.query.filter(CaseReceivedFile.id == obj_id).first()
  149. elif object_type == 'case':
  150. obj = Cases.query.filter(Cases.case_id == obj_id).first()
  151. elif object_type == 'client':
  152. obj = Client.query.filter(Client.client_id == obj_id).first()
  153. if not obj:
  154. return data
  155. if overwrite:
  156. log.warning(f'Overwriting all {object_type}')
  157. return get_default_custom_attributes(object_type)
  158. for tab in data:
  159. if obj.custom_attributes.get(tab) is None:
  160. log.error(f'Missing tab {tab} in {object_type}')
  161. continue
  162. for field in data[tab]:
  163. if field not in obj.custom_attributes[tab]:
  164. log.error(f'Missing field {field} in {object_type}')
  165. else:
  166. if obj.custom_attributes[tab][field]['type'] == 'html':
  167. continue
  168. if obj.custom_attributes[tab][field]['value'] != data[tab][field]:
  169. flag_modified(obj, "custom_attributes")
  170. obj.custom_attributes[tab][field]['value'] = data[tab][field]
  171. # Commit will only be effective if we flagged a modification, reducing load on the DB
  172. db.session.commit()
  173. return obj.custom_attributes
  174. else:
  175. default_attr = get_default_custom_attributes(object_type)
  176. for tab in data:
  177. if default_attr.get(tab) is None:
  178. app.logger.info(f'Missing tab {tab} in {object_type} default attribute')
  179. continue
  180. for field in data[tab]:
  181. if field not in default_attr[tab]:
  182. app.logger.info(f'Missing field {field} in {object_type} default attribute')
  183. else:
  184. default_attr[tab][field]['value'] = data[tab][field]
  185. return default_attr
  186. def validate_attribute(attribute):
  187. logs = []
  188. try:
  189. data = json.loads(attribute)
  190. except Exception as e:
  191. return None, [str(e)]
  192. for tab in data:
  193. for field in data[tab]:
  194. if not data[tab][field].get('type'):
  195. logs.append(f'{tab}::{field} is missing mandatory "type" tag')
  196. continue
  197. field_type = data[tab][field].get('type')
  198. if field_type in ['input_string', 'input_textfield', 'input_checkbox', 'input_select',
  199. 'input_date', 'input_datetime']:
  200. if data[tab][field].get('mandatory') is None:
  201. logs.append(f'{tab} -> {field} of type {field_type} is missing mandatory "mandatory" tag')
  202. elif not isinstance(data[tab][field].get('mandatory'), bool):
  203. logs.append(f'{tab} -> {field} -> "mandatory" expects a value of type bool, '
  204. f'but got {type(data[tab][field].get("mandatory"))}')
  205. if data[tab][field].get('value') is None:
  206. logs.append(f'{tab} -> {field} of type {field_type} is missing mandatory "value" tag')
  207. if field_type == 'input_checkbox' and not isinstance(data[tab][field].get('value'), bool):
  208. logs.append(f'{tab} -> {field} of type {field_type} expects a value of type bool, '
  209. f'but got {type(data[tab][field]["value"])}')
  210. if field_type in ['input_string', 'input_textfield', 'input_date', 'input_datetime']:
  211. if not isinstance(data[tab][field].get('value'), str):
  212. logs.append(f'{tab} -> {field} of type {field_type} expects a value of type str, '
  213. f'but got {type(data[tab][field]["value"])}')
  214. if field_type == 'input_select':
  215. if data[tab][field].get('options') is None:
  216. logs.append(f'{tab} -> {field} of type {field_type} is missing mandatory "options" tag')
  217. continue
  218. if not isinstance(data[tab][field].get('options'), list):
  219. logs.append(f'{tab} -> {field} of type {field_type} expects a value of type list, '
  220. f'but got {type(data[tab][field]["value"])}')
  221. for opt in data[tab][field].get('options'):
  222. if not isinstance(opt, str):
  223. logs.append(f'{tab} -> {field} -> "options" expects a list of str, '
  224. f'but got {type(opt)}')
  225. elif field_type in ['raw', 'html']:
  226. if data[tab][field].get('value') is None:
  227. logs.append(f'{tab} -> {field} of type {field_type} is missing mandatory "value" tag')
  228. else:
  229. logs.append(f'{tab} -> {field}, unknown field type "{field_type}"')
  230. return data, logs