|
- from django.shortcuts import render
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from django.http import JsonResponse
- from rest_framework import status
- # coding: utf-8
- import _thread as thread
- import os
- import time
- import base64
- import datetime
- import hashlib
- import hmac
- import json
- from urllib.parse import urlparse
- import ssl
- from datetime import datetime
- from time import mktime
- from urllib.parse import urlencode
- from wsgiref.handlers import format_date_time
- import websocket
- import openpyxl
- from concurrent.futures import ThreadPoolExecutor, as_completed
- import os
class Ws_Param(object):
    """Hold the credentials for a websocket endpoint and build its signed URL."""

    def __init__(self, APPID, APIKey, APISecret, gpt_url):
        """Store the credentials and split ``gpt_url`` into host and path.

        Args:
            APPID: Application id; stored as ``self.APPID``.
            APIKey: API key embedded in the authorization string.
            APISecret: Secret used as the HMAC-SHA256 key.
            gpt_url: Full endpoint URL; its netloc and path are signed.
        """
        endpoint = urlparse(gpt_url)
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = endpoint.netloc
        self.path = endpoint.path
        self.gpt_url = gpt_url

    def create_url(self):
        """Return ``gpt_url`` with ``authorization``, ``date`` and ``host`` query parameters.

        The signature is the base64 HMAC-SHA256 (keyed by ``APISecret``) of the
        host line, date line and request line; the authorization value is the
        base64 encoding of the descriptor string that embeds that signature.
        """
        # RFC 1123 formatted timestamp for the current time.
        date = format_date_time(mktime(datetime.now().timetuple()))

        # The three lines that get signed, joined by newlines (no trailing newline).
        signature_origin = "\n".join((
            "host: " + self.host,
            "date: " + date,
            "GET " + self.path + " HTTP/1.1",
        ))

        digest = hmac.new(
            self.APISecret.encode('utf-8'),
            signature_origin.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        signature = base64.b64encode(digest).decode(encoding='utf-8')

        authorization_origin = (
            f'api_key="{self.APIKey}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature}"'
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')
        ).decode(encoding='utf-8')

        # Authentication parameters appended to the endpoint as a query string.
        auth_query = urlencode({
            "authorization": authorization,
            "date": date,
            "host": self.host,
        })
        return self.gpt_url + '?' + auth_query
def on_error(ws, error):
    """Websocket error callback: print the error to stdout."""
    print(f"### error: {error}")
def on_close(ws, close_status_code=None, close_msg=None):
    """Websocket close callback: print a closed marker to stdout.

    websocket-client invokes this callback with the close status code and
    close message in addition to the app object; a one-argument handler
    raises ``TypeError`` there. Both extra arguments are optional so the
    older one-argument call style keeps working.

    Args:
        ws: The websocket app whose connection closed.
        close_status_code: Close status code supplied by the library, if any.
        close_msg: Close message supplied by the library, if any.
    """
    print("### closed ###")
def on_open(ws):
    """Websocket open callback: start a new thread that runs ``run(ws)``."""
    worker_args = (ws,)
    thread.start_new_thread(run, worker_args)
def run(ws, *args):
    """Build the request from the attributes stored on ``ws`` and send it as JSON.

    Reads ``ws.appid``, ``ws.query`` and ``ws.domain``; extra positional
    arguments are accepted and ignored.
    """
    request_body = gen_params(appid=ws.appid, query=ws.query, domain=ws.domain)
    ws.send(json.dumps(request_body))
# Module-level accumulator for the reply text received so far.
content_all = ""


def on_message(ws, message):
    """Websocket message callback: append the received text chunk to ``content_all``.

    ``message`` is a JSON document. A non-zero ``header.code`` is printed as
    an error and the socket is closed. Otherwise the first text item of
    ``payload.choices`` is appended to the global ``content_all`` and echoed
    to stdout, and the socket is closed when ``choices.status`` equals 2.
    """
    global content_all

    data = json.loads(message)
    code = data['header']['code']

    # Error frame: report it and stop.
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
        return

    choices = data["payload"]["choices"]
    status = choices["status"]
    chunk = choices["text"][0]["content"]

    content_all += chunk
    print(chunk, end='')

    if status == 2:
        ws.close()
def gen_params(appid, query, domain):
    """Build the request body from the app id, the user's question and the model domain.

    Args:
        appid: Application id placed in ``header.app_id``.
        query: The user's question, sent as a single ``user`` message.
        domain: Model domain placed in ``parameter.chat.domain``.

    Returns:
        A dict with ``header``, ``parameter`` and ``payload`` sections, ready
        to be JSON-encoded.
    """
    return {
        "header": {
            # Use the caller's app id rather than a hard-coded one, so the
            # request matches the credentials the connection was signed with.
            "app_id": appid,
            "uid": "1234",
            # "patch_id": []  # for a fine-tuned model: the resource id of the published service
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "temperature": 0.5,
                "max_tokens": 4096,
                "auditing": "default",
            }
        },
        "payload": {
            "message": {
                "text": [{"role": "user", "content": query}]
            }
        },
    }
