#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 测试difficulty字段修复情况 验证数据库中题目的difficulty字段是否正确设置 """ import requests import json import time import jaydebeapi import os # 配置 BASE_URL = "http://localhost:8080/api" # H2数据库配置(内存数据库) H2_URL = "jdbc:h2:mem:testdb" H2_USER = "sa" H2_PASSWORD = "" H2_DRIVER = "org.h2.Driver" # 下载H2驱动(如果不存在) H2_JAR_PATH = "h2.jar" def download_h2_driver(): """下载H2数据库驱动""" if not os.path.exists(H2_JAR_PATH): print("正在下载H2数据库驱动...") import urllib.request try: urllib.request.urlretrieve( "https://repo1.maven.org/maven2/com/h2database/h2/2.2.224/h2-2.2.224.jar", H2_JAR_PATH ) print("✓ H2驱动下载成功") except Exception as e: print(f"✗ H2驱动下载失败: {e}") return False return True def test_h2_console_access(): """测试H2控制台访问""" try: response = requests.get("http://localhost:8080/h2-console", timeout=5) if response.status_code == 200: print("✓ H2控制台可访问") print(" 访问地址: http://localhost:8080/h2-console") print(" JDBC URL: jdbc:h2:mem:testdb") print(" 用户名: sa") print(" 密码: (空)") return True else: print(f"✗ H2控制台访问失败,状态码: {response.status_code}") return False except Exception as e: print(f"✗ H2控制台访问错误: {e}") return False def test_database_connection(): """测试数据库连接(通过H2控制台信息)""" print("\n=== H2内存数据库信息 ===") print("由于H2是内存数据库,只能在应用运行时访问") return test_h2_console_access() def check_existing_difficulty_data(connection): """检查现有数据的difficulty字段""" try: cursor = connection.cursor() # 查询goal_detail表中的difficulty字段 query = """ SELECT id, goal_id, question, difficulty, created_at FROM goal_detail ORDER BY created_at DESC LIMIT 10 """ cursor.execute(query) results = cursor.fetchall() print("\n=== 现有题目difficulty字段检查 ===") if results: for row in results: id_val, goal_id, question, difficulty, created_at = row question_preview = question[:50] + "..." if len(question) > 50 else question print(f"ID: {id_val}, Goal: {goal_id}, Difficulty: {difficulty}, Question: {question_preview}") else: print("数据库中没有找到题目数据") # 统计difficulty字段的分布 stats_query = """ SELECT difficulty, COUNT(*) as count FROM goal_detail GROUP BY difficulty """ cursor.execute(stats_query) stats = cursor.fetchall() print("\n=== Difficulty字段统计 ===") for difficulty, count in stats: print(f"Difficulty: {difficulty if difficulty else 'NULL'}, Count: {count}") cursor.close() return True except Exception as e: print(f"✗ 查询数据库失败: {e}") return False def test_generate_new_question(): """测试生成新题目""" print("\n=== 测试生成新题目 ===") # 准备测试数据 test_data = { "goalId": 1, "subject": "数学", "type": "选择题", "difficulty": "简单", "totalQuantity": 1, "knowledgePoint": "基础运算" } try: # 调用生成题目API response = requests.post( f"{BASE_URL}/ai-question/generate", json=test_data, headers={'Content-Type': 'application/json'}, timeout=30 ) if response.status_code == 200: result = response.json() print(f"✓ 题目生成成功: {result}") return True else: print(f"✗ 题目生成失败,状态码: {response.status_code}") print(f"响应内容: {response.text}") return False except requests.exceptions.RequestException as e: print(f"✗ 请求失败: {e}") return False def check_new_difficulty_data(connection): """检查新生成题目的difficulty字段""" try: cursor = connection.cursor() # 查询最新生成的题目(H2语法) query = """ SELECT id, goal_id, question, difficulty, created_at FROM goal_detail WHERE created_at >= DATEADD('MINUTE', -5, NOW()) ORDER BY created_at DESC """ cursor.execute(query) results = cursor.fetchall() print("\n=== 新生成题目difficulty字段检查 ===") if results: for row in results: id_val, goal_id, question, difficulty, created_at = row question_preview = question[:50] + "..." if len(question) > 50 else question status = "✓" if difficulty is not None else "✗" print(f"{status} ID: {id_val}, Goal: {goal_id}, Difficulty: {difficulty}, Created: {created_at}") print(f" Question: {question_preview}") else: print("没有找到最近5分钟内生成的题目") cursor.close() return len(results) > 0 except Exception as e: print(f"✗ 查询新数据失败: {e}") return False def test_api_endpoints(): """测试相关API端点""" print("\n=== 测试API端点 ===") # 测试健康检查 try: response = requests.get(f"{BASE_URL}/health", timeout=5) print(f"健康检查: {response.status_code}") except: print("健康检查: API不可用") # 测试获取目标列表 try: response = requests.get(f"{BASE_URL}/goals", timeout=5) if response.status_code == 200: goals = response.json() print(f"✓ 目标列表获取成功,共{len(goals)}个目标") if goals: print(f" 示例目标ID: {goals[0].get('id', 'N/A')}") return goals[0].get('id', 1) else: print(f"✗ 目标列表获取失败: {response.status_code}") except Exception as e: print(f"✗ 目标列表获取错误: {e}") return 1 # 默认使用目标ID 1 def main(): """主测试函数""" print("开始测试difficulty字段修复情况...") # 1. 测试数据库连接信息 test_database_connection() # 2. 测试API端点 goal_id = test_api_endpoints() # 3. 测试生成新题目 print("\n=== 测试生成新题目 ===") test_data = { "goalId": goal_id, "subject": "数学", "type": "选择题", "difficulty": "简单", "totalQuantity": 1, "knowledgePoint": "基础运算" } try: response = requests.post( f"{BASE_URL}/ai-question/generate", json=test_data, headers={'Content-Type': 'application/json'}, timeout=30 ) if response.status_code == 200: result = response.json() print(f"✓ 题目生成成功") print(f" 响应数据: {json.dumps(result, ensure_ascii=False, indent=2)}") # 检查响应中是否包含difficulty字段 if isinstance(result, list) and result: for i, question in enumerate(result): difficulty = question.get('difficulty') print(f" 题目{i+1} difficulty字段: {difficulty}") if difficulty is None: print(f" ⚠️ 题目{i+1}的difficulty字段为null") else: print(f" ✓ 题目{i+1}的difficulty字段正常: {difficulty}") else: print(f"✗ 题目生成失败,状态码: {response.status_code}") print(f" 响应内容: {response.text}") except Exception as e: print(f"✗ 题目生成请求失败: {e}") print("\n=== 数据库检查说明 ===") print("由于使用H2内存数据库,请通过以下方式手动检查:") print("1. 访问 http://localhost:8080/h2-console") print("2. 使用连接信息: jdbc:h2:mem:testdb, 用户名: sa, 密码: (空)") print("3. 执行SQL: SELECT id, goal_id, question, difficulty, created_at FROM goal_detail ORDER BY created_at DESC LIMIT 10") print("4. 检查difficulty字段是否为null") print("\n=== 测试完成 ===") if __name__ == "__main__": main()