501 lines
19 KiB
Python
Executable File
501 lines
19 KiB
Python
Executable File
import time
|
||
|
||
import requests
|
||
import json
|
||
import uuid
|
||
import os
|
||
from django.conf import settings
|
||
from django.core.exceptions import ObjectDoesNotExist
|
||
from django.http import JsonResponse, HttpResponse
|
||
from django.views.decorators.csrf import csrf_exempt
|
||
from django.contrib.auth.decorators import login_required
|
||
from django.views.decorators.http import require_http_methods
|
||
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
|
||
from datetime import datetime
|
||
from .create_tiktok_video import create_video
|
||
from .create_avatar_video import create_avatar_video_payload
|
||
from .create_music_video import create_music_video_payload
|
||
from .models import VideoGeneration, User, Plan, Order, WebsiteInfo
|
||
from .base import logger,synthesize_speech,upload_avatar,deduct_points
|
||
|
||
# Video rendering service configuration.
#
# SECURITY NOTE(review): the API key literal below was committed in plain
# text and should be treated as leaked -- rotate it and provide the new value
# through the VIDEO_API_KEY environment variable. The literal is kept only as
# a fallback so existing deployments keep working until that is done.
VIDEO_API_KEY = os.environ.get(
    'VIDEO_API_KEY', 'c4c0f516-33c0-4f5c-9d29-3b1d4e6a93c3'
)
# Render endpoint; overridable per environment (e.g. staging).
VIDEO_API_URL = os.environ.get(
    'VIDEO_API_URL', 'https://www.typeframes.com/api/public/v2/render'
)
# Presumably the callback URL handed to the render service -- TODO confirm
# against the modules that consume this constant.
WEBHOOK_URL = os.environ.get(
    'WEBHOOK_URL', 'https://www.typeframes.cc/api/webhook/'
)
|
||
|
||
def generate_video_id(user_id):
    """Build a unique video identifier for ``user_id``.

    The identifier has the form ``<unix-seconds>_<user_id>_<random-hex>``.
    The leading ``<unix-seconds>_<user_id>`` part is unchanged from the
    previous format; the 8-character random suffix prevents two requests made
    by the same user within the same second from receiving the same id.

    NOTE(review): the id is now 9 characters longer than before -- confirm the
    ``VideoGeneration.video_id`` column is wide enough.

    :param user_id: identifier of the requesting user.
    :return: the identifier string.
    """
    timestamp = int(time.time())
    suffix = uuid.uuid4().hex[:8]
    return f"{timestamp}_{user_id}_{suffix}"
