# Ai_Admin/WebAdmin/api.py
#
# Repository viewer metadata: 509 lines, 19 KiB, Python.
# Viewer history timestamp: 2024-09-23 17:56:57 +08:00
import time
import requests
import json
import uuid
import os
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_http_methods
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from datetime import datetime
from .create_tiktok_video import create_video
from .create_avatar_video import create_avatar_video_payload
from .create_music_video import create_music_video_payload
from .models import VideoGeneration, User, Plan, Order, WebsiteInfo
from .base import logger,synthesize_speech,upload_avatar,deduct_points
# Key, endpoint and callback URL constants for the video API.
# NOTE(review): the API key is hard-coded in source control; consider loading
# it from Django settings or an environment variable and rotating this value.
VIDEO_API_KEY = 'c4c0f516-33c0-4f5c-9d29-3b1d4e6a93c3'
# NOTE(review): presumably the render endpoint of the external service — confirm.
VIDEO_API_URL = 'https://www.typeframes.com/api/public/v2/render'
# NOTE(review): presumably the public callback URL served by `webhook` — confirm.
WEBHOOK_URL = 'https://www.typeframes.cc/api/webhook/'
def generate_video_id(user_id):
    """
    Build a video identifier of the form ``<unix_seconds>_<user_id>``.

    :param user_id: identifier of the user the video belongs to
    :return: the identifier string
    """
    return "{}_{}".format(int(time.time()), user_id)
