123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
- from django.shortcuts import render
- # Create your views here.
- # views.py
- from rest_framework import generics
- from rest_framework.authentication import TokenAuthentication
- from .models import RedTourismSpot
- from .serializers import RedTourismSpotSerializer
- from django.db.models import Q
- from rest_framework import generics, mixins
class RedSpotSearchAPIView(mixins.ListModelMixin, generics.GenericAPIView):
    """List red-tourism spots, optionally filtered by keyword and category.

    Query parameters:
        q: keyword matched case-insensitively against ``name``, ``location``
            and ``description``. A missing or blank value, or the literal
            string ``"undefined"``, disables the keyword filter.
        category: case-insensitive exact match on ``category``. A missing or
            blank value disables the category filter.
    """

    pagination_class = None
    serializer_class = RedTourismSpotSerializer
    queryset = RedTourismSpot.objects.all()

    def get_queryset(self):
        """Return the base queryset narrowed by the ``q`` and ``category`` params."""
        queryset = super().get_queryset()
        params = self.request.query_params

        raw_term = params.get('q')
        # The literal "undefined" is treated exactly like a missing parameter.
        if raw_term is None or raw_term == 'undefined':
            search_term = ''
        else:
            search_term = raw_term.strip()
        category = params.get('category', '').strip()

        # Debug-level logging replaces the former print() calls; the queryset
        # is no longer counted twice per request just to produce debug output.
        logger.debug("接收参数 - 搜索词: %s, 分类: '%s'", search_term, category)

        # Keyword filter: match any of the three text fields.
        if search_term:
            queryset = queryset.filter(
                Q(name__icontains=search_term)
                | Q(location__icontains=search_term)
                | Q(description__icontains=search_term)
            ).distinct()

        # Category filter.
        if category:
            queryset = queryset.filter(category__iexact=category)

        return queryset

    def get(self, request, *args, **kwargs):
        """Handle GET by delegating to ``ListModelMixin.list``."""
        return self.list(request, *args, **kwargs)
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from rest_framework import status
- from api.models import UserPlan, UserInfo
- from .serializers import UserPlanSerializer
- import logging
- logger = logging.getLogger(__name__)
class AddToPlanView(APIView):
    """Handle POST requests that add a spot to a user's plan.

    Flow: require ``user_id`` in the request body, look the user up, validate
    the payload with ``UserPlanSerializer``, reject a spot that is already in
    that user's plans, then save the new plan linked to the user.
    """

    # Token authentication was removed: no authentication or permission
    # classes are configured for this endpoint.
    authentication_classes = []
    permission_classes = []

    @staticmethod
    def _failure(message, http_status, **extra):
        """Build a ``{'success': False, 'message': ...}`` response."""
        body = {'success': False, 'message': message}
        body.update(extra)
        return Response(body, status=http_status)

    def _add_to_plan(self, payload):
        """Run the validation steps and create the plan; return the response."""
        # The user id comes from the request body.
        user_id = payload.get('user_id')
        if not user_id:
            return self._failure('缺少用户ID', status.HTTP_400_BAD_REQUEST)

        # Make sure the user exists.
        try:
            owner = UserInfo.objects.get(id=user_id)
        except UserInfo.DoesNotExist:
            return self._failure('用户不存在', status.HTTP_404_NOT_FOUND)

        # Validate the submitted data.
        plan_serializer = UserPlanSerializer(data=payload)
        if not plan_serializer.is_valid():
            return self._failure(
                '数据验证失败',
                status.HTTP_400_BAD_REQUEST,
                errors=plan_serializer.errors,
            )

        # Refuse to add the same spot twice for one user.
        spot_id = plan_serializer.validated_data['spot_id']
        already_planned = UserPlan.objects.filter(user=owner, spot_id=spot_id).exists()
        if already_planned:
            return self._failure('该景点已在您的行程中', status.HTTP_400_BAD_REQUEST)

        # Attach the user when saving.
        created = plan_serializer.save(user=owner)
        return Response(
            {'success': True, 'data': UserPlanSerializer(created).data},
            status=status.HTTP_201_CREATED,
        )

    def post(self, request, *args, **kwargs):
        """Create a plan entry; respond 201 on success, 400/404/500 otherwise."""
        logger.info(f"收到添加行程请求,数据: {request.data}")
        try:
            return self._add_to_plan(request.data)
        except Exception:
            # Any unexpected failure is logged with its traceback and reported
            # to the client as a generic server error.
            logger.exception("添加行程时发生异常:")
            return self._failure('服务器内部错误', status.HTTP_500_INTERNAL_SERVER_ERROR)
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from rest_framework import status
- from api.models import UserPlan
- from .serializers import UserPlanSerializer
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from rest_framework import status
- from .serializers import UserPlanSerializer
class UserPlansView(APIView):
    """Return the travel plans stored for one user, newest first."""

    def get(self, request, user_id, format=None):
        """List the plans whose ``user_id`` matches, ordered by ``-created_at``.

        Args:
            request: the incoming DRF request (not otherwise inspected).
            user_id: id of the user whose plans are requested, taken from the URL.
            format: optional format suffix passed by the URL configuration.

        Returns:
            200 with ``{'success': True, 'data': [...]}``; when the user has no
            plans the body additionally carries an explanatory ``message``.
            500 with a generic message if anything unexpected fails.
        """
        try:
            plans = UserPlan.objects.filter(user_id=user_id).order_by('-created_at')
            # Serialising evaluates the queryset once; the emptiness check
            # reuses that result instead of issuing a separate EXISTS query.
            body = {'success': True, 'data': UserPlanSerializer(plans, many=True).data}
            if not body['data']:
                body['message'] = '该用户暂无行程'
            return Response(body, status=status.HTTP_200_OK)
        except Exception:
            # Log the traceback server-side; do not expose exception text to
            # the client.
            logger.exception("获取用户行程时发生异常:")
            return Response(
                {'success': False, 'message': '服务器内部错误'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from rest_framework import status
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from rest_framework import status
- from api.models import UserPlan
class DeletePlanView(APIView):
    """Delete a single travel plan by its id (POST only)."""

    # Authentication and permission checks are disabled (no token required).
    # NOTE(review): as written, any caller can delete any plan by id —
    # confirm this is intended.
    authentication_classes = []
    permission_classes = []

    def post(self, request, plan_id, *args, **kwargs):
        """Delete the plan with primary key ``plan_id``.

        Returns:
            200 on success, 404 when no such plan exists, 500 with a generic
            message when anything unexpected fails.
        """
        try:
            UserPlan.objects.get(id=plan_id).delete()
        except UserPlan.DoesNotExist:
            return Response(
                {'success': False, 'message': '计划不存在'},
                status=status.HTTP_404_NOT_FOUND,
            )
        except Exception:
            # Log the traceback server-side; do not expose exception text to
            # the client.
            logger.exception("删除行程时发生异常:")
            return Response(
                {'success': False, 'message': '服务器内部错误'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
        return Response(
            {'success': True, 'message': '删除成功'},
            status=status.HTTP_200_OK,
        )
- from django.shortcuts import render
- from rest_framework.views import APIView
- from rest_framework.response import Response
- from django.http import JsonResponse
- from rest_framework import status
- # coding: utf-8
- import _thread as thread
- import os
- import time
- import base64
- import datetime
- import hashlib
- import hmac
- import json
- from urllib.parse import urlparse
- import ssl
- from datetime import datetime
- from time import mktime
- from urllib.parse import urlencode
- from wsgiref.handlers import format_date_time
- import websocket
- import openpyxl
- from concurrent.futures import ThreadPoolExecutor, as_completed
- import os
class Ws_Param(object):
    """Hold the credentials and endpoint needed to build a signed WebSocket URL."""

    def __init__(self, APPID, APIKey, APISecret, gpt_url):
        """Store the credentials and split ``gpt_url`` into host and path."""
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        parsed = urlparse(gpt_url)
        self.host = parsed.netloc
        self.path = parsed.path
        self.gpt_url = gpt_url

    def create_url(self):
        """Return ``gpt_url`` with ``authorization``, ``date`` and ``host`` query params.

        The signature is an HMAC-SHA256 (keyed with ``APISecret``) over the
        host, the current date and the request line, base64-encoded.
        """
        # RFC 1123 timestamp. Taken straight from the epoch clock: converting
        # a naive local datetime through mktime() leaves tm_isdst at -1, which
        # can yield a time one hour off during a DST fall-back hour.
        date = format_date_time(time.time())

        # String to sign: host, date and request line, newline-separated.
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        # HMAC-SHA256 signature, base64-encoded.
        signature_sha = hmac.new(
            self.APISecret.encode('utf-8'),
            signature_origin.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # Collect the auth parameters and append them as the query string.
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        return self.gpt_url + '?' + urlencode(v)
# Callback for WebSocket errors.
def on_error(ws, error):
    """Print the error reported for the connection to stdout."""
    print(f"### error: {error!s}")
# Callback for WebSocket close events.
def on_close(ws, close_status_code=None, close_msg=None):
    """Print a marker when the connection closes.

    The two optional parameters let the callback be invoked either with the
    connection alone or with the close status code and message as well;
    both are ignored.
    """
    print("### closed ###")
# Callback for a newly established WebSocket connection.
def on_open(ws):
    """Start a new thread that executes ``run`` with the connection."""
    worker_args = (ws,)
    thread.start_new_thread(run, worker_args)
def run(ws, *args):
    """Build the request from ``ws.appid``/``ws.query``/``ws.domain`` and send it as JSON."""
    request_body = gen_params(appid=ws.appid, query=ws.query, domain=ws.domain)
    ws.send(json.dumps(request_body))
# Module-level buffer that accumulates the text of received messages.
content_all = ""


# Callback for every message received on the WebSocket.
def on_message(ws, message):
    """Parse one JSON message and append its text to ``content_all``.

    A non-zero ``header.code`` is printed as an error and the connection is
    closed. Otherwise the first text entry's content is appended to the
    module-level buffer and echoed to stdout; a ``status`` of 2 closes the
    connection.
    """
    global content_all
    frame = json.loads(message)
    code = frame['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {frame}')
        ws.close()
        return

    choices = frame["payload"]["choices"]
    frame_status = choices["status"]
    piece = choices["text"][0]["content"]
    content_all = content_all + piece
    print(piece, end='')
    if frame_status == 2:
        ws.close()
def gen_params(appid, query, domain):
    """Build the request payload from the app id, the user's question and the domain.

    Args:
        appid: application id placed in ``header.app_id``.
        query: user question, sent as a single ``user`` message.
        domain: model domain placed in ``parameter.chat.domain``.

    Returns:
        A dict with ``header``, ``parameter`` and ``payload`` sections.
    """
    return {
        "header": {
            # Use the caller's app id; it was previously hard-coded here and
            # the ``appid`` argument was ignored.
            "app_id": appid,
            "uid": "1234",
            # "patch_id": []  # resource id of a published fine-tuned model
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "temperature": 0.5,
                "max_tokens": 4096,
                "auditing": "default",
            }
        },
        "payload": {
            "message": {
                "text": [{"role": "user", "content": query}]
            }
        },
    }
def main(appid, api_secret, api_key, Spark_url, domain, query):
    """Open a signed WebSocket connection, send ``query`` and block until it closes.

    Args:
        appid: application id, attached to the connection for ``run``.
        api_secret: secret used to sign the connection URL.
        api_key: API key embedded in the authorization parameter.
        Spark_url: WebSocket endpoint URL.
        domain: model domain, attached to the connection for ``run``.
        query: prompt text, attached to the connection for ``run``.
    """
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(
        wsUrl,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        on_open=on_open,
    )
    # The callbacks read these attributes off the connection object.
    ws.appid = appid
    ws.query = query
    ws.domain = domain
    # TLS certificate verification is left at the library default (enabled).
    # It was previously switched off with ssl.CERT_NONE, which lets an
    # on-path attacker impersonate the server.
    ws.run_forever()
- from django.utils import timezone
class AITravelPlanView(APIView):
    """Generate a travel plan by sending a prompt built from the request to the AI service."""

    @staticmethod
    def _as_text_list(value):
        """Normalise a request value into a list of non-empty strings.

        A bare string is treated as a single item (joining it directly would
        split it into characters); non-string items are converted with
        ``str``; blank items are dropped.
        """
        if value is None:
            return []
        items = value if isinstance(value, (list, tuple)) else [value]
        cleaned = (str(item).strip() for item in items)
        return [text for text in cleaned if text]

    def post(self, request):
        """Build the prompt, call the AI service and return its answer.

        Request body fields: ``locations`` (required, at least one),
        ``days`` (default 3), ``budget`` (default 5000), ``preferences``
        (optional).

        Returns:
            200 with the recommendation text, 400 when no location is given,
            503 when the AI call fails or yields no content, 500 on any other
            unexpected failure.
        """
        # NOTE(review): the response text is collected in the module-level
        # ``content_all`` buffer, which is shared by every request handled in
        # this process — overlapping requests could mix their output.
        global content_all
        try:
            data = request.data

            # Parameter validation.
            locations = self._as_text_list(data.get('locations', []))
            days = data.get('days', 3)
            budget = data.get('budget', 5000)
            preferences = self._as_text_list(data.get('preferences', []))
            if not locations:
                return Response(
                    {'status': 'error', 'error': '至少需要选择一个地点'},
                    status=status.HTTP_400_BAD_REQUEST
                )

            # Build the AI prompt.
            location_text = ", ".join(locations)
            preference_text = ", ".join(preferences) if preferences else "标准"
            prompt = "\n".join([
                "你是一位资深旅游规划师,请为以下需求生成专业旅行计划:",
                "**基本需求**",
                f"地点:{location_text}",
                f"天数:{days}天",
                f"预算:{budget}元",
                f"偏好:{preference_text}",
                "**输出要求**",
                "1. 每日详细行程(时间+地点+活动)",
                "2. 交通建议(含费用估算)",
                "3. 餐饮推荐(人均消费)",
                "4. 住宿建议(符合预算)",
                "5. 舒适旅行贴士",
                "6. 注意事项",
                "格式要求:使用Markdown语法,清晰分段",
            ])

            # Call the AI service.
            try:
                # Reset the shared buffer before the call fills it.
                content_all = ""
                # SECURITY NOTE(review): these credentials are hard-coded in
                # source; they should be moved to configuration and rotated.
                main(
                    appid="6d30de8d",
                    api_secret="YjMwN2E2YWE3MzU2NGE2YjI5ZDM5ZTMz",
                    api_key="a88b5e5be130e0b91fdada536c36ac24",
                    Spark_url="wss://spark-api.xf-yun.com/v4.0/chat",
                    domain="4.0Ultra",
                    query=prompt,
                )
                ai_response = content_all
            except Exception:
                # Log the traceback server-side; do not expose exception text
                # to the client.
                logger.exception("调用AI服务时发生异常:")
                return Response(
                    {'status': 'error', 'error': 'AI服务异常'},
                    status=status.HTTP_503_SERVICE_UNAVAILABLE
                )

            if not ai_response:
                return Response(
                    {'status': 'error', 'error': 'AI服务异常: AI未返回有效内容'},
                    status=status.HTTP_503_SERVICE_UNAVAILABLE
                )
            return Response({
                'status': 'success',
                'data': {
                    'recommendation': ai_response,
                }
            })
        except Exception:
            logger.exception("生成旅行计划时发生异常:")
            return Response(
                {'status': 'error', 'error': '服务器错误'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )
|