"""Views and helpers for the WeChat mini-program backend."""
import base64
import hashlib
import http.client
import json
import os
import random
import re
import tempfile
import time
import urllib
import urllib.parse
from datetime import datetime, timedelta
from threading import Thread
from urllib.parse import quote

import jwt  # used to generate tokens
import qiniu
import requests
import shortuuid
from Crypto.Cipher import AES
from django.contrib.auth.hashers import make_password
from django.core.files.storage import FileSystemStorage
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.db.models import Q
from django.forms import model_to_dict
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt

from MyApi.sparkAPI import SparkUtil
from website import settings

from .API_Log import get_logger
from .models import (
    Administrator,
    ApiCallLog,
    AssetLibrary,
    Copywriting,
    FriendRequest,
    IDCounter,
    MembershipType,
    RedemptionCard,
    User,
    VideoExtractionRecord,
    VideoTask,
)
from .WXBizDataCrypt import WXBizDataCrypt  # WeChat data decryption library

log_file = '小程序日志.log'
logger = get_logger('小程序日志', log_file, when='midnight', backup_count=7)

# Mini-program credentials.
# NOTE(review): secrets are hard-coded in source; consider moving them to
# settings or environment variables.
APPID = 'wxb5a1857369e5809e'
SECRET = 'cbdb58343de23967d59fb90c7143a6b0'


def is_valid_text(text):
    """Return True if the URL-decoded text has a non-whitespace character."""
    return re.search(r'\S', urllib.parse.unquote(text)) is not None


def contains_sql_or_xss(input_string):
    """Heuristically detect SQL-injection or XSS content in a string.

    The input is URL-decoded first. It is flagged when it contains any of the
    listed keywords/characters (case-insensitive substring match), an
    HTML-like tag, or a ``javascript:`` scheme. Note that any ``/`` or ``~``
    in the input triggers a match.

    Returns:
        bool: True if the input looks suspicious, False otherwise.
    """
    decoded = urllib.parse.unquote(input_string)

    sql_keywords = ['SELECT', 'UPDATE', 'DELETE', 'INSERT', 'DROP', 'ALTER',
                    'TRUNCATE', 'EXEC', 'UNION', 'FROM', 'WHERE', 'JOIN',
                    'ldap', '~', '/', '\\/']
    # Lower-case the input once instead of once per keyword.
    lowered = decoded.lower()
    if any(keyword.lower() in lowered for keyword in sql_keywords):
        return True

    # XSS check: HTML tags or a javascript: URL scheme.
    pattern = re.compile(r'<[^>]+>|javascript:', re.IGNORECASE)
    return pattern.search(decoded) is not None


def decrypt_param(x, t):
    """Check that header value ``x`` decrypts (AES-CBC) to header value ``t``.

    Args:
        x: Base64-encoded ciphertext.
        t: Expected plaintext.

    Returns:
        bool: True when the decrypted value matches ``t`` after stripping
        trailing ``\\0`` / ``\\x03`` padding; False when either argument is
        empty, on mismatch, or on any decryption error.
    """
    # NOTE(review): key and IV are fixed and hard-coded in source.
    key = b'qw5w6SFE2D1jmxyd'
    iv = b'345GDFED433223DF'

    if not x or not t:
        return False

    try:
        cipher = AES.new(key, AES.MODE_CBC, iv)
        ciphertext_bytes = base64.b64decode(x)
        plaintext_bytes = cipher.decrypt(ciphertext_bytes)
        # Strip zero padding before decoding.
        plaintext = plaintext_bytes.rstrip(b'\0').decode('utf-8')
        return plaintext.rstrip('\x03') == t.rstrip('\x03')
    except Exception as e:
        # Best-effort check: any failure is treated as an invalid signature.
        logger.error(f"解密过程出现错误: {e}")
        return False


def update_usage_count(openid, increment, function_type):
    """Check and adjust a user's remaining usage count (``coins``).

    Also downgrades the user to non-member when ``member_end_time`` has
    passed.

    Args:
        openid: The user's openid.
        increment: Positive to add coins, negative to deduct, zero to only
            run the membership-expiry check.
        function_type: Name of the calling feature; members are only charged
            for the features listed in ``functions_require_coins_for_members``.

    Returns:
        dict: ``{'success': True}`` on success, otherwise
        ``{'success': False, 'message': ...}``.
    """
    try:
        user = User.objects.get(openid=openid)
    except User.DoesNotExist:
        logger.error(f'用户 (openid: {openid}) 不存在')
        return {'success': False, 'message': '用户不存在'}

    # Expire the membership if its end time has passed.
    if user.member_end_time is not None and user.member_end_time < timezone.now().timestamp():
        user.is_member = False
        user.membership_type = None
        user.save()
        logger.info(f'会员 {user.id} (openid: {openid}) 已过期,已更新为非会员')

    # Features that cost coins even for members.
    functions_require_coins_for_members = ['GenerateVideoView', 'ExtendVideoView', 'GenerateImageVideoView']

    if increment > 0:
        user.coins += increment
        logger.info(f'用户 {user.id} (openid: {openid}) 使用次数增加 {increment} 次')
        user.save()
        return {'success': True}

    if increment == 0:
        return {'success': True}

    # Deduction path (increment < 0).
    if user.is_member and function_type not in functions_require_coins_for_members:
        logger.info(f'会员用户 {user.id} (openid: {openid}) 使用功能 {function_type} 不需要扣除次数')
        return {'success': True}

    label = '会员用户' if user.is_member else '用户'
    if user.coins + increment < 0:
        logger.warning(
            f'{label} {user.id} (openid: {openid}) 使用功能 {function_type} 使用次数不足以减少 {abs(increment)} 次')
        return {'success': False, 'message': '次数不足,请充值'}

    user.coins += increment
    logger.info(
        f'{label} {user.id} (openid: {openid}) 使用功能 {function_type} 使用次数减少 {abs(increment)} 次 剩余 {user.coins}')
    user.save()
    return {'success': True}


