Ai_Admin/MyApi/views.py

1260 lines
52 KiB
Python
Raw Normal View History

2024-06-05 05:25:27 +08:00
import os
import tempfile
import time
import re
import http.client
import hashlib
import urllib
import random
from django.core.files.storage import FileSystemStorage
from django.core.paginator import Paginator, PageNotAnInteger, EmptyPage
from django.db.models import Q
from django.forms import model_to_dict
from django.http import JsonResponse
import requests
import json
from datetime import datetime, timedelta
import jwt # 用于生成Token
from django.shortcuts import get_object_or_404
from django.utils.decorators import method_decorator
from django.views import View
import qiniu
from django.views.decorators.csrf import csrf_exempt
from website import settings
from .WXBizDataCrypt import WXBizDataCrypt # 引入微信解密库
from django.contrib.auth.hashers import make_password
from .models import Administrator, FriendRequest, User, VideoExtractionRecord, ApiCallLog, Copywriting, MembershipType, \
RedemptionCard, IDCounter, VideoTask, AssetLibrary
from urllib.parse import quote
from Crypto.Cipher import AES
import base64
from django.utils.dateparse import parse_datetime
# 你的Django设置中的小程序配置
from MyApi.sparkAPI import SparkUtil
from .API_Log import get_logger
log_file = '小程序日志.log'
logger = get_logger('小程序日志', log_file, when='midnight', backup_count=7)
APPID = 'wxb5a1857369e5809e'
SECRET = 'cbdb58343de23967d59fb90c7143a6b0'
from django.utils import timezone
import shortuuid
from threading import Thread
def is_valid_text(text):
text = urllib.parse.unquote(text)
# 定义正则表达式,匹配除了空白字符外的任意字符
regex = r'\S'
# 使用正则表达式进行匹配
return re.search(regex, text) is not None
def contains_sql_or_xss(input_string):
"""
检测输入字符串是否包含SQL注入或XSS攻击的内容
"""
input_string = urllib.parse.unquote(input_string)
# SQL注入检测检查是否包含SQL关键字或语句
sql_keywords = ['SELECT', 'UPDATE', 'DELETE', 'INSERT', 'DROP', 'ALTER', 'TRUNCATE', 'EXEC', 'UNION', 'FROM', 'WHERE', 'JOIN' ,'ldap' ,'~' , '/' ,'\/']
for keyword in sql_keywords:
if keyword.lower() in input_string.lower():
return True
# XSS攻击检测检查是否包含HTML标签和JavaScript代码
# 使用正则表达式检测包含HTML标签和JavaScript代码的情况
pattern = re.compile(r'<[^>]+>|javascript:', re.IGNORECASE)
if pattern.search(input_string):
return True
return False
def decrypt_param(x, t):
# 创建AES解密器对象
key = b'qw5w6SFE2D1jmxyd'
iv = b'345GDFED433223DF'
# 检查 x 和 t 是否为空
if not x or not t:
return False
try:
cipher = AES.new(key, AES.MODE_CBC, iv)
# 将密文进行Base64解码
ciphertext_bytes = base64.b64decode(x)
# 使用解密器解密密文
plaintext_bytes = cipher.decrypt(ciphertext_bytes)
# 删除填充字符
plaintext = plaintext_bytes.rstrip(b'\0').decode('utf-8')
# 比较解密后的明文和 t
if plaintext.rstrip('\x03') == t.rstrip('\x03'):
return True
else:
return False
except Exception as e:
print(f"解密过程出现错误: {e}")
return False
#用于判断次数剩余并增加或者减少额度
def update_usage_count(openid, increment, function_type):
try:
user = User.objects.get(openid=openid)
# 判断会员是否过期如果过期则更新会员字段为False
if user.member_end_time is not None and user.member_end_time < timezone.now().timestamp():
user.is_member = False
user.membership_type = None
user.save()
logger.info(f'会员 {user.id} (openid: {openid}) 已过期,已更新为非会员')
# 定义需要扣除次数的功能 GenerateVideoView ExtendVideoView GenerateImageVideoView
functions_require_coins_for_members = ['GenerateVideoView','ExtendVideoView','GenerateImageVideoView'] # 替换为实际功能名
# 增加或减少用户使用次数
if increment > 0:
user.coins += increment
logger.info(f'用户 {user.id} (openid: {openid}) 使用次数增加 {increment}')
user.save()
return {'success': True}
elif increment < 0:
if user.is_member:
if function_type in functions_require_coins_for_members:
if user.coins + increment >= 0:
user.coins += increment
logger.info(f'会员用户 {user.id} (openid: {openid}) 使用功能 {function_type} 使用次数减少 {abs(increment)} 次 剩余 {user.coins}')
user.save()
return {'success': True}
else:
logger.warning(f'会员用户 {user.id} (openid: {openid}) 使用功能 {function_type} 使用次数不足以减少 {abs(increment)}')
return {'success': False, 'message': '次数不足,请充值'}
else:
logger.info(f'会员用户 {user.id} (openid: {openid}) 使用功能 {function_type} 不需要扣除次数')
return {'success': True}
else:
if user.coins + increment >= 0:
user.coins += increment
logger.info(f'用户 {user.id} (openid: {openid}) 使用功能 {function_type} 使用次数减少 {abs(increment)} 次 剩余 {user.coins}')
user.save()
return {'success': True}
else:
logger.warning(f'用户 {user.id} (openid: {openid}) 使用功能 {function_type} 使用次数不足以减少 {abs(increment)}')
return {'success': False, 'message': '次数不足,请充值'}
return {'success': True}
except User.DoesNotExist:
logger.error(f'用户 (openid: {openid}) 不存在')
return {'success': False, 'message': '用户不存在'}
#用于卡密兑换功能
def update_user_membership(openid, duration_days):
current_time = int(time.time()) # 获取当前时间戳
try:
user = User.objects.get(openid=openid)
# 更新会员状态和时间
user.is_member = True
if user.member_start_time is None: # 如果用户是第一次开会员
user.member_start_time = current_time
logger.info(f'第一次开会员:{current_time} -- {current_time + duration_days * 24 * 3600}')
elif user.member_end_time is None or user.member_end_time < current_time: # 如果会员已经过期
logger.info(f'过期:{current_time} -- {current_time + duration_days * 24 * 3600}')
user.member_start_time = current_time
# 计算会员到期时间
if user.member_end_time is None or user.member_end_time < current_time:
logger.info(f'第一次开会员2{user.member_end_time} -- {current_time + duration_days * 24 * 3600}')
user.member_end_time = current_time + duration_days * 24 * 3600 # 将天数转换为秒
else: # 如果会员尚未过期,则在现有会员结束时间上添加续费的天数
logger.info(f'续费:{user.member_end_time} -- {user.member_end_time + duration_days * 24 * 3600}')
user.member_end_time += duration_days * 24 * 3600
user.save()
return True
except User.DoesNotExist:
# 处理用户不存在的情况
return False
#日志记录
def log_api_call(source, openid, nickname, wxid, wechat_alias, api_name, is_successful, remarks=""):
"""
插入API调用日志记录的函数
"""
ApiCallLog.objects.create(
source=source,
openid=openid,
nickname=nickname,
wxid=wxid,
wechat_alias=wechat_alias,
api_name=api_name,
is_successful=is_successful,
remarks=remarks
)
# log_api_call(
# source='weixin',
# openid='用户的openid',
# nickname='用户昵称',
# wxid='用户微信ID',
# wechat_alias='用户微信号',
# api_name='调用的API名称',
# is_successful=True,
# remarks='备注信息'
# )
def generate_short_id():
# 使用IDCounter模型获取下一个ID
next_id = IDCounter.get_next_id()
return next_id
def miniapp_login(request):
data = json.loads(request.body.decode('utf-8'))
code = data.get('code')
openid = data.get('openid','')
iv = data.get('iv')
launchOptions = data.get('Options', {})
logger.info(f' 登录 {launchOptions}')
print(launchOptions)
if not code:
return JsonResponse({'error': '缺少code参数'}, status=400)
# 使用code获取session_key和openid
url = f'https://api.weixin.qq.com/sns/jscode2session?appid={APPID}&secret={SECRET}&js_code={code}&grant_type=authorization_code'
res = requests.get(url)
result = res.json()
logger.info(f'登录接口返回:{result}')
if 'openid' in result:
openid = result['openid']
session_key = result['session_key']
# 解密encryptedData获取用户信息
# pc = WXBizDataCrypt(APPID, session_key)
# decrypted_data = pc.decrypt(encrypted_data, iv)
# token = generate_token_for_user(openid)
# print(decrypted_data)
if openid:
user, created = User.objects.get_or_create(openid=openid)
# 如果是新用户,则设置其它属性 defaultDailyFreeParseNum totalParseNum
logger.info(f'登录接口1 openid{openid} created {created}')
if created:
logger.info(f'登录接口2 openid{openid} created {created}')
uuid = generate_short_id()
user.wxid = launchOptions.get('query', {}).get('wxid', '') #微信id
user.wechat_number = launchOptions.get('query', {}).get('alias', '') #微信号
user.nickname = uuid
user.scene = launchOptions.get('scene', '') #来源
user.inviter_nickname = launchOptions.get('inviter_nickname', '') #邀请人
user.save()
user = User.objects.get(openid=openid)
user_info = model_to_dict(user)
logger.info(f'新用户openid {openid} --uuid{uuid} 添加成功 {user_info}')
return JsonResponse({'token': session_key, 'userInfo': user_info,})
else:
user = User.objects.get(openid=openid)
user_info = model_to_dict(user)
logger.info(f'老用户 {openid} 进行访问 {user_info}')
return JsonResponse({'token': session_key, 'userInfo': user_info})
else:
logger.error(f'没有openid参数')
return JsonResponse({'error': '没有openid参数'}, status=500)
else:
logger.error(f'微信登录失败 data{result}')
return JsonResponse({'error': '微信登录失败'}, status=500)
#获取用户信息
def userinfo(request):
data = json.loads(request.body)
openid = data.get('openid','')
if contains_sql_or_xss(openid):
logger.error(f'{openid} 用户触发威胁字符检测: {data}')
return JsonResponse({'status': 'error', 'message': '你的国际行为已被记录,我们将保留追责权利'})
if not openid:
return JsonResponse({"status": "erroe",'body': '缺少openid参数'})
else:
increment = -0
function_type = 'luserinfo'
result = update_usage_count(openid, increment, function_type)
membership_types = MembershipType.objects.all().values('type', 'title', 'description', 'price', 'coins','is_quota')
user = User.objects.get(openid=openid)
if user:
user_info = model_to_dict(user)
try:
user_info['member_start_time'] = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(user.member_start_time))
user_info['member_end_time'] = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(user.member_end_time))
except:
user_info['member_start_time'] = user.member_start_time
user_info['member_end_time'] = user.member_end_time
membership_type_list = list(membership_types)
return JsonResponse({"status": "success", 'userInfo': user_info, 'membershipTypeList': membership_type_list})
else:
return JsonResponse({"status": "erroe", 'userInfo': None})
def generate_token_for_user(openid):
# 定义密钥,你应该将其保存在配置文件或环境变量中
secret = 'oigjfs**00--++2s'
# 定义Token的过期时间这里设置为24小时
expiration = datetime.utcnow() + timedelta(days=1)
# 创建Token
token = jwt.encode({
'openid': openid,
'exp': expiration
}, secret, algorithm='HS256')
# PyJWT版本可能会影响返回值类型确保返回字符串
return token.decode('utf-8') if isinstance(token, bytes) else token
# 你的视频提取接口、userId和secretKey
API_URL = "https://h.aaaapp.cn/single_post"
USER_ID = "22F1BE410945DD40F0569BFA197A85C0"
SECRET_KEY = "7801ffd6cceaff579812214a93b948b4"
#提取视频
def video_extraction(request):
# 解析POST请求体
data = json.loads(request.body)
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
try:
logger.info(f'视频提取data: {data} ')
video_url = data.get('url')
uuid = data.get('uuid')
openid = data.get('openid')
increment = - 1
function_type = 'video_extraction'
result = update_usage_count(openid, increment, function_type)
if result['success']:
encoded_video_url = quote(video_url, safe='')
# 这里简化了token验证逻辑实际应用中应根据项目需求进行相应的安全校验
if not uuid or not video_url:
logger.error(f'{encoded_video_url}视频提取失败--未登录-uuid{uuid}--openid{openid}')
return JsonResponse({'error': '未登录'})
# 调用视频提取API
response = requests.post(API_URL, data={
'url': video_url,
'userId': USER_ID,
'secretKey': SECRET_KEY
})
if response.status_code == 200:
logger.info(f'{encoded_video_url}视频提取成功--uuid{uuid}--openid{openid}')
return JsonResponse(response.json())
else:
logger.error(f'提取链接无效{response.text}')
return JsonResponse({'error': '视频提取失败'}, status=response.status_code)
else:
print(f'使用次数更新失败: {result}')
return JsonResponse(result)
except json.JSONDecodeError:
logger.error(f'视频提取出错')
return JsonResponse({'error': '无效的请求'})
else:
return JsonResponse({'error': '非法参数'}, status=400)
#储存好友请求数据
def add_friend_request(request):
if request.method == 'POST':
data = json.loads(request.body)
# 解析申请时间字符串为datetime对象
fromusername = data.get('fromusername')
friend_request = FriendRequest.objects.filter(fromusername=fromusername).first()
if friend_request:
print('已存在')
return JsonResponse({"status": "success", "message": "已存在数据"}, status=200)
else:
data['time'] = datetime.strptime(data['time'], '%Y-%m-%d %H:%M:%S')
friend_request = FriendRequest.objects.create(**data)
friend_request.save()
print('入库成功')
return JsonResponse({"status": "success", "message": "添加好友请求已保存"}, status=200)
else:
return JsonResponse({"status": "error", "message": "只支持POST请求"}, status=405)
#返回使用次数
def total(request):
data = json.loads(request.body)
uuid = data.get('uuid')
try:
user = User.objects.get(nickname=uuid)
total_count = user.usage_count
except User.DoesNotExist:
total_count = 0
return JsonResponse({'total_num': total_count})
#改写文案接口
def rewrite_text(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
logger.info(f'文案改写开始{data}')
text = data.get('text')
uuid = data.get('uuid')
openid = data.get('openid','')
type = data.get('type', '')
wxid = data.get('wxid', '')
alias = data.get('alias', '')
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
# 确保text参数存在
if not text:
logger.error(f'缺少必要的文案参数--uuid{uuid} --openid{openid}')
return JsonResponse({'error': '缺少必要的文案参数'}, status=400)
query = f'{text}。【改写这篇文案,要求吸引人,引起人们共鸣,如果有错别字或者不通顺语句请改正,字数需要和原文差不多】'
# 返回改写后的文案
#增加提取记录
if openid == '' or openid == None:
logger.error(f'改写文案失败 未登录-uuid{uuid}--openid{openid}')
return JsonResponse({'error': '未登录,请重试'})
else:
rest = SparkUtil().query_and_get_data(query)
increment = -1
function_type = 'video_extraction'
result = update_usage_count(openid, increment, function_type)
if result['success']:
# 提取成功用户使用次数+1
user = User.objects.get(openid=openid)
user.usage_count += 1
user.save()
logger.info(f'改写文案成功-{rest}-uuid{uuid}--openid{openid}')
return JsonResponse({
'status': 'success',
# 'original_text': text,
'rewritten_text': rest
})
else:
logger.error(f'ai改写文案次数不足或不是会员-{rest}-uuid{uuid}--openid{openid}')
return JsonResponse(result)
else:
logger.error(
f'改写文案: 【非法参数】\n headers={request.headers}\n data={data} \n ---------------------------------')
return JsonResponse({'error': '非法参数'})
except json.JSONDecodeError:
logger.error(f'ai改写文案----无效的请求格式')
return JsonResponse({'error': '无效的请求格式'})
else:
logger.error(f'ai改写文案----仅支持POST请求')
return JsonResponse({'error': '仅支持POST请求'})
def get_copywriting_list_with_filters(request):
"""
返回带有搜索分类和排序功能的分页文案列表只包括已审核的文案
"""
# 获取请求参数
page = request.GET.get('page', 1)
page_size = request.GET.get('page_size', 10)
search_query = request.GET.get('search', '')
search_query = quote(search_query)
sort_by = request.GET.get('sort_by', 'default') # 默认按综合排序
category = request.GET.get('category', '')
# 构建查询条件
query_conditions = Q(text_content__icontains=search_query) & Q(is_approved=True) # 只选择已审核的文案
# 如果有分类条件,则进一步过滤
if category:
query_conditions &= Q(tag__icontains=category)
# 根据查询条件查询文案记录
if sort_by == 'default':
copywriting_list = Copywriting.objects.filter(query_conditions).order_by('-id', '-added_time')
elif sort_by == 'popularity':
# 按热度排序
copywriting_list = Copywriting.objects.filter(query_conditions).order_by('-popularity', '-added_time')
elif sort_by == 'time':
# 按时间排序
copywriting_list = Copywriting.objects.filter(query_conditions).order_by('-added_time')
else:
# 默认排序:先按热度,然后按时间
copywriting_list = Copywriting.objects.filter(query_conditions).order_by('-id', '-added_time')
# 使用Django的Paginator进行分页
paginator = Paginator(copywriting_list, page_size)
try:
copywritings = paginator.page(page)
except PageNotAnInteger:
copywritings = paginator.page(1)
except EmptyPage:
copywritings = paginator.page(paginator.num_pages)
# 获取当前页的数据
current_page_data = list(copywritings.object_list.values('id', 'text_content', 'tag', 'popularity'))
# 构造响应数据
data = {
'status': 'success',
'data': current_page_data,
'current_page': copywritings.number,
'total_pages': paginator.num_pages,
}
return JsonResponse(data)
#卡密兑换功能
def redeem_card(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
card_code = data.get('card_code')
openid = data.get('openid')
uuid = data.get('uuid')
# 根据卡密查询兑换卡信息
logger.info(f'卡密兑换获取到数据: {data}')
try:
card = RedemptionCard.objects.get(code=card_code)
except RedemptionCard.DoesNotExist:
logger.error(f'无效的卡密: {card_code}')
return JsonResponse({'error': '无效的卡密'})
# 检查卡密是否已经被使用
if card.is_used:
logger.error(f'该卡密已被使用: {card_code}')
return JsonResponse({'error': '该卡密已被使用'})
# 检查卡密是否过期
if card.expiry_date < timezone.now():
logger.error(f'该卡密已过期: {card_code}')
return JsonResponse({'error': '该卡密已过期'})
# 根据卡类型执行相应操作
if card.card_type == 'member':
# 如果是会员卡,调用更新会员函数
success = update_user_membership(openid, card.validity_period)
if success:
card.used_by_openid = openid
card.is_used = True
card.used_by_nickname = uuid
card.save()
logger.info(f' {openid} 成功领取: {card.validity_period}天会员 {card_code}')
return JsonResponse({"status": "success", 'message': f'成功领取{card.validity_period}天会员'})
else:
return JsonResponse({'error': '卡密激活失败,请联系管理员'})
elif card.card_type == 'quota':
# 如果是额度卡,调用更新使用次数函数
success = update_usage_count(openid, increment=+card.validity_period)
if success.get('success'):
card.used_by_openid = openid
card.is_used = True
card.used_by_nickname = uuid
card.save()
logger.info(f' {openid} 成功领取:{card.validity_period}次额度 {card_code}')
return JsonResponse({"status": "success", 'message': f'成功领取{card.validity_period}次额度'})
else:
logger.error(f' 卡密激活失败,请联系管理员 {card_code}')
return JsonResponse({'error': '卡密激活失败,请联系管理员'})
else:
logger.error(f' 卡密激活失败 {card_code}')
return JsonResponse({'error': '卡密激活失败'})
else:
return JsonResponse({'error': '非法参数'}, status=400)
except Exception as e:
logger.error(f' 处理请求时出错')
return JsonResponse({'error': '处理请求时出错:' + str(e)})
else:
logger.error(f' 卡密兑换接口 -- 仅支持POST请求')
return JsonResponse({'error': '仅支持POST请求'})
#邀请用户接口
def reward_invitation(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
logger.info(f'邀请激励数据: {data}')
openid = data.get('openid', '')
uuid = data.get('uuid', '')
if openid and uuid:
# 查找uuid对应的用户确保是新用户
try:
invited_user = User.objects.get(nickname=uuid)
logger.info(f'用户 {uuid} 已经存在,不能重复邀请')
return JsonResponse({'success': False, 'message': '用户已存在,不能重复邀请'})
except User.DoesNotExist:
# 被邀请用户不存在,说明是新用户,可以发放奖励
try:
inviter_user = User.objects.get(openid=openid)
except User.DoesNotExist:
logger.error(f'邀请人用户 {openid} 不存在')
return JsonResponse({'success': False, 'error': '邀请人用户不存在'})
# 更新邀请人信息
inviter_user.invitees_count += 1
inviter_user.coins += 10
inviter_user.save()
# 创建被邀请用户并设置邀请人信息
new_user = User(nickname=uuid, inviter_nickname=inviter_user.nickname)
new_user.save()
logger.info(f'用户 {openid} 邀请新用户 {uuid} 成功,奖励已发放')
return JsonResponse({'success': True, 'message': '奖励发放成功'})
else:
logger.error(f'用户邀请奖励接口 缺少必要参数')
return JsonResponse({'success': False, 'error': '缺少必要参数'})
else:
return JsonResponse({'success': False, 'error': '非法参数'})
except User.DoesNotExist:
logger.error(f'用户不存在')
return JsonResponse({'success': False, 'error': '用户不存在'})
except Exception as e:
logger.error(f'发放奖励时出错: {str(e)}')
return JsonResponse({'success': False, 'error': '服务器内部错误'})
else:
logger.error(f'用户邀请奖励接口 请求方法不支持')
return JsonResponse({'success': False, 'error': '请求方法不支持'})
def upload_to_qiniu(file_path, key):
access_key = settings.QINIU_ACCESS_KEY
secret_key = settings.QINIU_SECRET_KEY
bucket_name = settings.QINIU_BUCKET_NAME
bucket_domain = settings.QINIU_DOMAIN
q = qiniu.Auth(access_key, secret_key)
token = q.upload_token(bucket_name, key)
ret, info = qiniu.put_file(token, key, file_path)
if info.status_code == 200:
encoded_url = f"https://{bucket_domain}/{quote(key)}"
return encoded_url
else:
raise Exception("上传到七牛云失败")
class UploadImageView(View):
@method_decorator(csrf_exempt)
def post(self, request):
image_file = request.FILES.get('image_file')
if not image_file:
return JsonResponse({"message": "没有提供图像文件"}, status=400)
if not image_file.content_type.startswith('image/'):
return JsonResponse({"message": "文件格式不支持,只支持图片文件"}, status=400)
try:
# 将文件保存到本地文件夹
fs = FileSystemStorage()
filename = fs.save(image_file.name, image_file)
file_url = fs.url(filename)
return JsonResponse({"message": "文件上传成功", "file_url": file_url}, status=200)
except Exception as e:
return JsonResponse({"message": "内部服务器错误", "error": str(e)}, status=500)
appid = '20240601002067404' # 填写你的appid
secretKey = '6pRZ9HCSqGuMqzLO55hB' # 填写你的密钥
def translate(text, from_lang='auto', to_lang='en'):
myurl = '/api/trans/vip/translate'
salt = random.randint(32768, 65536)
sign = appid + text + str(salt) + secretKey
sign = hashlib.md5(sign.encode()).hexdigest()
myurl = (myurl + '?appid=' + appid + '&q=' + urllib.parse.quote(text) +
'&from=' + from_lang + '&to=' + to_lang + '&salt=' + str(salt) +
'&sign=' + sign)
try:
httpClient = http.client.HTTPConnection('api.fanyi.baidu.com')
httpClient.request('GET', myurl)
# response是HTTPResponse对象
response = httpClient.getresponse()
result_all = response.read().decode("utf-8")
result = json.loads(result_all)
return result['trans_result'][0]['dst']
except Exception as e:
return str(e)
finally:
if httpClient:
httpClient.close()
class UserVideoTaskListView(View):
def get(self, request, user_id):
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
try:
video_tasks = VideoTask.objects.filter(nickname=user_id).order_by('-created_at')
page = request.GET.get('page', 1) # 获取页码参数,默认为第一页
page_size = request.GET.get('page_size', 10) # 每页显示的条数默认为10
paginator = Paginator(video_tasks, page_size)
try:
tasks_page = paginator.page(page)
except PageNotAnInteger:
tasks_page = paginator.page(1)
except EmptyPage:
tasks_page = paginator.page(paginator.num_pages)
task_list = []
for task in tasks_page:
task_list.append({
'openid': task.openid,
'nickname': task.nickname,
'task_id': task.task_id,
'task_type': task.task_type,
'status': task.status,
'created_at': task.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': task.updated_at.strftime('%Y-%m-%d %H:%M:%S'),
'text_prompt': task.description_zh,
'width': task.width,
'height': task.height,
'motion_score': task.motion_score,
'style': task.style,
'seconds': task.seconds,
'image_url': task.image_url,
'result_url': task.result_url,
'gif_url': task.gif_url,
'qiniu_url': task.qiniu_url,
'progress': task.progress,
'error_message': task.error_message,
})
response_data = {
"video_tasks": task_list,
"total_tasks": paginator.count,
"total_pages": paginator.num_pages,
"current_page": tasks_page.number,
}
return JsonResponse(response_data, status=200)
except User.DoesNotExist:
return JsonResponse({"message": "用户不存在"}, status=404)
else:
return JsonResponse({"message": "非法参数"},status=400)
class DeleteVideoTaskView(View):
def delete(self, request, user_id, task_id):
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
try:
task = VideoTask.objects.get(nickname=user_id, task_id=task_id)
task.delete()
return JsonResponse({"message": "删除成功"}, status=200)
except User.DoesNotExist:
return JsonResponse({"message": "用户不存在"}, status=404)
except VideoTask.DoesNotExist:
return JsonResponse({"message": "任务不存在"}, status=404)
except Exception as e:
return JsonResponse({"message": f"删除失败: {str(e)}"}, status=500)
else:
return JsonResponse({"message": "非法参数"},status=400)
class AssetLibraryView(View):
def get(self, request):
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
search_query = request.GET.get('search', '')
category = request.GET.get('category', '')
page = request.GET.get('page', 1)
page_size = request.GET.get('page_size', 10)
# 过滤条件
assets = AssetLibrary.objects.filter(is_approved=True).order_by('-generated_at')
if search_query:
assets = assets.filter(description__icontains=search_query)
if category:
assets = assets.filter(category=category)
paginator = Paginator(assets, page_size)
try:
assets_page = paginator.page(page)
except PageNotAnInteger:
assets_page = paginator.page(1)
except EmptyPage:
assets_page = paginator.page(paginator.num_pages)
asset_list = []
for asset in assets_page:
asset_list.append({
"asset_id": asset.id,
'qiniu_url': asset.qiniu_url,
'gif_url': asset.gif_url,
'original_url': asset.original_url,
'duration': asset.duration,
'category': asset.category,
'description': asset.description,
'generated_at': asset.generated_at.strftime('%Y-%m-%d %H:%M:%S'),
'download_count': asset.download_count,
'is_approved': asset.is_approved,
})
response_data = {
'total_count': paginator.count,
'total_pages': paginator.num_pages,
'current_page': assets_page.number,
'assets': asset_list
}
return JsonResponse(response_data, status=200)
else:
return JsonResponse({"message": "非法参数"},status=400)
#统计素材库下载 保存视频 复制链接
def increment_download_count(request):
if request.method == 'POST':
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
data = json.loads(request.body)
asset_id = data.get('asset_id','')
openid = data.get('openid')
increment = -1
function_type = 'increment_download_count'
if not asset_id:
result = update_usage_count(openid, increment, function_type)
if result['success']:
return JsonResponse({'success': True, 'message': 'ok'}, status=200)
else:
logger.error(f' 视频号提取-复制文案-次数不足 {result}')
return JsonResponse(result)
else:
asset = get_object_or_404(AssetLibrary, id=asset_id)
asset.download_count += 1
asset.save()
result = update_usage_count(openid, increment, function_type)
if result['success']:
return JsonResponse({'success': True, 'message': 'ok'}, status=200)
else:
logger.error(f' 次数不足 {result}')
return JsonResponse(result)
else:
return JsonResponse({'error': '非法参数'}, status=400)
return JsonResponse({'success': False, 'message': 'no'}, status=405)
headers = {
"accept": "application/json",
"content-type": "application/json",
"Authorization": "12e76710fad2047db8c0cc6b25987e2a2" # 替换为你的真实授权密钥
}
callback_url = "http://5109tb4417.qicp.vip/myapp/video-generation-callback/"
class GenerateVideoView(View):
def post(self, request):
data = json.loads(request.body)
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
user_id = data.get('user_id')
text_prompt = data.get('text_prompt')
width = data.get('width')
height = data.get('height')
motion_score = data.get('motion_score')
style = data.get('style', '')
description_en = translate(text_prompt, 'auto', 'en')
# 添加风格到描述末尾
if style:
description_en += f".style: {style}"
try:
user = User.objects.get(nickname=user_id)
except User.DoesNotExist:
return JsonResponse({"message": "用户不存在"}, status=404)
# 判断用户是否有使用次数
increment = -10 # 假设每次生成视频需要消耗一次使用次数
function_type = 'GenerateVideoView'
result = update_usage_count(user.openid, increment, function_type)
if not result['success']:
return JsonResponse(result)
payload = {
"text_prompt": description_en,
"width": width,
"height": height,
"motion": motion_score,
"seed": 0,
"upscale": True,
"interpolate": True,
"callback_url": callback_url
}
try:
response = requests.post("https://api.aivideoapi.com/runway/generate/text", json=payload, headers=headers)
response.raise_for_status() # 如果响应状态码不是 200抛出 HTTPError 异常
except requests.exceptions.RequestException as e:
logger.error(f"请求视频生成接口时出错: {e}")
return JsonResponse({"message": "任务创建失败"}, status=500)
response_data = response.json()
if 'uuid' in response_data:
task_id = response_data['uuid']
VideoTask.objects.create(
openid=user.openid,
nickname=user.nickname,
task_id=task_id,
task_type='文生视频',
status='running',
text_prompt=description_en,
description_zh=text_prompt,
width=width,
height=height,
motion_score=motion_score,
style=style,
seconds=4
)
return JsonResponse({"message": "任务创建成功", "task_id": task_id}, status=200)
else:
logger.error(f"任务创建失败,响应: {response_data}")
return JsonResponse({"message": "任务创建失败"}, status=500)
else:
return JsonResponse({"message": "非法参数"}, status=400)
class ExtendVideoView(View):
def post(self, request):
data = json.loads(request.body)
x = request.headers.get('X', '')
t = request.headers.get('T', '')
if decrypt_param(x, t):
user_id = data.get('user_id')
task_id = data.get('task_id')
motion_score = data.get('motion_score', 5)
try:
# 从数据库中获取任务信息
task = VideoTask.objects.get(task_id=task_id)
user = User.objects.get(nickname=user_id)
except VideoTask.DoesNotExist:
return JsonResponse({"message": "任务不存在"}, status=404)
except User.DoesNotExist:
return JsonResponse({"message": "用户不存在"}, status=404)
# 判断用户是否有使用次数
increment = -10 # 假设每次生成视频需要消耗一次使用次数
function_type = 'ExtendVideoView'
result = update_usage_count(user.openid, increment, function_type)
if not result['success']:
return JsonResponse(result)
# 计算当前总时长
current_seconds = int(task.seconds)
# 确保扩展后的总时长不超过16秒
additional_seconds = 4 # 每次增加4秒
if current_seconds + additional_seconds > 16:
return JsonResponse({"message": "扩展后的总时长不能超过16秒"}, status=400)
new_total_seconds = current_seconds + additional_seconds
payload = {
"uuid": task_id,
"motion": motion_score,
"seed": 0,
"upscale": True,
"interpolate": True,
"callback_url": callback_url
}
try:
response = requests.post("https://api.aivideoapi.com/runway/extend", json=payload, headers=headers)
response.raise_for_status() # 如果响应状态码不是 200抛出 HTTPError 异常
except requests.exceptions.RequestException as e:
logger.error(f"请求视频扩展接口时出错: {e}")
return JsonResponse({"message": "任务创建失败"}, status=500)
response_data = response.json()
if 'uuid' in response_data:
new_task_id = response_data['uuid']
VideoTask.objects.create(
openid=task.openid,
nickname=task.nickname,
task_id=new_task_id,
task_type='文生视频',
status='running',
text_prompt=task.text_prompt,
description_zh=task.description_zh,
width=task.width,
height=task.height,
motion_score=motion_score,
style=task.style,
seconds=new_total_seconds,
)
return JsonResponse({"message": "任务创建成功", "task_id": new_task_id}, status=200)
else:
logger.error(f"任务创建失败,响应: {response_data}")
return JsonResponse({"message": "任务创建失败"}, status=500)
else:
return JsonResponse({"message": "非法参数"}, status=400)
class GenerateImageVideoView(View):
def post(self, request):
data = json.loads(request.body)
user_id = data.get('user_id')
text_prompt = data.get('text_prompt')
image_url = data.get('image_url')
motion_score = data.get('motion_score', 5) # M 1-10默认值为5
description_en = translate(text_prompt, 'auto', 'en')
try:
user = User.objects.get(nickname=user_id)
except User.DoesNotExist:
return JsonResponse({"message": "用户不存在"}, status=404)
# 判断用户是否有使用次数
increment = -10 # 假设每次生成视频需要消耗一次使用次数
function_type = 'GenerateImageVideoView'
result = update_usage_count(user.openid, increment, function_type)
if not result['success']:
return JsonResponse(result)
payload = {
"text_prompt": description_en,
"img_prompt": image_url,
"motion": motion_score,
"seed": 0,
"upscale": True,
"interpolate": True,
"callback_url": callback_url
}
try:
response = requests.post("https://api.aivideoapi.com/runway/generate/imageDescription", json=payload, headers=headers)
response.raise_for_status() # 如果响应状态码不是 200抛出 HTTPError 异常
except requests.exceptions.RequestException as e:
logger.error(f"请求视频生成接口时出错: {e}")
return JsonResponse({"message": "任务创建失败"}, status=500)
response_data = response.json()
if 'uuid' in response_data:
task_id = response_data['uuid']
VideoTask.objects.create(
openid=user.openid,
nickname=user.nickname,
task_id=task_id,
task_type='图生视频',
status='running',
text_prompt=description_en,
description_zh=text_prompt,
image_url=image_url,
motion_score=motion_score,
seconds=4
)
return JsonResponse({"message": "任务创建成功", "task_id": task_id}, status=200)
else:
logger.error(f"任务创建失败,响应: {response_data}")
return JsonResponse({"message": "任务创建失败"}, status=500)
class TaskStatusView(View):
def get(self, request, task_id):
if not task_id:
return JsonResponse({"message": "缺少任务ID"}, status=400)
try:
# 从数据库中获取任务
task = VideoTask.objects.get(task_id=task_id)
except VideoTask.DoesNotExist:
return JsonResponse({"message": "任务不存在"}, status=404)
url = f"https://api.aivideoapi.com/status?uuid={task_id}"
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
response_data = response.json()
print(response_data)
except requests.exceptions.RequestException as e:
logger.error(f"查询任务状态时出错: {e}")
return JsonResponse({"message": "查询任务状态失败", "error": str(e)}, status=500)
progress = float(response_data.get('progress', 0)) if response_data.get('progress') is not None else 0.0
task.progress = progress
if task.progress >= 1.0:
if response_data.get('status', task.status) == 'success':
task.status = 'success'
task.result_url = response_data.get('url', task.result_url)
task.gif_url = response_data.get('gif_url', task.gif_url)
AssetLibrary.objects.create(
original_url=task.result_url,
duration=task.seconds,
category=task.style,
description=task.text_prompt,
description_zh=task.description_zh,
generated_by=task.nickname,
gif_url=task.gif_url,
is_approved=True # 假设任务成功的素材自动审核通过
)
elif response_data.get('status', task.status) == 'failed':
task.status = 'failed'
elif progress==0.0:
task.status = 'pending'
else:
task.status = 'running'
# 保存更新后的任务
task.save()
return JsonResponse({
"task_id": task.task_id,
"status": task.status,
"progress": task.progress,
"result_url": task.result_url,
"gif_url": task.gif_url
}, status=200)
class UpdatePendingTasksView(View):
def get(self, request):
pending_tasks = VideoTask.objects.filter(status__in=['running', 'pending'])
task_updates = []
# 打印当前执行时间和任务数量
current_time = timezone.now().strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"当前执行时间:{current_time}, 共 {len(pending_tasks)} 个任务")
for task in pending_tasks:
task_info = {
"task_id": task.task_id,
"status": "正在执行",
"message": ""
}
logger.info(f"{task_info}")
try:
# 调用 QueryTaskStatusView 接口
response = requests.get(f"http://127.0.0.1:55556/myapp/task_status/{task.task_id}/")
if response.status_code == 200:
task_data = response.json()
print(task_data)
task_info = {
"task_id": task.task_id,
"status": "执行成功",
"message": ""
}
logger.info(f"{task_info}")
task_updates.append(task_info)
else:
task_info["status"] = "执行失败"
task_info["message"] = f"请求失败,状态码:{response.status_code}"
logger.error(f"{task_info}")
task_updates.append(task_info)
except Exception as e:
task_info["status"] = "执行失败"
task_info["message"] = f"错误信息:{str(e)}"
logger.error(f"{task_info}")
task_updates.append(task_info)
return JsonResponse({"updated_tasks": task_updates}, status=200)
#每日奖励次数
class DailyCoinsBonusView(View):
def get(self, request):
daily_bonus = int(request.GET.get('daily_bonus', 5)) # 获取每日赠送的金币数量默认为5
try:
# 获取所有用户
users = User.objects.all()
users_count = users.count()
# 为所有用户添加每日金币奖励
for user in users:
user.coins += daily_bonus
user.save()
logger.info(f"总用户数量:{users_count}, 每个用户已赠送 {daily_bonus} 个金币")
return JsonResponse({"message": "每日金币赠送成功"}, status=200)
except Exception as e:
logger.error(f"每日赠送金币时出错: {str(e)}")
return JsonResponse({"message": "每日赠送金币时出错"}, status=500)
class VideoGenerationCallbackView(View):
def get(self, request):
try:
print(request)
print(request.body)
data = request.GET.dict()
logger.info(f'收到视频生成结果回调: {data}')
print(f'收到视频生成结果回调: {data}') # 直接打印到控制台
task_id = data.get('uuid')
status = data.get('status')
progress = float(data.get('progress', 0))
result_url = data.get('url')
gif_url = data.get('gif_url')
if not task_id:
return JsonResponse({'success': False, 'message': '缺少任务ID'}, status=400)
try:
task = VideoTask.objects.get(task_id=task_id)
except VideoTask.DoesNotExist:
return JsonResponse({'success': False, 'message': '任务不存在'}, status=404)
task.progress = progress
if status == 'success' and progress == 1.0:
task.status = 'success'
task.result_url = result_url
task.gif_url = gif_url
elif status == 'failed':
task.status = 'failed'
else:
task.status = 'running'
task.save()
return JsonResponse({'success': True, 'message': '回调接收成功'}, status=200)
except json.JSONDecodeError as e:
logger.error(f'无效的请求数据: {str(e)}')
return JsonResponse({'success': False, 'message': '无效的请求数据'}, status=400)
except Exception as e:
logger.error(f'处理回调时出错: {str(e)}')
return JsonResponse({'success': False, 'message': '服务器内部错误'}, status=500)
def post(self, request):
return JsonResponse({'success': False, 'message': '请求方法不支持'}, status=405)