@csrf_exempt
@require_http_methods(["POST"])
def handle_generate_tiktok_video_request(request):
    """
    Handle a request to generate a ``create-tiktok-video`` video.

    The handler deducts points, stores a ``VideoGeneration`` row, synthesizes
    the narration audio and then submits the render job via ``create_video``.
    Errors are reported through the ``code`` field of the JSON body; the JSON
    responses built here use the default HTTP status.

    :param request: HTTP request whose body is a JSON object. ``text``,
        ``voice``, ``mediaType`` and ``ratio`` must be non-empty; ``style``,
        ``rate``, ``slug``, ``captionPreset``, ``captionPosition``,
        ``disableCaptions`` and ``generationPreset`` are optional.
    :return: JsonResponse with the render result or an error code/message.
        When ``deduct_points`` does not return ``True`` its return value is
        sent back unchanged.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        data = json.loads(request.body)
        # Valid JSON that is not an object (list, number, ...) has no .get();
        # reject it as bad input instead of failing later with a 500.
        if not isinstance(data, dict):
            return JsonResponse({"code": 400, "message": "无效的JSON数据"})

        text = data.get('text', '')
        voice_name = data.get('voice', '')
        style = data.get('style', 'general')
        rate = data.get('rate', 0)
        media_type = data.get('mediaType', '')
        ratio = data.get('ratio', '')
        slug = data.get('slug', 'create-tiktok-video')
        caption_preset = data.get('captionPreset', 'Wrap 1')
        caption_position = data.get('captionPosition', 'bottom')
        disable_captions = data.get('disableCaptions', True)
        generation_preset = data.get('generationPreset', 'LEONARDO')  # e.g. ANIME

        required = [text, voice_name, media_type, ratio, slug, caption_preset, caption_position]
        if not all(required):
            return JsonResponse({'code': 400, 'message': '缺少必要参数'})

        user = User.objects.get(id=request.user.id)

        # Charge the points for this job up front.
        # NOTE(review): nothing in this handler refunds the points when a later
        # step fails — confirm whether that is intended.
        result = deduct_points(user, "create-tiktok-video", text=text)
        if result is not True:
            return result  # response produced by deduct_points (e.g. not enough points)

        video_gen = VideoGeneration.objects.create(
            user=user,
            text=text,
            voice_name=voice_name,
            style=style,
            rate=rate,
            media_type=media_type,
            ratio=ratio,
            slug=slug,
            video_id=generate_video_id(user.id),
        )

        audio_result = synthesize_speech(text, voice_name, style, rate)
        if audio_result['code'] != 200:
            video_gen.status = 'failed'
            video_gen.save()
            return JsonResponse(audio_result)

        audio_url = audio_result['audio_url']
        video_gen.audio_url = audio_url
        video_gen.status = 'in_progress'
        video_gen.save()

        video_result = create_video(
            audio_url, generation_preset, media_type, text, ratio, voice_name,
            user.id, slug, disable_captions, caption_preset, caption_position,
            video_id=video_gen.video_id,
        )
        if video_result['code'] == 200:
            video_gen.status = 'Pending'
            video_gen.pid = video_result['data']['pid']
            video_gen.save()
            return JsonResponse(video_result)

        # NOTE(review): status casing differs from the 'failed' used above;
        # kept as-is because consumers of this value are not visible here.
        video_gen.status = 'Failed'
        video_gen.save()
        return JsonResponse({"code": 405, "message": "生成失败"})
    except json.JSONDecodeError:
        return JsonResponse({"code": 400, "message": "无效的JSON数据"})
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("处理视频生成请求时出错: %s", e)
        return JsonResponse({"code": 500, "message": str(e)})
@csrf_exempt
@require_http_methods(["POST"])
def handle_generate_avatar_video_request(request):
    """
    Handle a request to generate a ``create-avatar-video`` video.

    The handler deducts points, stores a ``VideoGeneration`` row, synthesizes
    the narration audio and then submits the render job via
    ``create_avatar_video_payload``. Errors are reported through the ``code``
    field of the JSON body; the JSON responses built here use the default
    HTTP status.

    :param request: HTTP request whose body is a JSON object. ``text``,
        ``voice``, ``mediaType`` and ``selectedAvatar`` must be non-empty;
        ``style``, ``rate``, ``ratio``, ``captionPreset``,
        ``captionPosition``, ``disableCaptions`` and ``generationPreset`` are
        optional. The slug is always ``create-avatar-video``.
    :return: JsonResponse with the render result or an error code/message.
        When ``deduct_points`` does not return ``True`` its return value is
        sent back unchanged.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        data = json.loads(request.body)
        # Valid JSON that is not an object (list, number, ...) has no .get();
        # reject it as bad input instead of failing later with a 500.
        if not isinstance(data, dict):
            return JsonResponse({"code": 400, "message": "无效的JSON数据"})

        text = data.get('text', '')
        voice_name = data.get('voice', '')
        style = data.get('style', 'general')
        rate = data.get('rate', 0)
        media_type = data.get('mediaType', '')
        ratio = data.get('ratio', '9 / 16')
        slug = 'create-avatar-video'
        caption_preset = data.get('captionPreset', 'Wrap 1')
        caption_position = data.get('captionPosition', 'bottom')
        selected_avatar = data.get('selectedAvatar', '')
        disable_captions = data.get('disableCaptions', True)
        generation_preset = data.get('generationPreset', 'LEONARDO')  # e.g. ANIME

        required = [
            text, voice_name, media_type, ratio, slug,
            caption_preset, caption_position, selected_avatar,
        ]
        if not all(required):
            return JsonResponse({'code': 400, 'message': '缺少必要参数'})

        user = User.objects.get(id=request.user.id)

        # Charge the points for this job up front.
        # NOTE(review): nothing in this handler refunds the points when a later
        # step fails — confirm whether that is intended.
        result = deduct_points(user, "create-avatar-video", text=text)
        if result is not True:
            return result  # response produced by deduct_points (e.g. not enough points)

        video_gen = VideoGeneration.objects.create(
            user=user,
            text=text,
            voice_name=voice_name,
            style=style,
            rate=rate,
            media_type=media_type,
            ratio=ratio,
            slug=slug,
            video_id=generate_video_id(user.id),
        )

        audio_result = synthesize_speech(text, voice_name, style, rate)
        if audio_result['code'] != 200:
            video_gen.status = 'failed'
            video_gen.save()
            return JsonResponse(audio_result)

        audio_url = audio_result['audio_url']
        video_gen.audio_url = audio_url
        video_gen.status = 'in_progress'
        video_gen.save()

        video_result = create_avatar_video_payload(
            audio_url, generation_preset, media_type, text, ratio, voice_name,
            user.id, slug, disable_captions, caption_preset, caption_position,
            selected_avatar, video_id=video_gen.video_id,
        )
        if video_result['code'] == 200:
            video_gen.status = 'Pending'
            video_gen.pid = video_result['data']['pid']
            video_gen.save()
            return JsonResponse(video_result)

        # NOTE(review): status casing differs from the 'failed' used above;
        # kept as-is because consumers of this value are not visible here.
        video_gen.status = 'Failed'
        video_gen.save()
        return JsonResponse({"code": 405, "message": "生成失败"})
    except json.JSONDecodeError:
        return JsonResponse({"code": 400, "message": "无效的JSON数据"})
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("处理头像视频生成请求时出错: %s", e)
        return JsonResponse({"code": 500, "message": str(e)})