def update_user_membership(openid, duration_days):
    """Grant or extend membership for a user (used by card redemption).

    Args:
        openid: The user's openid.
        duration_days: Number of membership days to add.

    Returns:
        bool: True on success, False if the user does not exist.
    """
    current_time = int(time.time())
    duration_seconds = duration_days * 24 * 3600  # days -> seconds
    try:
        user = User.objects.get(openid=openid)
    except User.DoesNotExist:
        return False

    user.is_member = True
    expired = user.member_end_time is None or user.member_end_time < current_time

    if user.member_start_time is None:
        # First membership ever.
        user.member_start_time = current_time
        logger.info(f'第一次开会员:{current_time} -- {current_time + duration_seconds}')
    elif expired:
        # Previous membership has lapsed: restart from now.
        logger.info(f'过期:{current_time} -- {current_time + duration_seconds}')
        user.member_start_time = current_time

    if expired:
        logger.info(f'第一次开会员2:{user.member_end_time} -- {current_time + duration_seconds}')
        user.member_end_time = current_time + duration_seconds
    else:
        # Still active: extend from the current end time.
        logger.info(f'续费:{user.member_end_time} -- {user.member_end_time + duration_seconds}')
        user.member_end_time += duration_seconds

    user.save()
    return True


def log_api_call(source, openid, nickname, wxid, wechat_alias, api_name, is_successful, remarks=""):
    """Insert one API call log record via ``ApiCallLog.objects.create``."""
    ApiCallLog.objects.create(
        source=source,
        openid=openid,
        nickname=nickname,
        wxid=wxid,
        wechat_alias=wechat_alias,
        api_name=api_name,
        is_successful=is_successful,
        remarks=remarks
    )


# Example call:
# log_api_call(
#     source='weixin',
#     openid='<user openid>',
#     nickname='<user nickname>',
#     wxid='<user WeChat id>',
#     wechat_alias='<user WeChat alias>',
#     api_name='<name of the API called>',
#     is_successful=True,
#     remarks='<remarks>',
# )


def _positive_int(value, default):
    """Parse ``value`` as a positive int, returning ``default`` otherwise."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        return default
    return number if number > 0 else default


def _format_timestamp(timestamp):
    """Format a Unix timestamp as local ``YYYY-mm-dd HH:MM:SS``.

    ``None`` is returned unchanged (``time.localtime(None)`` would otherwise
    silently yield the current time); unformattable values are returned raw.
    """
    if timestamp is None:
        return None
    try:
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
    except (TypeError, ValueError, OverflowError, OSError):
        return timestamp


def generate_short_id():
    """Return the next id from the ``IDCounter`` model."""
    return IDCounter.get_next_id()


def miniapp_login(request):
    """Log a mini-program user in by exchanging ``code`` for an openid.

    Creates the user on first login. Responds with ``token`` (the WeChat
    session key) and ``userInfo``; responds 400 when ``code`` is missing and
    500 when the WeChat exchange fails.
    """
    data = json.loads(request.body.decode('utf-8'))
    code = data.get('code')
    launch_options = data.get('Options', {})
    logger.info(f' 登录 {launch_options}')

    if not code:
        return JsonResponse({'error': '缺少code参数'}, status=400)

    # Exchange the code for session_key and openid. Passing params lets
    # requests URL-encode the client-supplied code; the timeout keeps a
    # stalled upstream from hanging the worker.
    try:
        res = requests.get(
            'https://api.weixin.qq.com/sns/jscode2session',
            params={
                'appid': APPID,
                'secret': SECRET,
                'js_code': code,
                'grant_type': 'authorization_code',
            },
            timeout=10,
        )
        result = res.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        logger.error(f'微信登录失败 data{exc}')
        return JsonResponse({'error': '微信登录失败'}, status=500)
    logger.info(f'登录接口返回:{result}')

    if 'openid' not in result:
        logger.error(f'微信登录失败 data{result}')
        return JsonResponse({'error': '微信登录失败'}, status=500)

    openid = result['openid']
    # NOTE(review): the raw session_key is returned to the client as the
    # token; confirm this is intended.
    session_key = result['session_key']
    if not openid:
        logger.error(f'没有openid参数')
        return JsonResponse({'error': '没有openid参数'}, status=500)

    user, created = User.objects.get_or_create(openid=openid)
    logger.info(f'登录接口1 openid{openid} :created {created}')

    if created:
        # New user: fill in the remaining attributes from the launch options.
        logger.info(f'登录接口2 openid{openid} :created {created}')
        uuid = generate_short_id()
        query = launch_options.get('query', {})
        user.wxid = query.get('wxid', '')  # WeChat id
        user.wechat_number = query.get('alias', '')  # WeChat alias
        user.nickname = uuid
        user.scene = launch_options.get('scene', '')  # launch scene / source
        user.inviter_nickname = launch_options.get('inviter_nickname', '')  # inviter
        user.save()

    user = User.objects.get(openid=openid)
    user_info = model_to_dict(user)
    if created:
        logger.info(f'新用户openid {openid} --uuid{uuid} 添加成功 {user_info}')
    else:
        logger.info(f'老用户 {openid} 进行访问 {user_info}')
    return JsonResponse({'token': session_key, 'userInfo': user_info})


def userinfo(request):
    """Return the user's info plus the list of membership types."""
    data = json.loads(request.body)
    openid = data.get('openid', '')
    if contains_sql_or_xss(openid):
        logger.error(f'{openid} 用户触发威胁字符检测: {data}')
        return JsonResponse({'status': 'error', 'message': '你的国际行为已被记录,我们将保留追责权利'})
    if not openid:
        return JsonResponse({"status": "error", 'body': '缺少openid参数'})

    try:
        user = User.objects.get(openid=openid)
    except User.DoesNotExist:
        return JsonResponse({"status": "error", 'userInfo': None})

    # A zero increment only runs the membership-expiry check. Reload the user
    # afterwards so the response reflects any downgrade it applied.
    update_usage_count(openid, 0, 'luserinfo')
    user.refresh_from_db()

    membership_types = MembershipType.objects.all().values(
        'type', 'title', 'description', 'price', 'coins', 'is_quota')
    user_info = model_to_dict(user)
    user_info['member_start_time'] = _format_timestamp(user.member_start_time)
    user_info['member_end_time'] = _format_timestamp(user.member_end_time)

    return JsonResponse({"status": "success", 'userInfo': user_info,
                         'membershipTypeList': list(membership_types)})