|
||
|
||
@csrf_exempt
@require_http_methods(["POST"])
def handle_generate_tiktok_video_request(request):
    """Handle a request to generate a ``create-tiktok-video`` video.

    The request body must be a JSON object. ``text``, ``voice``,
    ``mediaType`` and ``ratio`` have no usable default and must be supplied;
    ``slug``, ``captionPreset`` and ``captionPosition`` have defaults but must
    not be empty; ``style``, ``rate``, ``disableCaptions`` and
    ``generationPreset`` are optional.

    Steps: call ``deduct_points``, create a ``VideoGeneration`` row,
    synthesize the speech audio, then submit the render job through
    ``create_video`` and store the returned ``pid``.

    :param request: HTTP request object (POST with a JSON body).
    :return: JsonResponse holding a ``code`` plus either the render result or
        an error ``message``. Errors are signalled through the body's
        ``code``; the HTTP status is left at its default.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        data = json.loads(request.body)
        # Valid JSON that is not an object (list, string, number, ...) has no
        # ``.get``; report it as a bad request instead of a 500.
        if not isinstance(data, dict):
            return JsonResponse({"code": 400, "message": "无效的JSON数据"})

        text = data.get('text', '')
        voice_name = data.get('voice', '')
        style = data.get('style', 'general')
        rate = data.get('rate', 0)
        media_type = data.get('mediaType', '')
        ratio = data.get('ratio', '')
        slug = data.get('slug', 'create-tiktok-video')
        caption_preset = data.get('captionPreset', 'Wrap 1')
        caption_position = data.get('captionPosition', 'bottom')
        disable_captions = data.get('disableCaptions', True)
        # 'ANIME' appears to be another accepted preset -- TODO confirm.
        generation_preset = data.get('generationPreset', 'LEONARDO')

        required = [
            text, voice_name, media_type, ratio,
            slug, caption_preset, caption_position,
        ]
        if not all(required):
            return JsonResponse({'code': 400, 'message': '缺少必要参数'})

        user = User.objects.get(id=request.user.id)

        # deduct_points returns True on success; any other value is passed
        # straight back to the client (per the original comment, an
        # "insufficient points" response).
        result = deduct_points(user, "create-tiktok-video", text=text)
        if result is not True:
            return result

        video_gen = VideoGeneration.objects.create(
            user=user,
            text=text,
            voice_name=voice_name,
            style=style,
            rate=rate,
            media_type=media_type,
            ratio=ratio,
            slug=slug,
            video_id=generate_video_id(user.id),
        )

        audio_result = synthesize_speech(text, voice_name, style, rate)
        if audio_result['code'] != 200:
            # NOTE(review): this branch stores 'failed' while the render
            # failure branch below stores 'Failed'; left untouched because
            # consumers of the status field are not visible here.
            video_gen.status = 'failed'
            video_gen.save()
            return JsonResponse(audio_result)

        audio_url = audio_result['audio_url']
        video_gen.audio_url = audio_url
        video_gen.status = 'in_progress'
        video_gen.save()

        video_result = create_video(
            audio_url, generation_preset, media_type, text, ratio,
            voice_name, user.id, slug, disable_captions, caption_preset,
            caption_position, video_id=video_gen.video_id,
        )
        if video_result['code'] == 200:
            video_gen.status = 'Pending'
            video_gen.pid = video_result['data']['pid']
            video_gen.save()
            return JsonResponse(video_result)

        video_gen.status = 'Failed'
        video_gen.save()
        return JsonResponse({"code": 405, "message": "生成失败"})

    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError: the body bytes could not be decoded as text.
        return JsonResponse({"code": 400, "message": "无效的JSON数据"})
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("处理视频生成请求时出错: %s", e)
        return JsonResponse({"code": 500, "message": str(e)})
|
||
|
||
@csrf_exempt
@require_http_methods(["POST"])
def handle_generate_avatar_video_request(request):
    """Handle a request to generate a ``create-avatar-video`` video.

    The request body must be a JSON object. ``text``, ``voice``,
    ``mediaType`` and ``selectedAvatar`` must be supplied; ``ratio``,
    ``captionPreset`` and ``captionPosition`` have defaults but must not be
    empty; ``style``, ``rate``, ``disableCaptions`` and ``generationPreset``
    are optional. The slug is always ``create-avatar-video``.

    Steps: call ``deduct_points``, create a ``VideoGeneration`` row,
    synthesize the speech audio, then submit the render job through
    ``create_avatar_video_payload`` and store the returned ``pid``.

    :param request: HTTP request object (POST with a JSON body).
    :return: JsonResponse holding a ``code`` plus either the render result or
        an error ``message``. Errors are signalled through the body's
        ``code``; the HTTP status is left at its default.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        data = json.loads(request.body)
        # Valid JSON that is not an object (list, string, number, ...) has no
        # ``.get``; report it as a bad request instead of a 500.
        if not isinstance(data, dict):
            return JsonResponse({"code": 400, "message": "无效的JSON数据"})

        text = data.get('text', '')
        voice_name = data.get('voice', '')
        style = data.get('style', 'general')
        rate = data.get('rate', 0)
        media_type = data.get('mediaType', '')
        ratio = data.get('ratio', '9 / 16')
        slug = 'create-avatar-video'
        caption_preset = data.get('captionPreset', 'Wrap 1')
        caption_position = data.get('captionPosition', 'bottom')
        selected_avatar = data.get('selectedAvatar', '')
        disable_captions = data.get('disableCaptions', True)
        # 'ANIME' appears to be another accepted preset -- TODO confirm.
        generation_preset = data.get('generationPreset', 'LEONARDO')

        required = [
            text, voice_name, media_type, ratio, slug,
            caption_preset, caption_position, selected_avatar,
        ]
        if not all(required):
            return JsonResponse({'code': 400, 'message': '缺少必要参数'})

        user = User.objects.get(id=request.user.id)

        # deduct_points returns True on success; any other value is passed
        # straight back to the client (per the original comment, an
        # "insufficient points" response).
        result = deduct_points(user, "create-avatar-video", text=text)
        if result is not True:
            return result

        video_gen = VideoGeneration.objects.create(
            user=user,
            text=text,
            voice_name=voice_name,
            style=style,
            rate=rate,
            media_type=media_type,
            ratio=ratio,
            slug=slug,
            video_id=generate_video_id(user.id),
        )

        audio_result = synthesize_speech(text, voice_name, style, rate)
        if audio_result['code'] != 200:
            # NOTE(review): this branch stores 'failed' while the render
            # failure branch below stores 'Failed'; left untouched because
            # consumers of the status field are not visible here.
            video_gen.status = 'failed'
            video_gen.save()
            return JsonResponse(audio_result)

        audio_url = audio_result['audio_url']
        video_gen.audio_url = audio_url
        video_gen.status = 'in_progress'
        video_gen.save()

        video_result = create_avatar_video_payload(
            audio_url, generation_preset, media_type, text, ratio,
            voice_name, user.id, slug, disable_captions, caption_preset,
            caption_position, selected_avatar, video_id=video_gen.video_id,
        )
        if video_result['code'] == 200:
            video_gen.status = 'Pending'
            video_gen.pid = video_result['data']['pid']
            video_gen.save()
            return JsonResponse(video_result)

        video_gen.status = 'Failed'
        video_gen.save()
        return JsonResponse({"code": 405, "message": "生成失败"})

    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError: the body bytes could not be decoded as text.
        return JsonResponse({"code": 400, "message": "无效的JSON数据"})
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("处理头像视频生成请求时出错: %s", e)
        return JsonResponse({"code": 500, "message": str(e)})
