123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Coze工作流API调用测试脚本
- """
- import requests
- import json
- import time
- class CozeWorkflowAPI:
- def __init__(self, access_token):
- self.base_url = "https://api.coze.cn/v1"
- self.headers = {
- "Authorization": f"Bearer {access_token}",
- "Content-Type": "application/json"
- }
-
- def run_workflow(self, workflow_id, parameters=None):
- """
- 执行Coze工作流
- """
- url = f"{self.base_url}/workflow/run"
- payload = {
- "workflow_id": workflow_id
- }
-
- if parameters:
- payload["parameters"] = parameters
-
- print(f"请求URL: {url}")
- print(f"请求头: {json.dumps(self.headers, indent=2, ensure_ascii=False)}")
- print(f"请求体: {json.dumps(payload, indent=2, ensure_ascii=False)}")
- print("-" * 50)
-
- try:
- response = requests.post(
- url,
- headers=self.headers,
- json=payload,
- timeout=600
- )
-
- print(f"响应状态码: {response.status_code}")
- print(f"响应头: {dict(response.headers)}")
-
- if response.status_code == 200:
- result = response.json()
- print(f"响应内容: {json.dumps(result, indent=2, ensure_ascii=False)}")
- return result
- else:
- print(f"请求失败: {response.status_code}")
- print(f"错误内容: {response.text}")
- return None
-
- except requests.exceptions.RequestException as e:
- print(f"请求异常: {str(e)}")
- return None
- except json.JSONDecodeError as e:
- print(f"JSON解析错误: {str(e)}")
- print(f"原始响应: {response.text}")
- return None
- def main():
- # 配置信息
- ACCESS_TOKEN = "pat_RiCayfXkgyXLCOWn9eVjwUmwRHavFzjYqT52COqxdMuWSku8PYMN1qD44OZrFCKg"
- WORKFLOW_ID = "7543491577856639012"
-
- # 工作流参数 - 包装为Object类型
- parameters = {
- "input": {
- "subject": "物理",
- "type": "选择题",
- "difficulty": "困难",
- "total_quantity": 3,
- "knowledge_point": "万有引力"
- }
- }
-
- print("=" * 60)
- print("Coze工作流API调用测试")
- print("=" * 60)
- print(f"工作流ID: {WORKFLOW_ID}")
- print(f"参数: {json.dumps(parameters, indent=2, ensure_ascii=False)}")
- print("=" * 60)
-
- # 初始化API客户端
- api = CozeWorkflowAPI(ACCESS_TOKEN)
-
- # 执行工作流
- print("开始调用工作流...")
- start_time = time.time()
-
- result = api.run_workflow(WORKFLOW_ID, parameters)
-
- end_time = time.time()
- print(f"\n调用耗时: {end_time - start_time:.2f}秒")
-
- if result:
- print("\n=== 调用成功 ===")
- if 'code' in result:
- if result['code'] == 0:
- print("✅ 工作流执行成功")
- if 'data' in result:
- print(f"📄 执行结果: {result['data']}")
- if 'debug_url' in result:
- print(f"🔍 调试链接: {result['debug_url']}")
- else:
- print(f"❌ 工作流执行失败: {result.get('msg', '未知错误')}")
- else:
- print("\n=== 调用失败 ===")
- print("请检查访问令牌、工作流ID或网络连接")
- if __name__ == "__main__":
- main()
|