def generate_token_for_user(openid):
    """Return an HS256 JWT for ``openid`` that expires in 24 hours."""
    # NOTE(review): the signing secret is hard-coded; it should live in
    # settings or an environment variable.
    secret = 'oigjfs**00--++2s'
    expiration = datetime.utcnow() + timedelta(days=1)
    token = jwt.encode({
        'openid': openid,
        'exp': expiration
    }, secret, algorithm='HS256')
    # Older PyJWT versions return bytes; always hand back a str.
    return token.decode('utf-8') if isinstance(token, bytes) else token


# Video extraction endpoint and its credentials.
API_URL = "https://h.aaaapp.cn/single_post"
USER_ID = "22F1BE410945DD40F0569BFA197A85C0"
SECRET_KEY = "7801ffd6cceaff579812214a93b948b4"


def video_extraction(request):
    """Extract a video through the external API, charging one use."""
    x = request.headers.get('X', '')
    t = request.headers.get('T', '')
    if not decrypt_param(x, t):
        return JsonResponse({'error': '非法参数'}, status=400)

    try:
        data = json.loads(request.body)
    except json.JSONDecodeError:
        logger.error(f'视频提取出错')
        return JsonResponse({'error': '无效的请求'})

    logger.info(f'视频提取data: {data} ')
    video_url = data.get('url')
    uuid = data.get('uuid')
    openid = data.get('openid')
    encoded_video_url = quote(video_url or '', safe='')

    # Validate before charging so a malformed request costs nothing.
    if not uuid or not video_url:
        logger.error(f'{encoded_video_url}视频提取失败--未登录-uuid:{uuid}--openid:{openid}')
        return JsonResponse({'error': '未登录'})

    result = update_usage_count(openid, -1, 'video_extraction')
    if not result['success']:
        logger.warning(f'使用次数更新失败: {result}')
        return JsonResponse(result)

    try:
        response = requests.post(API_URL, data={
            'url': video_url,
            'userId': USER_ID,
            'secretKey': SECRET_KEY
        }, timeout=30)
    except requests.exceptions.RequestException as exc:
        logger.error(f'提取链接无效{exc}')
        return JsonResponse({'error': '视频提取失败'}, status=500)

    if response.status_code != 200:
        logger.error(f'提取链接无效{response.text}')
        return JsonResponse({'error': '视频提取失败'}, status=response.status_code)

    try:
        payload = response.json()
    except ValueError:
        logger.error(f'提取链接无效{response.text}')
        return JsonResponse({'error': '视频提取失败'}, status=500)

    logger.info(f'{encoded_video_url}视频提取成功--uuid:{uuid}--openid:{openid}')
    # safe=False allows a non-dict JSON payload to be passed through as well.
    return JsonResponse(payload, safe=isinstance(payload, dict))


def add_friend_request(request):
    """Store a friend request unless one from ``fromusername`` exists."""
    if request.method != 'POST':
        return JsonResponse({"status": "error", "message": "只支持POST请求"}, status=405)

    data = json.loads(request.body)
    fromusername = data.get('fromusername')
    if FriendRequest.objects.filter(fromusername=fromusername).exists():
        logger.info('已存在')
        return JsonResponse({"status": "success", "message": "已存在数据"}, status=200)

    # Parse the request time string into a datetime object.
    data['time'] = datetime.strptime(data['time'], '%Y-%m-%d %H:%M:%S')
    # NOTE(review): every client-supplied key is passed to the model
    # constructor; consider whitelisting fields.
    FriendRequest.objects.create(**data)  # create() already saves
    logger.info('入库成功')
    return JsonResponse({"status": "success", "message": "添加好友请求已保存"}, status=200)


def total(request):
    """Return the usage count of the user whose nickname is ``uuid``."""
    data = json.loads(request.body)
    uuid = data.get('uuid')
    try:
        total_count = User.objects.get(nickname=uuid).usage_count
    except User.DoesNotExist:
        total_count = 0
    return JsonResponse({'total_num': total_count})