|
||
|
||
@csrf_exempt
@require_http_methods(["POST"])
def handle_generate_music_video_request(request):
    """Handle a request to generate a ``create-music-video`` video.

    The request body must be a JSON object. ``audioUrl`` must be supplied;
    ``mediaType``, ``ratio``, ``slug``, ``captionPreset`` and
    ``captionPosition`` have defaults but must not be empty;
    ``disableCaptions`` and ``generationPreset`` are optional. No speech is
    synthesized here: the caller-provided audio URL is used directly, and the
    text/voice fields of the ``VideoGeneration`` row are stored empty.

    Steps: call ``deduct_points``, create a ``VideoGeneration`` row, then
    submit the render job through ``create_music_video_payload`` and store
    the returned ``pid``.

    :param request: HTTP request object (POST with a JSON body).
    :return: JsonResponse holding a ``code`` plus either the render result or
        an error ``message``. Errors are signalled through the body's
        ``code``; the HTTP status is left at its default.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        data = json.loads(request.body)
        # Valid JSON that is not an object (list, string, number, ...) has no
        # ``.get``; report it as a bad request instead of a 500.
        if not isinstance(data, dict):
            return JsonResponse({"code": 400, "message": "无效的JSON数据"})

        audio_url = data.get('audioUrl', '')
        media_type = data.get('mediaType', 'stockVideo')
        ratio = data.get('ratio', '9 / 16')
        slug = data.get('slug', 'create-music-video')
        caption_preset = data.get('captionPreset', 'Wrap 1')
        caption_position = data.get('captionPosition', 'bottom')
        disable_captions = data.get('disableCaptions', True)
        # 'ANIME' appears to be another accepted preset -- TODO confirm.
        generation_preset = data.get('generationPreset', 'LEONARDO')

        required = [
            audio_url, media_type, ratio,
            slug, caption_preset, caption_position,
        ]
        if not all(required):
            return JsonResponse({'code': 400, 'message': '缺少必要参数'})

        user = User.objects.get(id=request.user.id)

        # deduct_points returns True on success; any other value is passed
        # straight back to the client (per the original comment, an
        # "insufficient points" response).
        result = deduct_points(user, "create-music-video")
        if result is not True:
            return result

        video_gen = VideoGeneration.objects.create(
            user=user,
            text='',
            voice_name='',
            style='',
            rate=0,
            media_type=media_type,
            ratio=ratio,
            slug=slug,
            video_id=generate_video_id(user.id),
        )

        video_result = create_music_video_payload(
            audio_url=audio_url,
            generationPreset=generation_preset,
            media_type=media_type,
            text='',
            ratio=ratio,
            voice_name='',
            style='',
            rate=0,
            user_id=user.id,
            slug=slug,
            disableCaptions=disable_captions,
            caption_preset=caption_preset,
            caption_position=caption_position,
            video_id=video_gen.video_id,
        )
        # Route the diagnostic through the logger instead of stdout.
        logger.info("create-music-video render result: %s", video_result)

        if video_result['code'] == 200:
            video_gen.status = 'Pending'
            video_gen.pid = video_result['data']['pid']
            video_gen.save()
            return JsonResponse(video_result)

        video_gen.status = 'Failed'
        video_gen.save()
        return JsonResponse({"code": 405, "message": "生成失败"})

    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError: the body bytes could not be decoded as text.
        return JsonResponse({"code": 400, "message": "无效的JSON数据"})
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("处理音乐视频生成请求时出错: %s", e)
        return JsonResponse({"code": 500, "message": str(e)})
|
||
|
||
|
||
|
||
@csrf_exempt
def webhook(request):
    """Handle the video-generation callback and update the matching record.

    Reads ``videoUrl`` and ``videoId`` from the JSON body and ``userid`` from
    the query string, looks up the ``VideoGeneration`` row for that user and
    video id, and marks it ``completed`` (storing the URL) when a video URL
    is present or ``failed`` otherwise.

    SECURITY NOTE(review): nothing in this view authenticates the caller, so
    anyone who can reach the URL can change a record's status and video URL.
    Consider verifying a shared secret or signature -- not added here because
    the callback contract is not visible in this file.

    :param request: HTTP request object.
    :return: HttpResponse with status 200 when the callback was processed
        (including when ids are missing or nothing matched), 400 for an
        invalid body or malformed ``userid``, 405 for non-POST methods, and
        500 on unexpected errors.
    """
    if request.method != 'POST':
        return HttpResponse(status=405)  # Method Not Allowed

    try:
        # Raw POST payload.
        raw_data = request.body
        logger.info("Received raw data:\n%s\n", raw_data)

        # Parse the JSON payload; empty or non-object payloads are rejected
        # (a non-object would otherwise fail on ``.get`` and yield a 500).
        data = json.loads(raw_data)
        if not data or not isinstance(data, dict):
            return HttpResponse(status=400)  # Bad Request

        # Log the full decoded payload.
        logger.info("Received complete callback data:\n%s\n", json.dumps(data, indent=4))

        # Extract the basic fields.
        video_url = data.get('videoUrl', '')
        video_id = data.get('videoId', '')
        user_id = request.GET.get('userid', '')

        if not (user_id and video_id):
            logger.error("User ID or Video ID not provided in the webhook URL or callback data")
            return HttpResponse(status=200)

        logger.info('回调更新状态')

        try:
            user = User.objects.get(id=user_id)
        except ObjectDoesNotExist:
            logger.error(f"User with id {user_id} does not exist")
            return HttpResponse(status=200)
        except ValueError:
            # A non-numeric ``userid`` cannot match an integer primary key;
            # treat it as a bad request rather than a server error.
            logger.error("Invalid userid in webhook URL: %r", user_id)
            return HttpResponse(status=400)  # Bad Request

        video_gen = VideoGeneration.objects.filter(user=user, video_id=video_id).first()
        if video_gen is None:
            logger.warning(f"No video generation found for user_id: {user_id} and video_id: {video_id}")
            return HttpResponse(status=200)

        if video_url:
            video_gen.video_url = video_url
            video_gen.status = 'completed'
        else:
            video_gen.status = 'failed'
        video_gen.save()

        # Log what was stored.
        output = {
            'videoUrl': video_url,
            'status': video_gen.status,
        }
        logger.info("Updated video generation status and video URL:\n%s\n", json.dumps(output, indent=4))

        return HttpResponse(status=200)

    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        logger.error("Invalid JSON data: %s", str(e))
        return HttpResponse(status=400)  # Bad Request
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error: %s", str(e))
        return HttpResponse(status=500)  # Internal Server Error
|
||
|
||
|
||
# Fetch the video list
@csrf_exempt
@require_http_methods(["GET"])
@login_required
def get_video_list(request):
    """Return a paginated list of video generation records.

    Query parameters:
      * ``user_id``   -- only honoured for staff users: list that user's videos.
      * ``page``      -- page number (default 1); a non-integer value yields
        page 1 and an out-of-range value yields the last page.
      * ``page_size`` -- items per page (default 10); a non-integer or
        non-positive value falls back to 10.

    Non-staff users always get their own records. Staff users get the
    records of ``user_id`` when given, otherwise every record.

    :param request: HTTP request object.
    :return: JsonResponse with ``code`` 200 and a ``data`` object holding
        ``list``, ``page``, ``page_size``, ``total_pages`` and
        ``total_videos``; 404 when the requested user does not exist; 500 on
        unexpected errors.
    """
    default_page_size = 10
    try:
        user = request.user
        user_id = request.GET.get('user_id', None)
        page = request.GET.get('page', 1)

        # page_size comes straight from the query string. Paginator raises on
        # non-numeric input and divides by zero for 0, which used to surface
        # as a 500; fall back to the default instead.
        try:
            page_size = int(request.GET.get('page_size', default_page_size))
        except (TypeError, ValueError):
            page_size = default_page_size
        if page_size < 1:
            page_size = default_page_size

        if user_id and user.is_staff:
            # Staff asked for one specific user's videos.
            try:
                specific_user = User.objects.get(id=user_id)
            except (ObjectDoesNotExist, ValueError):
                # ValueError: the id is not a valid primary-key value.
                return JsonResponse({'code': 404, 'message': '用户不存在'}, status=404)
            video_list = VideoGeneration.objects.filter(user=specific_user).order_by('-created_at')
        elif not user.is_staff:
            # Regular users only ever see their own videos.
            video_list = VideoGeneration.objects.filter(user=user).order_by('-created_at')
        else:
            # Staff without a user_id filter: all records.
            video_list = VideoGeneration.objects.all().order_by('-created_at')

        paginator = Paginator(video_list, page_size)
        try:
            videos = paginator.page(page)
        except PageNotAnInteger:
            videos = paginator.page(1)
        except EmptyPage:
            videos = paginator.page(paginator.num_pages)

        video_data = [
            {
                'id': video.id,
                'text': video.text,
                'voice_name': video.voice_name,
                'style': video.style,
                'rate': video.rate,
                'media_type': video.media_type,
                'ratio': video.ratio,
                'audio_url': video.audio_url,
                'video_url': video.video_url,
                'status': video.status,
                'created_at': video.created_at,
                'updated_at': video.updated_at,
                'pid': video.pid,
                'video_id': video.video_id,
                'slug': video.slug,
            }
            for video in videos
        ]

        response_data = {
            'code': 200,
            'data': {
                "list": video_data,
                'page': videos.number,
                'page_size': paginator.per_page,
                'total_pages': paginator.num_pages,
                'total_videos': paginator.count,
            },
        }
        return JsonResponse(response_data)

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error fetching video list: %s", e)
        return JsonResponse({"code": 500, "message": str(e)}, status=500)
|
||
|
||
@require_http_methods(["GET"])
def get_plans(request):
    """Return every plan as JSON, wrapped with ``code`` and ``message``.

    :param request: HTTP request object.
    :return: JsonResponse with ``code`` 200 and the plan list under ``data``,
        or ``code`` 500 (HTTP 500) with the error text when anything fails.
    """
    try:
        serialized = []
        for item in Plan.objects.all():
            entry = {}
            entry["id"] = item.id
            entry["title"] = item.title
            entry["description"] = item.description
            # Decimal is converted to float for the JSON payload.
            entry["price"] = float(item.price)
            entry["credits_per_month"] = item.credits_per_month
            entry["created_at"] = item.created_at
            entry["updated_at"] = item.updated_at
            entry["is_promotional"] = item.is_promotional
            entry["unlimited_exports"] = item.unlimited_exports
            entry["smart_music_sync"] = item.smart_music_sync
            serialized.append(entry)

        payload = {"code": 200, "message": "获取成功", "data": serialized}
        return JsonResponse(payload, status=200)
    except Exception as e:
        # Report the failure to the caller together with the error text.
        failure = {"code": 500, "message": f"服务器错误: {str(e)}"}
        return JsonResponse(failure, status=500)
|
||
|
||
|
||
@csrf_exempt
@require_http_methods(["GET"])
def get_orders(request):
    """Return a paginated order list, newest first, optionally filtered by status.

    Staff users see every order; other users only see their own orders
    (previously any logged-in user could list everybody's orders, including
    user ids, usernames and amounts).

    Query parameters:
      * ``status``    -- optional exact-match filter on the order status.
      * ``page``      -- page number (default 1); a non-integer value yields
        page 1 and an out-of-range value yields the last page.
      * ``page_size`` -- items per page (default 10); a non-integer or
        non-positive value falls back to 10.

    :param request: HTTP request object.
    :return: JsonResponse with ``code`` 200 plus ``data``, ``page``,
        ``total_pages`` and ``total_orders``; ``code`` 401 when not logged
        in; ``code`` 500 on unexpected errors. Errors are signalled through
        the body's ``code``; the HTTP status is left at its default.
    """
    default_page_size = 10
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        status = request.GET.get('status', None)
        page = request.GET.get('page', 1)

        # page_size comes straight from the query string. Paginator raises on
        # non-numeric input and divides by zero for 0, which used to surface
        # as a 500; fall back to the default instead.
        try:
            page_size = int(request.GET.get('page_size', default_page_size))
        except (TypeError, ValueError):
            page_size = default_page_size
        if page_size < 1:
            page_size = default_page_size

        # select_related avoids one extra query per row for ``order.user``.
        orders = Order.objects.select_related('user').order_by('-created_at')
        if not request.user.is_staff:
            # Non-staff users may only see their own orders.
            orders = orders.filter(user=request.user)
        if status:
            orders = orders.filter(status=status)

        paginator = Paginator(orders, page_size)
        try:
            orders_page = paginator.page(page)
        except PageNotAnInteger:
            orders_page = paginator.page(1)
        except EmptyPage:
            orders_page = paginator.page(paginator.num_pages)

        order_list = [
            {
                'order_id': order.order_id,
                'user_id': order.user.id,
                'username': order.user.username,
                'amount': float(order.amount),
                'payment_method': order.payment_method,
                'status': order.status,
                'created_at': order.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'updated_at': order.updated_at.strftime('%Y-%m-%d %H:%M:%S'),
            }
            for order in orders_page
        ]

        return JsonResponse({
            'code': 200,
            'message': '获取成功',
            'data': order_list,
            'page': orders_page.number,
            'total_pages': paginator.num_pages,
            'total_orders': paginator.count,
        })
    except Exception as e:
        # Record the failure with its traceback; it used to be returned to
        # the client without being logged anywhere.
        logger.exception("Error fetching orders: %s", e)
        return JsonResponse({'code': 500, 'message': str(e)})
|
||
|
||
|
||
@csrf_exempt
@require_http_methods(["DELETE"])
def delete_order(request, order_id):
    """Delete the order identified by ``order_id``.

    Only staff users or the user the order belongs to may delete it
    (previously any logged-in user could delete any order).

    :param request: HTTP request object.
    :param order_id: value matched against ``Order.order_id``.
    :return: JsonResponse whose ``code`` is 200 on success, 401 when not
        logged in, 403 when the caller may not delete this order, 404 when
        the order does not exist, or 500 on unexpected errors. Errors are
        signalled through the body's ``code``; the HTTP status is left at
        its default.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        order = Order.objects.get(order_id=order_id)

        # Ownership check: ``user_id`` is the raw foreign-key value of
        # ``order.user``, so no extra query is needed.
        is_owner = order.user_id == request.user.id
        if not (request.user.is_staff or is_owner):
            return JsonResponse({'code': 403, 'message': '无权限删除该订单'})

        order.delete()
        return JsonResponse({'code': 200, 'message': '订单删除成功'})
    except Order.DoesNotExist:
        return JsonResponse({'code': 404, 'message': '订单不存在'})
    except Exception as e:
        # Record the failure with its traceback; it used to be returned to
        # the client without being logged anywhere.
        logger.exception("Error deleting order %s: %s", order_id, e)
        return JsonResponse({'code': 500, 'message': str(e)})
|
||
|
||
|