Нет описания

rest_api.py 2.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. # IRIS Source Code
  2. # Copyright (C) 2023 - DFIR-IRIS
  3. # contact@dfir-iris.org
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU Lesser General Public
  7. # License as published by the Free Software Foundation; either
  8. # version 3 of the License, or (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. # Lesser General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU Lesser General Public License
  16. # along with this program; if not, write to the Free Software Foundation,
  17. # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18. import requests
  19. from requests.exceptions import ConnectionError
  20. from requests.exceptions import JSONDecodeError
  21. from urllib import parse
  22. class RestApi:
  23. def __init__(self, url, api_key):
  24. self._url = url
  25. self._headers = {'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json'}
  26. def _build_url(self, path):
  27. return parse.urljoin(self._url, path)
  28. @staticmethod
  29. def _convert_response_to_string(response):
  30. try:
  31. return f'{response.status_code} {response.json()}'
  32. except JSONDecodeError:
  33. return f'{response.status_code}'
  34. def post(self, path, payload, query_parameters=None):
  35. url = self._build_url(path)
  36. response = requests.post(url, headers=self._headers, params=query_parameters, json=payload)
  37. response_as_string = self._convert_response_to_string(response)
  38. print(f'POST {url} {payload} => {response_as_string}')
  39. return response
  40. def get(self, path, query_parameters=None):
  41. url = self._build_url(path)
  42. response = requests.get(url, headers=self._headers, params=query_parameters)
  43. response_as_string = self._convert_response_to_string(response)
  44. print(f'GET {url} => {response_as_string}')
  45. return response
  46. def put(self, path, payload):
  47. url = self._build_url(path)
  48. response = requests.put(url, headers=self._headers, json=payload)
  49. response_as_string = self._convert_response_to_string(response)
  50. print(f'PUT {url} {payload} => {response_as_string}')
  51. return response
  52. def delete(self, path, query_parameters=None):
  53. url = self._build_url(path)
  54. response = requests.delete(url, headers=self._headers, params=query_parameters)
  55. response_as_string = self._convert_response_to_string(response)
  56. print(f'DELETE {url} => {response_as_string}')
  57. return response
  58. def is_ready(self):
  59. try:
  60. requests.head(self._url)
  61. return True
  62. except ConnectionError:
  63. return False