def rewrite_text(request):
    """Rewrite copy text through the Spark model, charging one use."""
    if request.method != 'POST':
        logger.error(f'ai改写文案----仅支持POST请求')
        return JsonResponse({'error': '仅支持POST请求'})

    try:
        data = json.loads(request.body)
    except json.JSONDecodeError:
        logger.error(f'ai改写文案----无效的请求格式')
        return JsonResponse({'error': '无效的请求格式'})

    logger.info(f'文案改写开始{data}')
    text = data.get('text')
    uuid = data.get('uuid')
    openid = data.get('openid', '')
    x = request.headers.get('X', '')
    t = request.headers.get('T', '')

    if not decrypt_param(x, t):
        logger.error(
            f'改写文案: 【非法参数】\n headers={request.headers}\n data={data} \n ---------------------------------')
        return JsonResponse({'error': '非法参数'})

    if not text:
        logger.error(f'缺少必要的文案参数--uuid:{uuid} --openid:{openid}')
        return JsonResponse({'error': '缺少必要的文案参数'}, status=400)

    if not openid:
        logger.error(f'改写文案失败 未登录-uuid:{uuid}--openid:{openid}')
        return JsonResponse({'error': '未登录,请重试'})

    query = f'{text}。【改写这篇文案,要求吸引人,引起人们共鸣,如果有错别字或者不通顺语句请改正,字数需要和原文差不多】'
    # NOTE(review): the model is called before the quota check, so users
    # without quota still consume a model call.
    rest = SparkUtil().query_and_get_data(query)
    result = update_usage_count(openid, -1, 'video_extraction')
    if not result['success']:
        logger.error(f'ai改写文案次数不足或不是会员-{rest}-uuid:{uuid}--openid:{openid}')
        return JsonResponse(result)

    # Successful rewrite: bump the user's usage counter.
    user = User.objects.get(openid=openid)
    user.usage_count += 1
    user.save()
    logger.info(f'改写文案成功-{rest}-uuid:{uuid}--openid:{openid}')
    return JsonResponse({
        'status': 'success',
        'rewritten_text': rest
    })


def get_copywriting_list_with_filters(request):
    """Return a paginated list of approved copywriting records.

    Supports ``search``, ``category``, ``sort_by`` (``default``,
    ``popularity``, ``time``), ``page`` and ``page_size`` query parameters.
    """
    page = request.GET.get('page', 1)
    page_size = _positive_int(request.GET.get('page_size', 10), 10)
    # NOTE(review): the search term is URL-encoded before matching —
    # presumably text_content is stored URL-encoded; confirm.
    search_query = quote(request.GET.get('search', ''))
    sort_by = request.GET.get('sort_by', 'default')
    category = request.GET.get('category', '')

    # Only approved records are listed.
    query_conditions = Q(text_content__icontains=search_query) & Q(is_approved=True)
    if category:
        query_conditions &= Q(tag__icontains=category)

    orderings = {
        'popularity': ('-popularity', '-added_time'),
        'time': ('-added_time',),
    }
    # 'default' and unknown values both sort by newest id first.
    ordering = orderings.get(sort_by, ('-id', '-added_time'))
    copywriting_list = Copywriting.objects.filter(query_conditions).order_by(*ordering)

    paginator = Paginator(copywriting_list, page_size)
    try:
        copywritings = paginator.page(page)
    except PageNotAnInteger:
        copywritings = paginator.page(1)
    except EmptyPage:
        copywritings = paginator.page(paginator.num_pages)

    current_page_data = list(copywritings.object_list.values('id', 'text_content', 'tag', 'popularity'))
    return JsonResponse({
        'status': 'success',
        'data': current_page_data,
        'current_page': copywritings.number,
        'total_pages': paginator.num_pages,
    })


def _mark_card_used(card, openid, uuid):
    """Record who redeemed ``card`` and persist it as used."""
    card.used_by_openid = openid
    card.is_used = True
    card.used_by_nickname = uuid
    card.save()


def redeem_card(request):
    """Redeem a card code for membership days or usage quota."""
    if request.method != 'POST':
        logger.error(f' 卡密兑换接口 -- 仅支持POST请求')
        return JsonResponse({'error': '仅支持POST请求'})

    try:
        data = json.loads(request.body)
        x = request.headers.get('X', '')
        t = request.headers.get('T', '')
        if not decrypt_param(x, t):
            return JsonResponse({'error': '非法参数'}, status=400)

        card_code = data.get('card_code')
        openid = data.get('openid')
        uuid = data.get('uuid')
        logger.info(f'卡密兑换获取到数据: {data}')

        # NOTE(review): lookup, check and update are not atomic; concurrent
        # requests could redeem the same card twice.
        try:
            card = RedemptionCard.objects.get(code=card_code)
        except RedemptionCard.DoesNotExist:
            logger.error(f'无效的卡密: {card_code}')
            return JsonResponse({'error': '无效的卡密'})

        if card.is_used:
            logger.error(f'该卡密已被使用: {card_code}')
            return JsonResponse({'error': '该卡密已被使用'})

        if card.expiry_date < timezone.now():
            logger.error(f'该卡密已过期: {card_code}')
            return JsonResponse({'error': '该卡密已过期'})

        if card.card_type == 'member':
            # Membership card: validity_period is a number of days.
            if not update_user_membership(openid, card.validity_period):
                return JsonResponse({'error': '卡密激活失败,请联系管理员'})
            _mark_card_used(card, openid, uuid)
            logger.info(f' {openid} 成功领取: {card.validity_period}天会员 {card_code}')
            return JsonResponse({"status": "success", 'message': f'成功领取{card.validity_period}天会员'})

        if card.card_type == 'quota':
            # Quota card: validity_period is a number of uses.
            outcome = update_usage_count(openid, increment=+card.validity_period, function_type='redeem_card')
            if not outcome.get('success'):
                logger.error(f' 卡密激活失败,请联系管理员 {card_code}')
                return JsonResponse({'error': '卡密激活失败,请联系管理员'})
            _mark_card_used(card, openid, uuid)
            logger.info(f' {openid} 成功领取:{card.validity_period}次额度 {card_code}')
            return JsonResponse({"status": "success", 'message': f'成功领取{card.validity_period}次额度'})

        logger.error(f' 卡密激活失败 {card_code}')
        return JsonResponse({'error': '卡密激活失败'})
    except Exception as e:
        # Top-level boundary: log the cause, keep the existing response shape.
        logger.error(f' 处理请求时出错 {e}')
        return JsonResponse({'error': '处理请求时出错:' + str(e)})


