# No description
#
# run_function.py 1.8KB
  1. # This is a script to test a function by itself
  2. import requests
  3. import json
  4. def invoke(url, headers, message):
  5. # Used for testing
  6. try:
  7. ret = requests.post(url, headers=headers, json=message, timeout=5)
  8. print(ret.text)
  9. print(ret.status_code)
  10. except requests.exceptions.ConnectionError as e:
  11. print(f"Requesterror: {e}")
  12. def invoke_multi(url, headers, message):
  13. cnt = 0
  14. maxcnt = 100
  15. print("Running %d requests towards %s." % (maxcnt, url))
  16. while(1):
  17. try:
  18. ret = requests.post(url, headers=headers, json=message, timeout=1)
  19. print(ret.status_code)
  20. except requests.exceptions.ConnectionError as e:
  21. print(f"Connectionerror: {e}")
  22. except requests.exceptions.ReadTimeout as e:
  23. print(f"Readtimeout: {e}")
  24. cnt += 1
  25. if cnt == maxcnt:
  26. break
  27. print("Done :)")
  28. if __name__ == "__main__":
  29. # Specific thingies for hello_world
  30. message = {
  31. "parameters": [{
  32. "id_": "asd",
  33. "name": "call",
  34. "value": "REPEAT THIS DATA PLEASE THANKS",
  35. "variant": "STATIC_VALUE",
  36. }],
  37. "name": "repeat_back_to_me",
  38. "execution_id": "asd",
  39. "label": "",
  40. "position": "",
  41. "app_name": "hello_world",
  42. "app_version": "1.0.0",
  43. "label": "lul",
  44. "priority": "1",
  45. "id_": "test",
  46. "id": "test",
  47. "authorization": "hey",
  48. }
  49. apikey = ""
  50. headers = {
  51. "Content-Type": "application/json",
  52. "Authorization": f"Bearer {apikey}"
  53. }
  54. location = "europe-west2"
  55. functionname = "hello-world-1-0-6"
  56. project = "shuffler"
  57. url = f"https://{location}-{project}.cloudfunctions.net/{functionname}"
  58. print(url)
  59. invoke(url, headers, message)
  60. #invoke_multi(url, headers, message)