#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Coze工作流API调用测试脚本 """ import requests import json import time class CozeWorkflowAPI: def __init__(self, access_token): self.base_url = "https://api.coze.cn/v1" self.headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } def run_workflow(self, workflow_id, parameters=None): """ 执行Coze工作流 """ url = f"{self.base_url}/workflow/run" payload = { "workflow_id": workflow_id } if parameters: payload["parameters"] = parameters print(f"请求URL: {url}") print(f"请求头: {json.dumps(self.headers, indent=2, ensure_ascii=False)}") print(f"请求体: {json.dumps(payload, indent=2, ensure_ascii=False)}") print("-" * 50) try: response = requests.post( url, headers=self.headers, json=payload, timeout=600 ) print(f"响应状态码: {response.status_code}") print(f"响应头: {dict(response.headers)}") if response.status_code == 200: result = response.json() print(f"响应内容: {json.dumps(result, indent=2, ensure_ascii=False)}") return result else: print(f"请求失败: {response.status_code}") print(f"错误内容: {response.text}") return None except requests.exceptions.RequestException as e: print(f"请求异常: {str(e)}") return None except json.JSONDecodeError as e: print(f"JSON解析错误: {str(e)}") print(f"原始响应: {response.text}") return None def main(): # 配置信息 ACCESS_TOKEN = "pat_RiCayfXkgyXLCOWn9eVjwUmwRHavFzjYqT52COqxdMuWSku8PYMN1qD44OZrFCKg" WORKFLOW_ID = "7543491577856639012" # 工作流参数 - 包装为Object类型 parameters = { "input": { "subject": "物理", "type": "选择题", "difficulty": "困难", "total_quantity": 3, "knowledge_point": "万有引力" } } print("=" * 60) print("Coze工作流API调用测试") print("=" * 60) print(f"工作流ID: {WORKFLOW_ID}") print(f"参数: {json.dumps(parameters, indent=2, ensure_ascii=False)}") print("=" * 60) # 初始化API客户端 api = CozeWorkflowAPI(ACCESS_TOKEN) # 执行工作流 print("开始调用工作流...") start_time = time.time() result = api.run_workflow(WORKFLOW_ID, parameters) end_time = time.time() print(f"\n调用耗时: {end_time - start_time:.2f}秒") if result: print("\n=== 调用成功 ===") if 'code' in result: if result['code'] == 0: print("✅ 工作流执行成功") if 'data' in result: print(f"📄 执行结果: {result['data']}") if 'debug_url' in result: print(f"🔍 调试链接: {result['debug_url']}") else: print(f"❌ 工作流执行失败: {result.get('msg', '未知错误')}") else: print("\n=== 调用失败 ===") print("请检查访问令牌、工作流ID或网络连接") if __name__ == "__main__": main()