def reward_invitation(request):
    """Reward the inviter (``openid``) for bringing in new user ``uuid``."""
    if request.method != 'POST':
        logger.error(f'用户邀请奖励接口 请求方法不支持')
        return JsonResponse({'success': False, 'error': '请求方法不支持'})

    try:
        data = json.loads(request.body)
        x = request.headers.get('X', '')
        t = request.headers.get('T', '')
        if not decrypt_param(x, t):
            return JsonResponse({'success': False, 'error': '非法参数'})

        logger.info(f'邀请激励数据: {data}')
        openid = data.get('openid', '')
        uuid = data.get('uuid', '')
        if not (openid and uuid):
            logger.error(f'用户邀请奖励接口 缺少必要参数')
            return JsonResponse({'success': False, 'error': '缺少必要参数'})

        # Only a not-yet-existing invitee earns a reward.
        if User.objects.filter(nickname=uuid).exists():
            logger.info(f'用户 {uuid} 已经存在,不能重复邀请')
            return JsonResponse({'success': False, 'message': '用户已存在,不能重复邀请'})

        try:
            inviter_user = User.objects.get(openid=openid)
        except User.DoesNotExist:
            logger.error(f'邀请人用户 {openid} 不存在')
            return JsonResponse({'success': False, 'error': '邀请人用户不存在'})

        # Credit the inviter.
        inviter_user.invitees_count += 1
        inviter_user.coins += 10
        inviter_user.save()

        # Create the invited user and remember who invited them.
        new_user = User(nickname=uuid, inviter_nickname=inviter_user.nickname)
        new_user.save()

        logger.info(f'用户 {openid} 邀请新用户 {uuid} 成功,奖励已发放')
        return JsonResponse({'success': True, 'message': '奖励发放成功'})
    except Exception as e:
        logger.error(f'发放奖励时出错: {str(e)}')
        return JsonResponse({'success': False, 'error': '服务器内部错误'})


def upload_to_qiniu(file_path, key):
    """Upload a local file to Qiniu and return its public URL.

    Raises:
        Exception: when Qiniu does not answer with status 200.
    """
    access_key = settings.QINIU_ACCESS_KEY
    secret_key = settings.QINIU_SECRET_KEY
    bucket_name = settings.QINIU_BUCKET_NAME
    bucket_domain = settings.QINIU_DOMAIN

    q = qiniu.Auth(access_key, secret_key)
    token = q.upload_token(bucket_name, key)
    ret, info = qiniu.put_file(token, key, file_path)

    if info.status_code != 200:
        raise Exception("上传到七牛云失败")
    return f"https://{bucket_domain}/{quote(key)}"


# csrf_exempt must wrap dispatch(); applied to post() alone it has no effect
# because the CSRF middleware inspects the view callable, not the handler.
@method_decorator(csrf_exempt, name='dispatch')
class UploadImageView(View):
    """Accept an uploaded image and store it with ``FileSystemStorage``."""

    def post(self, request):
        image_file = request.FILES.get('image_file')
        if not image_file:
            return JsonResponse({"message": "没有提供图像文件"}, status=400)

        if not image_file.content_type.startswith('image/'):
            return JsonResponse({"message": "文件格式不支持,只支持图片文件"}, status=400)

        try:
            # Save the file to local storage.
            fs = FileSystemStorage()
            filename = fs.save(image_file.name, image_file)
            file_url = fs.url(filename)
            return JsonResponse({"message": "文件上传成功", "file_url": file_url}, status=200)
        except Exception as e:
            return JsonResponse({"message": "内部服务器错误", "error": str(e)}, status=500)


# Baidu translate credentials.
appid = '20240601002067404'
secretKey = '6pRZ9HCSqGuMqzLO55hB'


def translate(text, from_lang='auto', to_lang='en'):
    """Translate ``text`` through the Baidu translate API.

    Returns:
        str: The translated text. On any failure the original text is
        returned, so callers never receive an error message as if it were a
        translation. Empty or ``None`` input yields ``''``.
    """
    if not text:
        return ''

    salt = random.randint(32768, 65536)
    # The API requires an MD5 signature over appid + text + salt + key.
    sign = hashlib.md5((appid + text + str(salt) + secretKey).encode()).hexdigest()
    myurl = ('/api/trans/vip/translate'
             + '?appid=' + appid
             + '&q=' + urllib.parse.quote(text)
             + '&from=' + from_lang
             + '&to=' + to_lang
             + '&salt=' + str(salt)
             + '&sign=' + sign)

    httpClient = None
    try:
        httpClient = http.client.HTTPConnection('api.fanyi.baidu.com', timeout=10)
        httpClient.request('GET', myurl)
        response = httpClient.getresponse()
        result = json.loads(response.read().decode("utf-8"))
        return result['trans_result'][0]['dst']
    except Exception as e:
        logger.error(f'翻译失败: {e}')
        return text
    finally:
        if httpClient:
            httpClient.close()


