Keine Beschreibung

graphql_route.py 5.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # IRIS Source Code
  2. # Copyright (C) 2024 - DFIR-IRIS
  3. # contact@dfir-iris.org
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU Lesser General Public
  7. # License as published by the Free Software Foundation; either
  8. # version 3 of the License, or (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. # Lesser General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU Lesser General Public License
  16. # along with this program; if not, write to the Free Software Foundation,
  17. # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. from functools import wraps
  19. from flask import request
  20. from flask_wtf import FlaskForm
  21. from flask import Blueprint
  22. from flask_login import current_user
  23. from graphql_server.flask import GraphQLView
  24. from graphene import ObjectType
  25. from graphene import Schema
  26. from graphene import Float
  27. from graphene import Int
  28. from graphene import Field
  29. from graphene import String
  30. from graphene_sqlalchemy import SQLAlchemyConnectionField
  31. from app.datamgmt.manage.manage_cases_db import build_filter_case_query
  32. from app.blueprints.access_controls import is_user_authenticated
  33. from app.blueprints.responses import response_error
  34. from app.models.authorization import CaseAccessLevel
  35. from app.blueprints.graphql.cases import CaseObject
  36. from app.blueprints.graphql.iocs import IOCObject
  37. from app.blueprints.graphql.iocs import IOCCreate
  38. from app.blueprints.graphql.iocs import IOCUpdate
  39. from app.blueprints.graphql.iocs import IOCDelete
  40. from app.blueprints.graphql.cases import CaseCreate
  41. from app.blueprints.graphql.cases import CaseDelete
  42. from app.blueprints.graphql.cases import CaseUpdate
  43. from app.blueprints.graphql.cases import CaseConnection
  44. from app.business.cases import cases_get_by_identifier
  45. from app.business.iocs import iocs_get
  46. from app.blueprints.graphql.permissions import permissions_check_current_user_has_some_case_access
import warnings
# Ignore all UserWarnings. The filter is installed at import time and applies
# to the whole process, not only to this module.
# NOTE(review): presumably added to hide warnings raised while the GraphQL
# schema below is being built — confirm, and consider narrowing the filter.
warnings.filterwarnings("ignore", category=UserWarning)
  50. class Query(ObjectType):
  51. """This is the IRIS GraphQL queries documentation!"""
  52. cases = SQLAlchemyConnectionField(CaseConnection, classification_id=Float(), client_id=Float(), state_id=Int(),
  53. owner_id=Float(), open_date=String(), name=String(), soc_id=String(),
  54. severity_id=Int(), tags=String(), open_since=Int())
  55. case = Field(CaseObject, case_id=Float(), description='Retrieve a case by its identifier')
  56. ioc = Field(IOCObject, ioc_id=Float(), description='Retrieve an ioc by its identifier')
  57. @staticmethod
  58. def resolve_cases(root, info, classification_id=None, client_id=None, state_id=None, owner_id=None, open_date=None, name=None, soc_id=None,
  59. severity_id=None, tags=None, open_since=None, **kwargs):
  60. return build_filter_case_query(current_user.id, start_open_date=open_date, end_open_date=None, case_customer_id=client_id, case_ids=None,
  61. case_name=name, case_description=None, case_classification_id=classification_id, case_owner_id=owner_id,
  62. case_opening_user_id=None, case_severity_id=severity_id, case_state_id=state_id, case_soc_id=soc_id,
  63. case_tags=tags, case_open_since=open_since)
  64. @staticmethod
  65. def resolve_case(root, info, case_id):
  66. permissions_check_current_user_has_some_case_access(case_id, [CaseAccessLevel.read_only, CaseAccessLevel.full_access])
  67. return cases_get_by_identifier(case_id)
  68. @staticmethod
  69. def resolve_ioc(root, info, ioc_id):
  70. ioc = iocs_get(ioc_id)
  71. permissions_check_current_user_has_some_case_access(ioc.case_id, [CaseAccessLevel.read_only, CaseAccessLevel.full_access])
  72. return ioc
class Mutation(ObjectType):
    # Mutation root passed to Schema(mutation=...) in _create_blueprint.
    # Each attribute mounts one mutation class through its .Field() factory.
    # NOTE(review): comments are used instead of a class docstring because
    # graphene appears to publish docstrings as schema descriptions — confirm.

    # IOC mutations (classes imported from app.blueprints.graphql.iocs).
    ioc_create = IOCCreate.Field()
    ioc_update = IOCUpdate.Field()
    ioc_delete = IOCDelete.Field()

    # Case mutations (classes imported from app.blueprints.graphql.cases).
    case_create = CaseCreate.Field()
    case_delete = CaseDelete.Field()
    case_update = CaseUpdate.Field()
  80. def _check_authentication_wrapper(f):
  81. @wraps(f)
  82. def wrap(*args, **kwargs):
  83. if request.method == 'POST':
  84. cookie_session = request.cookies.get('session')
  85. if cookie_session:
  86. form = FlaskForm()
  87. if not form.validate():
  88. return response_error('Invalid CSRF token')
  89. elif request.is_json:
  90. request.json.pop('csrf_token')
  91. if not is_user_authenticated(request):
  92. return response_error('Authentication required', status=401)
  93. return f(*args, **kwargs)
  94. return wrap
  95. def _create_blueprint():
  96. schema = Schema(query=Query, mutation=Mutation)
  97. graphql_view = GraphQLView.as_view('graphql', schema=schema)
  98. graphql_view_with_authentication = _check_authentication_wrapper(graphql_view)
  99. blueprint = Blueprint('graphql', __name__)
  100. blueprint.add_url_rule('/graphql', view_func=graphql_view_with_authentication, methods=['POST'])
  101. return blueprint
# Blueprint instance built once, when this module is imported.
# NOTE(review): presumably registered on the Flask application elsewhere —
# confirm against the application setup code.
graphql_blueprint = _create_blueprint()