test_coze_api.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Coze工作流API调用测试脚本
  5. """
  6. import requests
  7. import json
  8. import time
  9. class CozeWorkflowAPI:
  10. def __init__(self, access_token):
  11. self.base_url = "https://api.coze.cn/v1"
  12. self.headers = {
  13. "Authorization": f"Bearer {access_token}",
  14. "Content-Type": "application/json"
  15. }
  16. def run_workflow(self, workflow_id, parameters=None):
  17. """
  18. 执行Coze工作流
  19. """
  20. url = f"{self.base_url}/workflow/run"
  21. payload = {
  22. "workflow_id": workflow_id
  23. }
  24. if parameters:
  25. payload["parameters"] = parameters
  26. print(f"请求URL: {url}")
  27. print(f"请求头: {json.dumps(self.headers, indent=2, ensure_ascii=False)}")
  28. print(f"请求体: {json.dumps(payload, indent=2, ensure_ascii=False)}")
  29. print("-" * 50)
  30. try:
  31. response = requests.post(
  32. url,
  33. headers=self.headers,
  34. json=payload,
  35. timeout=600
  36. )
  37. print(f"响应状态码: {response.status_code}")
  38. print(f"响应头: {dict(response.headers)}")
  39. if response.status_code == 200:
  40. result = response.json()
  41. print(f"响应内容: {json.dumps(result, indent=2, ensure_ascii=False)}")
  42. return result
  43. else:
  44. print(f"请求失败: {response.status_code}")
  45. print(f"错误内容: {response.text}")
  46. return None
  47. except requests.exceptions.RequestException as e:
  48. print(f"请求异常: {str(e)}")
  49. return None
  50. except json.JSONDecodeError as e:
  51. print(f"JSON解析错误: {str(e)}")
  52. print(f"原始响应: {response.text}")
  53. return None
  54. def main():
  55. # 配置信息
  56. ACCESS_TOKEN = "pat_RiCayfXkgyXLCOWn9eVjwUmwRHavFzjYqT52COqxdMuWSku8PYMN1qD44OZrFCKg"
  57. WORKFLOW_ID = "7543491577856639012"
  58. # 工作流参数 - 包装为Object类型
  59. parameters = {
  60. "input": {
  61. "subject": "物理",
  62. "type": "选择题",
  63. "difficulty": "困难",
  64. "total_quantity": 3,
  65. "knowledge_point": "万有引力"
  66. }
  67. }
  68. print("=" * 60)
  69. print("Coze工作流API调用测试")
  70. print("=" * 60)
  71. print(f"工作流ID: {WORKFLOW_ID}")
  72. print(f"参数: {json.dumps(parameters, indent=2, ensure_ascii=False)}")
  73. print("=" * 60)
  74. # 初始化API客户端
  75. api = CozeWorkflowAPI(ACCESS_TOKEN)
  76. # 执行工作流
  77. print("开始调用工作流...")
  78. start_time = time.time()
  79. result = api.run_workflow(WORKFLOW_ID, parameters)
  80. end_time = time.time()
  81. print(f"\n调用耗时: {end_time - start_time:.2f}秒")
  82. if result:
  83. print("\n=== 调用成功 ===")
  84. if 'code' in result:
  85. if result['code'] == 0:
  86. print("✅ 工作流执行成功")
  87. if 'data' in result:
  88. print(f"📄 执行结果: {result['data']}")
  89. if 'debug_url' in result:
  90. print(f"🔍 调试链接: {result['debug_url']}")
  91. else:
  92. print(f"❌ 工作流执行失败: {result.get('msg', '未知错误')}")
  93. else:
  94. print("\n=== 调用失败 ===")
  95. print("请检查访问令牌、工作流ID或网络连接")
  96. if __name__ == "__main__":
  97. main()