class UserVideoTaskListView(View):
    """List a user's video tasks, newest first, with pagination."""

    def get(self, request, user_id):
        x = request.headers.get('X', '')
        t = request.headers.get('T', '')
        if not decrypt_param(x, t):
            return JsonResponse({"message": "非法参数"}, status=400)

        video_tasks = VideoTask.objects.filter(nickname=user_id).order_by('-created_at')
        page = request.GET.get('page', 1)
        page_size = _positive_int(request.GET.get('page_size', 10), 10)

        paginator = Paginator(video_tasks, page_size)
        try:
            tasks_page = paginator.page(page)
        except PageNotAnInteger:
            tasks_page = paginator.page(1)
        except EmptyPage:
            tasks_page = paginator.page(paginator.num_pages)

        task_list = [
            {
                'openid': task.openid,
                'nickname': task.nickname,
                'task_id': task.task_id,
                'task_type': task.task_type,
                'status': task.status,
                'created_at': task.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'updated_at': task.updated_at.strftime('%Y-%m-%d %H:%M:%S'),
                'text_prompt': task.description_zh,
                'width': task.width,
                'height': task.height,
                'motion_score': task.motion_score,
                'style': task.style,
                'seconds': task.seconds,
                'image_url': task.image_url,
                'result_url': task.result_url,
                'gif_url': task.gif_url,
                'qiniu_url': task.qiniu_url,
                'progress': task.progress,
                'error_message': task.error_message,
            }
            for task in tasks_page
        ]

        return JsonResponse({
            "video_tasks": task_list,
            "total_tasks": paginator.count,
            "total_pages": paginator.num_pages,
            "current_page": tasks_page.number,
        }, status=200)


class DeleteVideoTaskView(View):
    """Delete one video task belonging to a user."""

    def delete(self, request, user_id, task_id):
        x = request.headers.get('X', '')
        t = request.headers.get('T', '')
        if not decrypt_param(x, t):
            return JsonResponse({"message": "非法参数"}, status=400)

        try:
            task = VideoTask.objects.get(nickname=user_id, task_id=task_id)
            task.delete()
            return JsonResponse({"message": "删除成功"}, status=200)
        except VideoTask.DoesNotExist:
            return JsonResponse({"message": "任务不存在"}, status=404)
        except Exception as e:
            return JsonResponse({"message": f"删除失败: {str(e)}"}, status=500)


class AssetLibraryView(View):
    """List approved assets with optional search and category filters."""

    def get(self, request):
        x = request.headers.get('X', '')
        t = request.headers.get('T', '')
        if not decrypt_param(x, t):
            return JsonResponse({"message": "非法参数"}, status=400)

        search_query = request.GET.get('search', '')
        category = request.GET.get('category', '')
        page = request.GET.get('page', 1)
        page_size = _positive_int(request.GET.get('page_size', 10), 10)

        assets = AssetLibrary.objects.filter(is_approved=True).order_by('-generated_at')
        if search_query:
            assets = assets.filter(description__icontains=search_query)
        if category:
            assets = assets.filter(category=category)

        paginator = Paginator(assets, page_size)
        try:
            assets_page = paginator.page(page)
        except PageNotAnInteger:
            assets_page = paginator.page(1)
        except EmptyPage:
            assets_page = paginator.page(paginator.num_pages)

        asset_list = [
            {
                "asset_id": asset.id,
                'qiniu_url': asset.qiniu_url,
                'gif_url': asset.gif_url,
                'original_url': asset.original_url,
                'duration': asset.duration,
                'category': asset.category,
                'description': asset.description,
                'generated_at': asset.generated_at.strftime('%Y-%m-%d %H:%M:%S'),
                'download_count': asset.download_count,
                'is_approved': asset.is_approved,
            }
            for asset in assets_page
        ]

        return JsonResponse({
            'total_count': paginator.count,
            'total_pages': paginator.num_pages,
            'current_page': assets_page.number,
            'assets': asset_list
        }, status=200)