def main(appid, api_secret, api_key, Spark_url, domain, query):
    """Open a websocket to ``Spark_url``, send ``query`` and block until the socket closes.

    The reply text is accumulated by ``on_message`` in the module-level
    ``content_all``. It is cleared here before each run so that a reply does
    not carry over the text of earlier calls.

    Args:
        appid: Application id, also sent in the request header.
        api_secret: Secret used to sign the connection URL.
        api_key: API key embedded in the authorization string.
        Spark_url: Websocket endpoint URL.
        domain: Model domain for the request.
        query: The user's question.
    """
    global content_all
    # Start every conversation from an empty buffer.
    # NOTE(review): the buffer is still a process-wide global, so concurrent
    # calls would interleave their text — confirm whether that can happen.
    content_all = ""

    ws_param = Ws_Param(appid, api_key, api_secret, Spark_url)
    websocket.enableTrace(False)
    ws = websocket.WebSocketApp(
        ws_param.create_url(),
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        on_open=on_open,
    )
    # Stash the request parameters on the app object; ``run`` reads them back.
    ws.appid = appid
    ws.query = query
    ws.domain = domain
    # NOTE(review): CERT_NONE disables TLS certificate verification — confirm
    # this is intentional before using it outside a demo.
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
class DataAPIView(APIView):
    """POST endpoint that forwards the ``message`` field to ``main`` and returns the reply."""

    def post(self, request):
        """Send ``request.data['message']`` as the query and return the accumulated reply.

        Returns:
            HTTP 400 when ``message`` is missing or empty; otherwise HTTP 200
            with the echoed request data and the text collected in
            ``content_all``.
        """
        message = request.data.get('message')
        print(message)

        # Reject a missing/empty message instead of sending a null query and
        # reporting success.
        if not message:
            return Response({
                'status': 'error',
                'message': 'message 不能为空',
            }, status=status.HTTP_400_BAD_REQUEST)

        # NOTE(review): credentials are hard-coded in source — consider moving
        # them to settings or environment variables.
        main(
            appid="6d30de8d",
            api_secret="YjMwN2E2YWE3MzU2NGE2YjI5ZDM5ZTMz",
            api_key="a88b5e5be130e0b91fdada536c36ac24",
            # The appid / api_secret / api_key service credentials can be found in the
            # open platform console (https://console.xfyun.cn/services/bm35)
            # Spark_url="wss://spark-api.xf-yun.com/v4.0/chat",  # Max endpoint
            Spark_url="wss://spark-api.xf-yun.com/v4.0/chat",  # 4.0Ultra endpoint
            # Spark_url = "wss://spark-api.xf-yun.com/v3.1/chat"  # Pro endpoint
            # Spark_url = "wss://spark-api.xf-yun.com/v1.1/chat"  # Lite endpoint
            # domain="generalv3.5",  # Max version
            domain="4.0Ultra",  # 4.0Ultra version
            # domain = "generalv3"  # Pro version
            # domain = "lite"  # Lite version
            query=message
        )
        return Response({
            'status': 'success',
            'message': '数据接收成功',
            'data': request.data,
            'content': content_all  # send the accumulated reply text to the front end
        }, status=status.HTTP_200_OK)
- from django.http import JsonResponse
- from .models import Question # 导入模型
- import random
def random_question(request):
    """Return one random question with its choices as JSON.

    Example response:
    {
        "id": 1,
        "text": "What was the significance of the Zunyi Conference?",
        "choices": [
            {"id": 1, "text": "It established Mao Zedong's leadership position", "is_correct": true},
            {"id": 2, "text": "It declared the victorious end of the Long March", "is_correct": false}
        ]
    }

    Returns:
        HTTP 404 with an ``error`` field when there are no questions, HTTP 500
        with the exception text when anything raises, otherwise the question
        payload shown above.
    """
    try:
        # 1. Let the database pick one random row instead of loading every
        #    question (and all of their choices) into memory.
        selected_question = (
            Question.objects.prefetch_related('choices').order_by('?').first()
        )
        if selected_question is None:
            return JsonResponse({"error": "题库为空"}, status=404)

        # 2. Build the choice entries.
        choices_data = [
            {
                "id": choice.id,
                "text": choice.text,
                "is_correct": choice.is_correct,
            }
            for choice in selected_question.choices.all()
        ]

        # 3. Assemble the response payload.
        response_data = {
            "id": selected_question.id,
            "text": selected_question.text,
            "choices": choices_data,
        }
        return JsonResponse(response_data)
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)
- import os
- import django
# Set the settings-module environment variable (only if not already set) and initialise Django.
# NOTE(review): 'your_project.settings' looks like a placeholder — confirm the real settings path.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
django.setup()  # Critical: must be called first
- from rest_framework import generics
- from .models import Question
- from .serializers import QuestionSerializer
class QuestionListAPIView(generics.ListAPIView):
    """List view over all ``Question`` objects, serialized with ``QuestionSerializer``."""

    serializer_class = QuestionSerializer
    queryset = Question.objects.all()
class RandomQuestionAPIView(generics.ListAPIView):
    """List view that yields a single randomly ordered ``Question``."""

    serializer_class = QuestionSerializer

    def get_queryset(self):
        """Return a queryset of at most one question in random order."""
        shuffled = Question.objects.order_by('?')
        return shuffled[:1]  # one random question
- # views.py
- from rest_framework.views import APIView
- from rest_framework.response import Response
class SubmitAnswerAPIView(APIView):
    """POST endpoint that echoes the submitted ``answer`` field back to the client."""

    def post(self, request):
        """Return a success payload containing the ``answer`` value from the request data."""
        payload = {
            "status": "success",
            # Answer submitted by the mini-program client.
            "user_answer": request.data.get('answer'),
        }
        return Response(payload)
|