@csrf_exempt
@require_http_methods(["POST"])
def handle_generate_music_video_request(request):
    """
    Handle a request to generate a ``create-music-video`` video.

    The caller supplies the audio URL, so no speech synthesis happens here:
    the handler deducts points, stores a ``VideoGeneration`` row and submits
    the render job via ``create_music_video_payload``. Errors are reported
    through the ``code`` field of the JSON body; the JSON responses built here
    use the default HTTP status.

    :param request: HTTP request whose body is a JSON object. ``audioUrl``
        must be non-empty; ``mediaType``, ``ratio``, ``slug``,
        ``captionPreset``, ``captionPosition``, ``disableCaptions`` and
        ``generationPreset`` are optional.
    :return: JsonResponse with the render result or an error code/message.
        When ``deduct_points`` does not return ``True`` its return value is
        sent back unchanged.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        data = json.loads(request.body)
        # Valid JSON that is not an object (list, number, ...) has no .get();
        # reject it as bad input instead of failing later with a 500.
        if not isinstance(data, dict):
            return JsonResponse({"code": 400, "message": "无效的JSON数据"})

        audio_url = data.get('audioUrl', '')
        media_type = data.get('mediaType', 'stockVideo')
        ratio = data.get('ratio', '9 / 16')
        slug = data.get('slug', 'create-music-video')
        caption_preset = data.get('captionPreset', 'Wrap 1')
        caption_position = data.get('captionPosition', 'bottom')
        disable_captions = data.get('disableCaptions', True)
        generation_preset = data.get('generationPreset', 'LEONARDO')  # e.g. ANIME

        required = [audio_url, media_type, ratio, slug, caption_preset, caption_position]
        if not all(required):
            return JsonResponse({'code': 400, 'message': '缺少必要参数'})

        user = User.objects.get(id=request.user.id)

        # Charge the points for this job up front.
        # NOTE(review): nothing in this handler refunds the points when the
        # render request fails — confirm whether that is intended.
        result = deduct_points(user, "create-music-video")
        if result is not True:
            return result  # response produced by deduct_points (e.g. not enough points)

        # Text/voice fields are stored empty because this flow has no narration.
        video_gen = VideoGeneration.objects.create(
            user=user,
            text='',
            voice_name='',
            style='',
            rate=0,
            media_type=media_type,
            ratio=ratio,
            slug=slug,
            video_id=generate_video_id(user.id),
        )

        video_result = create_music_video_payload(
            audio_url=audio_url,
            generationPreset=generation_preset,
            media_type=media_type,
            text='',
            ratio=ratio,
            voice_name='',
            style='',
            rate=0,
            user_id=user.id,
            slug=slug,
            disableCaptions=disable_captions,
            caption_preset=caption_preset,
            caption_position=caption_position,
            video_id=video_gen.video_id,
        )
        # Log through the module logger instead of printing to stdout.
        logger.info("create-music-video render response: %s", video_result)

        if video_result['code'] == 200:
            video_gen.status = 'Pending'
            video_gen.pid = video_result['data']['pid']
            video_gen.save()
            return JsonResponse(video_result)

        video_gen.status = 'Failed'
        video_gen.save()
        return JsonResponse({"code": 405, "message": "生成失败"})
    except json.JSONDecodeError:
        return JsonResponse({"code": 400, "message": "无效的JSON数据"})
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("处理音乐视频生成请求时出错: %s", e)
        return JsonResponse({"code": 500, "message": str(e)})
@csrf_exempt
def webhook(request):
    """
    Handle the video-generation callback.

    The JSON body supplies ``videoId`` and ``videoUrl``; the user id comes
    from the ``userid`` query parameter. The matching ``VideoGeneration`` row
    is marked ``completed`` (and its URL stored) when a video URL is present,
    otherwise ``failed``.

    NOTE(review): the endpoint is CSRF-exempt and performs no authentication
    or signature check, so any caller that knows a user id and video id can
    change a record — consider verifying a shared secret.

    :param request: HTTP request; only POST is accepted
    :return: HttpResponse with status 200 (processed, or nothing to update),
        400 (empty, non-object or invalid JSON), 405 (wrong method) or
        500 (unexpected error)
    """
    if request.method != 'POST':
        return HttpResponse(status=405)  # Method Not Allowed

    try:
        # Keep the raw body in the log for troubleshooting callbacks.
        raw_data = request.body
        logger.info("Received raw data:\n%s\n", raw_data)

        data = json.loads(raw_data)
        # Empty payloads and non-object JSON (list, string, ...) cannot carry
        # the expected fields; reject them instead of failing with a 500.
        if not data or not isinstance(data, dict):
            return HttpResponse(status=400)  # Bad Request

        video_url = data.get('videoUrl', '')  # URL of the rendered video
        video_id = data.get('videoId', '')  # video ID
        user_id = request.GET.get('userid', '')

        if not (user_id and video_id):
            logger.error("User ID or Video ID not provided in the webhook URL or callback data")
            return HttpResponse(status=200)

        logger.info('回调更新状态')
        try:
            user = User.objects.get(id=user_id)
        except ObjectDoesNotExist:
            logger.error(f"User with id {user_id} does not exist")
            return HttpResponse(status=200)

        video_gen = VideoGeneration.objects.filter(user=user, video_id=video_id).first()
        if video_gen is None:
            logger.warning(f"No video generation found for user_id: {user_id} and video_id: {video_id}")
            return HttpResponse(status=200)

        # A callback without a video URL is treated as a failed render.
        if video_url:
            video_gen.video_url = video_url
            video_gen.status = 'completed'
        else:
            video_gen.status = 'failed'
        video_gen.save()

        output = {
            'videoUrl': video_url,
            'status': video_gen.status,
        }
        logger.info(
            "Updated video generation status and video URL:\n%s\n",
            json.dumps(output, indent=4),
        )
        return HttpResponse(status=200)
    except json.JSONDecodeError as e:
        logger.error("Invalid JSON data: %s", str(e))
        return HttpResponse(status=400)  # Bad Request
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Error: %s", e)
        return HttpResponse(status=500)  # Internal Server Error
@csrf_exempt
@require_http_methods(["GET"])
@login_required
def get_video_list(request):
    """
    Return a paginated list of video generations.

    Staff users may pass ``user_id`` to list one user's videos, or omit it to
    list every user's videos. Other users always get their own videos and any
    ``user_id`` parameter is ignored.

    Query parameters: ``page`` (default 1), ``page_size`` (default 10; values
    that are not positive integers fall back to 10) and ``user_id``.

    :param request: authenticated GET request
    :return: JsonResponse with ``code``, ``message`` and ``data`` (``list``,
        ``page``, ``page_size``, ``total_pages``, ``total_videos``); status
        404 when the requested user does not exist, 500 on unexpected errors
    """
    try:
        user = request.user

        page = request.GET.get('page', 1)
        # page_size comes straight from the query string; a non-numeric or
        # non-positive value would make Paginator raise, so fall back to 10.
        try:
            page_size = int(request.GET.get('page_size', 10))
        except (TypeError, ValueError):
            page_size = 10
        if page_size < 1:
            page_size = 10

        user_id = request.GET.get('user_id', None)
        if user.is_staff:
            if user_id:
                try:
                    specific_user = User.objects.get(id=user_id)
                except (User.DoesNotExist, ValueError):
                    # ValueError: user_id is not a valid primary-key value.
                    return JsonResponse({'code': 404, 'message': '用户不存在', 'data': {}}, status=404)
                video_list = VideoGeneration.objects.filter(user=specific_user).order_by('-created_at')
                logger.info("管理员查询用户 %s 的视频列表", specific_user.username)
            else:
                # No user_id given: staff see every user's videos.
                video_list = VideoGeneration.objects.all().order_by('-created_at')
                logger.info("管理员查询所有用户的视频列表")
        else:
            # Regular users only ever see their own videos.
            video_list = VideoGeneration.objects.filter(user=user).order_by('-created_at')
            logger.info("普通用户 %s 查询自己的视频列表", user.username)

        paginator = Paginator(video_list, page_size)
        try:
            videos = paginator.page(page)
        except PageNotAnInteger:
            videos = paginator.page(1)  # non-integer page: serve the first page
        except EmptyPage:
            # Page out of range: answer with an empty list rather than an error.
            return JsonResponse({
                'code': 200,
                'message': '没有更多数据',
                'data': {
                    'list': [],
                    'page': page,
                    'page_size': page_size,
                    'total_pages': paginator.num_pages,
                    'total_videos': paginator.count,
                },
            }, status=200)

        video_data = [
            {
                'id': video.id,
                'text': video.text,
                'voice_name': video.voice_name,
                'style': video.style,
                'rate': video.rate,
                'media_type': video.media_type,
                'ratio': video.ratio,
                'audio_url': video.audio_url,
                'video_url': video.video_url,
                'status': video.status,
                'created_at': video.created_at,
                'updated_at': video.updated_at,
                'pid': video.pid,
                'video_id': video.video_id,
                'slug': video.slug,
            }
            for video in videos
        ]

        response_data = {
            'code': 200,
            'message': '成功获取视频列表',
            'data': {
                'list': video_data,
                'page': videos.number,
                'page_size': paginator.per_page,
                'total_pages': paginator.num_pages,
                'total_videos': paginator.count,
            },
        }
        return JsonResponse(response_data, status=200)
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Error fetching video list: %s", e)
        return JsonResponse({"code": 500, "message": f"服务器错误: {str(e)}", "data": {}}, status=500)
@require_http_methods(["GET"])
def get_plans(request):
    """
    Return every plan as JSON inside a ``code``/``message``/``data`` envelope.

    :param request: GET request (no parameters are read)
    :return: JsonResponse with status 200 and the plan list, or status 500
        with the error text when anything fails
    """
    try:
        serialized = []
        for item in Plan.objects.all():
            serialized.append({
                "id": item.id,
                "title": item.title,
                "description": item.description,
                # Decimal is converted to float for the JSON payload.
                "price": float(item.price),
                "credits_per_month": item.credits_per_month,
                "created_at": item.created_at,
                "updated_at": item.updated_at,
                "is_promotional": item.is_promotional,
                "unlimited_exports": item.unlimited_exports,
                "smart_music_sync": item.smart_music_sync,
            })

        success = {"code": 200, "message": "获取成功", "data": serialized}
        return JsonResponse(success, status=200)
    except Exception as exc:
        # Any failure is reported to the caller with its message.
        failure = {"code": 500, "message": f"服务器错误: {str(exc)}"}
        return JsonResponse(failure, status=500)
@csrf_exempt
@require_http_methods(["GET"])
def get_orders(request):
    """
    Return a paginated order list, newest first, optionally filtered by status.

    Staff users see every order; other users only see their own orders, in
    line with how ``get_video_list`` scopes data per user.

    Query parameters: ``status`` (optional filter), ``page`` (default 1) and
    ``page_size`` (default 10; values that are not positive integers fall
    back to 10). A non-integer page serves page 1 and an out-of-range page
    serves the last page.

    :param request: GET request from a logged-in user
    :return: JsonResponse with ``code``, ``message``, ``data`` (order list),
        ``page``, ``total_pages`` and ``total_orders``; ``code`` is 401 when
        the user is not logged in and 500 on unexpected errors
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        status = request.GET.get('status', None)
        page = request.GET.get('page', 1)
        # page_size comes straight from the query string; a non-numeric or
        # non-positive value would make Paginator raise, so fall back to 10.
        try:
            page_size = int(request.GET.get('page_size', 10))
        except (TypeError, ValueError):
            page_size = 10
        if page_size < 1:
            page_size = 10

        # select_related avoids one extra user query per serialized order.
        orders = Order.objects.select_related('user').order_by('-created_at')
        # Only staff may see other users' orders.
        if not request.user.is_staff:
            orders = orders.filter(user_id=request.user.id)
        if status:
            orders = orders.filter(status=status)

        paginator = Paginator(orders, page_size)
        try:
            orders_page = paginator.page(page)
        except PageNotAnInteger:
            orders_page = paginator.page(1)
        except EmptyPage:
            orders_page = paginator.page(paginator.num_pages)

        order_list = [
            {
                'order_id': order.order_id,
                'user_id': order.user.id,
                'username': order.user.username,
                'amount': float(order.amount),
                'payment_method': order.payment_method,
                'status': order.status,
                'created_at': order.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'updated_at': order.updated_at.strftime('%Y-%m-%d %H:%M:%S'),
            }
            for order in orders_page
        ]
        return JsonResponse({
            'code': 200,
            'message': '获取成功',
            'data': order_list,
            'page': orders_page.number,
            'total_pages': paginator.num_pages,
            'total_orders': paginator.count,
        })
    except Exception as e:
        return JsonResponse({'code': 500, 'message': str(e)})
@csrf_exempt
@require_http_methods(["DELETE"])
def delete_order(request, order_id):
    """
    Delete the order identified by ``order_id``.

    Staff users may delete any order. Other users may only delete their own
    orders; an order belonging to someone else is reported as not found so
    its existence is not disclosed.

    :param request: DELETE request from a logged-in user
    :param order_id: value matched against ``Order.order_id``
    :return: JsonResponse whose ``code`` is 200 on success, 401 when the user
        is not logged in, 404 when no deletable order matches and 500 on
        unexpected errors
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        candidates = Order.objects.all()
        # Restrict non-staff users to orders they own before looking it up.
        if not request.user.is_staff:
            candidates = candidates.filter(user_id=request.user.id)

        order = candidates.get(order_id=order_id)
        order.delete()
        return JsonResponse({'code': 200, 'message': '订单删除成功'})
    except Order.DoesNotExist:
        return JsonResponse({'code': 404, 'message': '订单不存在'})
    except Exception as e:
        return JsonResponse({'code': 500, 'message': str(e)})