def increment_download_count(request):
    """Charge one use for a download/save/copy action.

    When ``asset_id`` is given, the asset's download counter is also
    incremented, but only after the charge succeeds.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'message': 'no'}, status=405)

    x = request.headers.get('X', '')
    t = request.headers.get('T', '')
    if not decrypt_param(x, t):
        return JsonResponse({'error': '非法参数'}, status=400)

    data = json.loads(request.body)
    asset_id = data.get('asset_id', '')
    openid = data.get('openid')

    # Resolve the asset first so an unknown id answers 404 without charging.
    asset = get_object_or_404(AssetLibrary, id=asset_id) if asset_id else None

    result = update_usage_count(openid, -1, 'increment_download_count')
    if not result['success']:
        if asset is None:
            logger.error(f' 视频号提取-复制文案-次数不足 {result}')
        else:
            logger.error(f' 次数不足 {result}')
        return JsonResponse(result)

    if asset is not None:
        asset.download_count += 1
        asset.save()
    return JsonResponse({'success': True, 'message': 'ok'}, status=200)


# Request headers for the video generation API.
# NOTE(review): the authorization key is hard-coded in source.
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "Authorization": "12e76710fad2047db8c0cc6b25987e2a2"
}
callback_url = "http://5109tb4417.qicp.vip/myapp/video-generation-callback/"

# Coins charged for each video generation or extension request.
VIDEO_TASK_COST = 10


def _refund_video_cost(openid, function_type):
    """Give back the coins charged for a video task that failed upstream."""
    refund = update_usage_count(openid, VIDEO_TASK_COST, function_type)
    if not refund['success']:
        logger.error(f'退还次数失败 (openid: {openid}) {refund}')


def _request_video_task(endpoint, payload, error_label):
    """POST ``payload`` to the video API; return the task uuid or ``None``."""
    try:
        response = requests.post(endpoint, json=payload, headers=headers, timeout=30)
        response.raise_for_status()  # raises HTTPError on a 4xx/5xx status
        response_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        logger.error(f"{error_label}: {e}")
        return None

    if 'uuid' not in response_data:
        logger.error(f"任务创建失败,响应: {response_data}")
        return None
    return response_data['uuid']


class GenerateVideoView(View):
    """Create a text-to-video task, charging ``VIDEO_TASK_COST`` coins."""

    def post(self, request):
        data = json.loads(request.body)
        x = request.headers.get('X', '')
        t = request.headers.get('T', '')
        if not decrypt_param(x, t):
            return JsonResponse({"message": "非法参数"}, status=400)

        user_id = data.get('user_id')
        text_prompt = data.get('text_prompt')
        width = data.get('width')
        height = data.get('height')
        motion_score = data.get('motion_score')
        style = data.get('style', '')

        try:
            user = User.objects.get(nickname=user_id)
        except User.DoesNotExist:
            return JsonResponse({"message": "用户不存在"}, status=404)

        function_type = 'GenerateVideoView'
        result = update_usage_count(user.openid, -VIDEO_TASK_COST, function_type)
        if not result['success']:
            return JsonResponse(result)

        # Translate only after the charge succeeded, then append the style.
        description_en = translate(text_prompt, 'auto', 'en')
        if style:
            description_en += f".style: {style}"

        payload = {
            "text_prompt": description_en,
            "width": width,
            "height": height,
            "motion": motion_score,
            "seed": 0,
            "upscale": True,
            "interpolate": True,
            "callback_url": callback_url
        }
        task_id = _request_video_task(
            "https://api.aivideoapi.com/runway/generate/text", payload, "请求视频生成接口时出错")
        if task_id is None:
            # Upstream failed: the user must not pay for a task never created.
            _refund_video_cost(user.openid, function_type)
            return JsonResponse({"message": "任务创建失败"}, status=500)

        VideoTask.objects.create(
            openid=user.openid,
            nickname=user.nickname,
            task_id=task_id,
            task_type='文生视频',
            status='running',
            text_prompt=description_en,
            description_zh=text_prompt,
            width=width,
            height=height,
            motion_score=motion_score,
            style=style,
            seconds=4
        )
        return JsonResponse({"message": "任务创建成功", "task_id": task_id}, status=200)


class ExtendVideoView(View):
    """Extend an existing video task by 4 seconds (16 seconds maximum)."""

    def post(self, request):
        data = json.loads(request.body)
        x = request.headers.get('X', '')
        t = request.headers.get('T', '')
        if not decrypt_param(x, t):
            return JsonResponse({"message": "非法参数"}, status=400)

        user_id = data.get('user_id')
        task_id = data.get('task_id')
        motion_score = data.get('motion_score', 5)

        # NOTE(review): the task is not checked to belong to user_id.
        try:
            task = VideoTask.objects.get(task_id=task_id)
            user = User.objects.get(nickname=user_id)
        except VideoTask.DoesNotExist:
            return JsonResponse({"message": "任务不存在"}, status=404)
        except User.DoesNotExist:
            return JsonResponse({"message": "用户不存在"}, status=404)

        # Check the duration limit before charging so a rejected request
        # does not cost the user anything.
        current_seconds = int(task.seconds)
        additional_seconds = 4
        if current_seconds + additional_seconds > 16:
            return JsonResponse({"message": "扩展后的总时长不能超过16秒"}, status=400)
        new_total_seconds = current_seconds + additional_seconds

        function_type = 'ExtendVideoView'
        result = update_usage_count(user.openid, -VIDEO_TASK_COST, function_type)
        if not result['success']:
            return JsonResponse(result)

        payload = {
            "uuid": task_id,
            "motion": motion_score,
            "seed": 0,
            "upscale": True,
            "interpolate": True,
            "callback_url": callback_url
        }
        new_task_id = _request_video_task(
            "https://api.aivideoapi.com/runway/extend", payload, "请求视频扩展接口时出错")
        if new_task_id is None:
            _refund_video_cost(user.openid, function_type)
            return JsonResponse({"message": "任务创建失败"}, status=500)

        VideoTask.objects.create(
            openid=task.openid,
            nickname=task.nickname,
            task_id=new_task_id,
            task_type='文生视频',
            status='running',
            text_prompt=task.text_prompt,
            description_zh=task.description_zh,
            width=task.width,
            height=task.height,
            motion_score=motion_score,
            style=task.style,
            seconds=new_total_seconds,
        )
        return JsonResponse({"message": "任务创建成功", "task_id": new_task_id}, status=200)


class GenerateImageVideoView(View):
    """Create an image-to-video task, charging ``VIDEO_TASK_COST`` coins."""

    def post(self, request):
        # NOTE(review): unlike the other generation views, this endpoint does
        # not verify the X/T headers with decrypt_param — confirm whether
        # that is intentional.
        data = json.loads(request.body)
        user_id = data.get('user_id')
        text_prompt = data.get('text_prompt')
        image_url = data.get('image_url')
        motion_score = data.get('motion_score', 5)  # 1-10, default 5

        try:
            user = User.objects.get(nickname=user_id)
        except User.DoesNotExist:
            return JsonResponse({"message": "用户不存在"}, status=404)

        function_type = 'GenerateImageVideoView'
        result = update_usage_count(user.openid, -VIDEO_TASK_COST, function_type)
        if not result['success']:
            return JsonResponse(result)

        description_en = translate(text_prompt, 'auto', 'en')
        payload = {
            "text_prompt": description_en,
            "img_prompt": image_url,
            "motion": motion_score,
            "seed": 0,
            "upscale": True,
            "interpolate": True,
            "callback_url": callback_url
        }
        task_id = _request_video_task(
            "https://api.aivideoapi.com/runway/generate/imageDescription", payload, "请求视频生成接口时出错")
        if task_id is None:
            _refund_video_cost(user.openid, function_type)
            return JsonResponse({"message": "任务创建失败"}, status=500)

        VideoTask.objects.create(
            openid=user.openid,
            nickname=user.nickname,
            task_id=task_id,
            task_type='图生视频',
            status='running',
            text_prompt=description_en,
            description_zh=text_prompt,
            image_url=image_url,
            motion_score=motion_score,
            seconds=4
        )
        return JsonResponse({"message": "任务创建成功", "task_id": task_id}, status=200)


class TaskStatusView(View):
    """Poll the video API for one task and persist its latest state."""

    def get(self, request, task_id):
        if not task_id:
            return JsonResponse({"message": "缺少任务ID"}, status=400)

        try:
            task = VideoTask.objects.get(task_id=task_id)
        except VideoTask.DoesNotExist:
            return JsonResponse({"message": "任务不存在"}, status=404)

        try:
            response = requests.get(
                "https://api.aivideoapi.com/status", params={"uuid": task_id},
                headers=headers, timeout=30)
            response.raise_for_status()
            response_data = response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"查询任务状态时出错: {e}")
            return JsonResponse({"message": "查询任务状态失败", "error": str(e)}, status=500)

        raw_progress = response_data.get('progress')
        progress = float(raw_progress) if raw_progress is not None else 0.0
        task.progress = progress

        if progress >= 1.0:
            remote_status = response_data.get('status', task.status)
            if remote_status == 'success':
                # Publish to the asset library only on the transition to
                # success; repeated polls must not create duplicate assets.
                newly_succeeded = task.status != 'success'
                task.status = 'success'
                task.result_url = response_data.get('url', task.result_url)
                task.gif_url = response_data.get('gif_url', task.gif_url)
                if newly_succeeded:
                    AssetLibrary.objects.create(
                        original_url=task.result_url,
                        duration=task.seconds,
                        category=task.style,
                        description=task.text_prompt,
                        description_zh=task.description_zh,
                        generated_by=task.nickname,
                        gif_url=task.gif_url,
                        is_approved=True  # successful tasks are auto-approved
                    )
            elif remote_status == 'failed':
                task.status = 'failed'
        elif progress == 0.0:
            task.status = 'pending'
        else:
            task.status = 'running'

        task.save()

        return JsonResponse({
            "task_id": task.task_id,
            "status": task.status,
            "progress": task.progress,
            "result_url": task.result_url,
            "gif_url": task.gif_url
        }, status=200)


class UpdatePendingTasksView(View):
    """Refresh every running/pending task through the task-status endpoint."""

    def get(self, request):
        pending_tasks = VideoTask.objects.filter(status__in=['running', 'pending'])
        task_updates = []

        current_time = timezone.now().strftime('%Y-%m-%d %H:%M:%S')
        logger.info(f"当前执行时间:{current_time}, 共 {len(pending_tasks)} 个任务")

        for task in pending_tasks:
            task_info = {
                "task_id": task.task_id,
                "status": "正在执行",
                "message": ""
            }
            logger.info(f"{task_info}")

            try:
                # Call this service's own task-status endpoint.
                response = requests.get(
                    f"http://127.0.0.1:55556/myapp/task_status/{task.task_id}/", timeout=60)
                if response.status_code == 200:
                    task_info["status"] = "执行成功"
                    logger.info(f"{task_info}")
                else:
                    task_info["status"] = "执行失败"
                    task_info["message"] = f"请求失败,状态码:{response.status_code}"
                    logger.error(f"{task_info}")
            except Exception as e:
                task_info["status"] = "执行失败"
                task_info["message"] = f"错误信息:{str(e)}"
                logger.error(f"{task_info}")
            task_updates.append(task_info)

        return JsonResponse({"updated_tasks": task_updates}, status=200)


class DailyCoinsBonusView(View):
    """Add the daily coin bonus (``daily_bonus``, default 5) to every user."""

    def get(self, request):
        # NOTE(review): this GET mutates state and has no signature check.
        try:
            daily_bonus = int(request.GET.get('daily_bonus', 5))
        except ValueError:
            return JsonResponse({"message": "非法参数"}, status=400)

        try:
            users = User.objects.all()
            users_count = users.count()

            for user in users:
                user.coins += daily_bonus
                user.save()

            logger.info(f"总用户数量:{users_count}, 每个用户已赠送 {daily_bonus} 个金币")
            return JsonResponse({"message": "每日金币赠送成功"}, status=200)
        except Exception as e:
            logger.error(f"每日赠送金币时出错: {str(e)}")
            return JsonResponse({"message": "每日赠送金币时出错"}, status=500)


class VideoGenerationCallbackView(View):
    """Receive video generation callbacks sent as GET query parameters."""

    def get(self, request):
        try:
            data = request.GET.dict()
            logger.info(f'收到视频生成结果回调: {data}')

            task_id = data.get('uuid')
            status = data.get('status')
            result_url = data.get('url')
            gif_url = data.get('gif_url')

            try:
                progress = float(data.get('progress', 0))
            except ValueError as e:
                logger.error(f'无效的请求数据: {str(e)}')
                return JsonResponse({'success': False, 'message': '无效的请求数据'}, status=400)

            if not task_id:
                return JsonResponse({'success': False, 'message': '缺少任务ID'}, status=400)

            try:
                task = VideoTask.objects.get(task_id=task_id)
            except VideoTask.DoesNotExist:
                return JsonResponse({'success': False, 'message': '任务不存在'}, status=404)

            task.progress = progress
            if status == 'success' and progress == 1.0:
                task.status = 'success'
                task.result_url = result_url
                task.gif_url = gif_url
            elif status == 'failed':
                task.status = 'failed'
            else:
                task.status = 'running'
            task.save()

            return JsonResponse({'success': True, 'message': '回调接收成功'}, status=200)
        except Exception as e:
            logger.error(f'处理回调时出错: {str(e)}')
            return JsonResponse({'success': False, 'message': '服务器内部错误'}, status=500)

    def post(self, request):
        return JsonResponse({'success': False, 'message': '请求方法不支持'}, status=405)