ai_admin/WebAdmin/api.py
2024-09-20 04:29:09 +00:00

501 lines
19 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import requests
import json
import uuid
import os
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth.decorators import login_required
from django.views.decorators.http import require_http_methods
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from datetime import datetime
from .create_tiktok_video import create_video
from .create_avatar_video import create_avatar_video_payload
from .create_music_video import create_music_video_payload
from .models import VideoGeneration, User, Plan, Order, WebsiteInfo
from .base import logger,synthesize_speech,upload_avatar,deduct_points
# Credentials and endpoints used for video rendering and its callback.
# NOTE(review): this API key is committed to source control, so it should be
# treated as exposed: rotate it and supply the replacement through the
# VIDEO_API_KEY environment variable. The literal is kept only as a fallback
# so existing deployments keep working unchanged.
VIDEO_API_KEY = os.environ.get('VIDEO_API_KEY') or 'c4c0f516-33c0-4f5c-9d29-3b1d4e6a93c3'
VIDEO_API_URL = 'https://www.typeframes.com/api/public/v2/render'
WEBHOOK_URL = 'https://www.typeframes.cc/api/webhook/'
def generate_video_id(user_id):
    """
    Build a video identifier of the form ``<unix seconds>_<user_id>``.

    :param user_id: value appended after the underscore
    :return: the identifier string
    """
    # Whole seconds only: the fractional part of time.time() is truncated.
    return "{}_{}".format(int(time.time()), user_id)
@csrf_exempt
@require_http_methods(["POST"])
def handle_generate_tiktok_video_request(request):
    """
    Handle a request to generate a ``create-tiktok-video`` video.

    Reads ``text``, ``voice``, ``style``, ``rate``, ``mediaType``, ``ratio``,
    ``slug``, ``captionPreset``, ``captionPosition``, ``disableCaptions`` and
    ``generationPreset`` from the JSON body, deducts points, creates a
    ``VideoGeneration`` record, synthesizes the speech audio and submits the
    render through ``create_video``.

    :param request: HTTP request object
    :return: JSON response with the generation status or an error; the
        outcome is carried in the ``code`` field of the body.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        data = json.loads(request.body)
        # Valid JSON that is not an object (list, string, number, null) has no
        # .get(); answer with the bad-request payload instead of a 500.
        if not isinstance(data, dict):
            return JsonResponse({"code": 400, "message": "无效的JSON数据"})

        text = data.get('text', '')
        voice_name = data.get('voice', '')
        style = data.get('style', 'general')
        rate = data.get('rate', 0)
        media_type = data.get('mediaType', '')
        ratio = data.get('ratio', '')
        slug = data.get('slug', 'create-tiktok-video')
        caption_preset = data.get('captionPreset', 'Wrap 1')
        caption_position = data.get('captionPosition', 'bottom')
        disableCaptions = data.get('disableCaptions', True)
        # The original source notes ANIME as another preset value.
        generationPreset = data.get('generationPreset', 'LEONARDO')

        if not all([text, voice_name, media_type, ratio, slug, caption_preset, caption_position]):
            return JsonResponse({'code': 400, 'message': '缺少必要参数'})

        user = User.objects.get(id=request.user.id)

        # Work out and deduct the points this request costs. Anything other
        # than True is handed back as the response (insufficient points).
        # NOTE(review): nothing in this view refunds the points if synthesis
        # or rendering fails afterwards - confirm that is intended.
        result = deduct_points(user, "create-tiktok-video", text=text)
        if result is not True:
            return result

        video_gen = VideoGeneration.objects.create(
            user=user,
            text=text,
            voice_name=voice_name,
            style=style,
            rate=rate,
            media_type=media_type,
            ratio=ratio,
            slug=slug,
            video_id=generate_video_id(user.id),
        )

        # NOTE(review): status values are cased inconsistently ('failed' here,
        # 'Pending'/'Failed' below) - confirm what consumers expect before
        # normalizing them.
        audio_result = synthesize_speech(text, voice_name, style, rate)
        if audio_result['code'] != 200:
            video_gen.status = 'failed'
            video_gen.save()
            return JsonResponse(audio_result)

        audio_url = audio_result['audio_url']
        video_gen.audio_url = audio_url
        video_gen.status = 'in_progress'
        video_gen.save()

        video_result = create_video(
            audio_url, generationPreset, media_type, text, ratio, voice_name, user.id, slug,
            disableCaptions, caption_preset, caption_position, video_id=video_gen.video_id
        )
        if video_result['code'] == 200:
            video_gen.status = 'Pending'
            video_gen.pid = video_result['data']['pid']
            video_gen.save()
            return JsonResponse(video_result)

        video_gen.status = 'Failed'
        video_gen.save()
        return JsonResponse({"code": 405, "message": "生成失败"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError covers bodies that are not valid UTF-8, which
        # json.loads raises before it gets as far as parsing.
        return JsonResponse({"code": 400, "message": "无效的JSON数据"})
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"处理视频生成请求时出错: {str(e)}")
        return JsonResponse({"code": 500, "message": str(e)})
@csrf_exempt
@require_http_methods(["POST"])
def handle_generate_avatar_video_request(request):
    """
    Handle a request to generate a ``create-avatar-video`` video.

    Reads ``text``, ``voice``, ``style``, ``rate``, ``mediaType``, ``ratio``,
    ``captionPreset``, ``captionPosition``, ``selectedAvatar``,
    ``disableCaptions`` and ``generationPreset`` from the JSON body (the slug
    is fixed to ``create-avatar-video``), deducts points, creates a
    ``VideoGeneration`` record, synthesizes the speech audio and submits the
    render through ``create_avatar_video_payload``.

    :param request: HTTP request object
    :return: JSON response with the generation status or an error; the
        outcome is carried in the ``code`` field of the body.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        data = json.loads(request.body)
        # Valid JSON that is not an object (list, string, number, null) has no
        # .get(); answer with the bad-request payload instead of a 500.
        if not isinstance(data, dict):
            return JsonResponse({"code": 400, "message": "无效的JSON数据"})

        text = data.get('text', '')
        voice_name = data.get('voice', '')
        style = data.get('style', 'general')
        rate = data.get('rate', 0)
        media_type = data.get('mediaType', '')
        ratio = data.get('ratio', '9 / 16')
        slug = 'create-avatar-video'
        caption_preset = data.get('captionPreset', 'Wrap 1')
        caption_position = data.get('captionPosition', 'bottom')
        selected_avatar = data.get('selectedAvatar', '')
        disableCaptions = data.get('disableCaptions', True)
        # The original source notes ANIME as another preset value.
        generationPreset = data.get('generationPreset', 'LEONARDO')

        required = [
            text, voice_name, media_type, ratio, slug,
            caption_preset, caption_position, selected_avatar,
        ]
        if not all(required):
            return JsonResponse({'code': 400, 'message': '缺少必要参数'})

        user = User.objects.get(id=request.user.id)

        # Work out and deduct the points this request costs. Anything other
        # than True is handed back as the response (insufficient points).
        # NOTE(review): nothing in this view refunds the points if synthesis
        # or rendering fails afterwards - confirm that is intended.
        result = deduct_points(user, "create-avatar-video", text=text)
        if result is not True:
            return result

        video_gen = VideoGeneration.objects.create(
            user=user,
            text=text,
            voice_name=voice_name,
            style=style,
            rate=rate,
            media_type=media_type,
            ratio=ratio,
            slug=slug,
            video_id=generate_video_id(user.id),
        )

        # NOTE(review): status values are cased inconsistently ('failed' here,
        # 'Pending'/'Failed' below) - confirm what consumers expect before
        # normalizing them.
        audio_result = synthesize_speech(text, voice_name, style, rate)
        if audio_result['code'] != 200:
            video_gen.status = 'failed'
            video_gen.save()
            return JsonResponse(audio_result)

        audio_url = audio_result['audio_url']
        video_gen.audio_url = audio_url
        video_gen.status = 'in_progress'
        video_gen.save()

        video_result = create_avatar_video_payload(
            audio_url, generationPreset, media_type, text, ratio, voice_name, user.id, slug,
            disableCaptions, caption_preset, caption_position, selected_avatar,
            video_id=video_gen.video_id
        )
        if video_result['code'] == 200:
            video_gen.status = 'Pending'
            video_gen.pid = video_result['data']['pid']
            video_gen.save()
            return JsonResponse(video_result)

        video_gen.status = 'Failed'
        video_gen.save()
        return JsonResponse({"code": 405, "message": "生成失败"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError covers bodies that are not valid UTF-8, which
        # json.loads raises before it gets as far as parsing.
        return JsonResponse({"code": 400, "message": "无效的JSON数据"})
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"处理头像视频生成请求时出错: {str(e)}")
        return JsonResponse({"code": 500, "message": str(e)})
@csrf_exempt
@require_http_methods(["POST"])
def handle_generate_music_video_request(request):
    """
    Handle a request to generate a ``create-music-video`` video.

    Reads ``audioUrl``, ``mediaType``, ``ratio``, ``slug``, ``captionPreset``,
    ``captionPosition``, ``disableCaptions`` and ``generationPreset`` from the
    JSON body, deducts points, creates a ``VideoGeneration`` record with empty
    text/voice fields and submits the render through
    ``create_music_video_payload``. No speech is synthesized here; the audio
    URL comes from the request.

    :param request: HTTP request object
    :return: JSON response with the generation status or an error; the
        outcome is carried in the ``code`` field of the body.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        data = json.loads(request.body)
        # Valid JSON that is not an object (list, string, number, null) has no
        # .get(); answer with the bad-request payload instead of a 500.
        if not isinstance(data, dict):
            return JsonResponse({"code": 400, "message": "无效的JSON数据"})

        audio_url = data.get('audioUrl', '')
        media_type = data.get('mediaType', 'stockVideo')
        ratio = data.get('ratio', '9 / 16')
        slug = data.get('slug', 'create-music-video')
        caption_preset = data.get('captionPreset', 'Wrap 1')
        caption_position = data.get('captionPosition', 'bottom')
        disableCaptions = data.get('disableCaptions', True)
        # The original source notes ANIME as another preset value.
        generationPreset = data.get('generationPreset', 'LEONARDO')

        if not all([audio_url, media_type, ratio, slug, caption_preset, caption_position]):
            return JsonResponse({'code': 400, 'message': '缺少必要参数'})

        user = User.objects.get(id=request.user.id)

        # Anything other than True is handed back as the response
        # (insufficient points).
        # NOTE(review): nothing in this view refunds the points if rendering
        # fails afterwards - confirm that is intended.
        result = deduct_points(user, "create-music-video")
        if result is not True:
            return result

        video_gen = VideoGeneration.objects.create(
            user=user,
            text='',
            voice_name='',
            style='',
            rate=0,
            media_type=media_type,
            ratio=ratio,
            slug=slug,
            video_id=generate_video_id(user.id),
        )

        video_result = create_music_video_payload(
            audio_url=audio_url,
            generationPreset=generationPreset,
            media_type=media_type,
            text='',
            ratio=ratio,
            voice_name='',
            style='',
            rate=0,
            user_id=user.id,
            slug=slug,
            disableCaptions=disableCaptions,
            caption_preset=caption_preset,
            caption_position=caption_position,
            video_id=video_gen.video_id,
        )
        # Goes through the module logger rather than a stray print to stdout.
        logger.info("create_music_video_payload result: %s", video_result)

        if video_result['code'] == 200:
            video_gen.status = 'Pending'
            video_gen.pid = video_result['data']['pid']
            video_gen.save()
            return JsonResponse(video_result)

        video_gen.status = 'Failed'
        video_gen.save()
        return JsonResponse({"code": 405, "message": "生成失败"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError covers bodies that are not valid UTF-8, which
        # json.loads raises before it gets as far as parsing.
        return JsonResponse({"code": 400, "message": "无效的JSON数据"})
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"处理音乐视频生成请求时出错: {str(e)}")
        return JsonResponse({"code": 500, "message": str(e)})
@csrf_exempt
@require_http_methods(["POST"])
def webhook(request):
    """
    Handle the callback sent when a video generation finishes.

    The JSON body supplies ``videoUrl`` and ``videoId``; the user id comes
    from the ``userid`` query parameter. The matching ``VideoGeneration``
    record is marked ``completed`` (and its URL stored) when a video URL is
    present, otherwise ``failed``.

    :param request: HTTP request object
    :return: 200 once the callback has been processed (including when the
        user or record cannot be found), 400 for an invalid body, 500 on an
        unexpected error. Non-POST methods are rejected with 405 by the
        decorator.
    """
    # NOTE(review): nothing here authenticates the sender, so any caller that
    # knows a user id and video id can update that record - consider
    # verifying a shared secret or signature.
    try:
        # Raw POST payload.
        raw_data = request.body
        logger.info("Received raw data:\n%s\n", raw_data)

        data = json.loads(raw_data)
        # Empty payloads and non-object JSON (list, string, number) are both
        # unusable; the latter previously surfaced as a 500 via .get().
        if not data or not isinstance(data, dict):
            return HttpResponse(status=400)  # Bad Request

        # Log the complete callback payload.
        logger.info("Received complete callback data:\n%s\n", json.dumps(data, indent=4))

        video_url = data.get('videoUrl', '')
        video_id = data.get('videoId', '')
        user_id = request.GET.get('userid', '')

        if not (user_id and video_id):
            logger.error("User ID or Video ID not provided in the webhook URL or callback data")
            return HttpResponse(status=200)

        logger.info('回调更新状态')

        try:
            user = User.objects.get(id=user_id)
        except (ObjectDoesNotExist, ValueError):
            # ValueError: a userid that cannot be coerced to the key type is
            # treated like an unknown user rather than a server error.
            logger.error(f"User with id {user_id} does not exist")
            return HttpResponse(status=200)

        video_gen = VideoGeneration.objects.filter(user=user, video_id=video_id).first()
        if video_gen is None:
            logger.warning(f"No video generation found for user_id: {user_id} and video_id: {video_id}")
            return HttpResponse(status=200)

        if video_url:
            video_gen.video_url = video_url
            video_gen.status = 'completed'
        else:
            video_gen.status = 'failed'
        video_gen.save()

        output = {
            'videoUrl': video_url,
            'status': video_gen.status
        }
        logger.info("Updated video generation status and video URL:\n%s\n", json.dumps(output, indent=4))
        return HttpResponse(status=200)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        logger.error("Invalid JSON data: %s", str(e))
        return HttpResponse(status=400)  # Bad Request
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Error: %s", str(e))
        return HttpResponse(status=500)  # Internal Server Error
# Fetch the video list
@csrf_exempt
@require_http_methods(["GET"])
@login_required
def get_video_list(request):
    """
    Return a paginated list of video generation records.

    Query parameters: ``page`` (default 1), ``page_size`` (default 10) and,
    for staff users, an optional ``user_id`` to list one user's videos.
    Non-staff users always get their own records; staff users without
    ``user_id`` get every record. Results are ordered newest first.

    :param request: HTTP request object
    :return: JSON response with ``code`` and ``data`` (``list``, ``page``,
        ``page_size``, ``total_pages``, ``total_videos``); 404 when the
        requested user does not exist, 500 on an unexpected error.
    """
    try:
        user = request.user
        user_id = request.GET.get('user_id', None)
        page = request.GET.get('page', 1)

        # page_size arrives as an untrusted query-string value; a non-numeric
        # or non-positive value falls back to the default instead of making
        # the paginator raise.
        try:
            page_size = int(request.GET.get('page_size', 10))
        except (TypeError, ValueError):
            page_size = 10
        if page_size < 1:
            page_size = 10

        if user_id and user.is_staff:
            # Staff asking for a specific user's videos.
            try:
                specific_user = User.objects.get(id=user_id)
            except (ObjectDoesNotExist, ValueError):
                # ValueError: a user_id that cannot be coerced to the key type
                # is reported like a missing user.
                return JsonResponse({'code': 404, 'message': '用户不存在'}, status=404)
            video_list = VideoGeneration.objects.filter(user=specific_user).order_by('-created_at')
        elif not user.is_staff:
            # Non-staff users only ever see their own videos.
            video_list = VideoGeneration.objects.filter(user=user).order_by('-created_at')
        else:
            # Staff without a user_id filter get every record.
            video_list = VideoGeneration.objects.all().order_by('-created_at')

        paginator = Paginator(video_list, page_size)
        try:
            videos = paginator.page(page)
        except PageNotAnInteger:
            videos = paginator.page(1)
        except EmptyPage:
            videos = paginator.page(paginator.num_pages)

        video_data = [
            {
                'id': video.id,
                'text': video.text,
                'voice_name': video.voice_name,
                'style': video.style,
                'rate': video.rate,
                'media_type': video.media_type,
                'ratio': video.ratio,
                'audio_url': video.audio_url,
                'video_url': video.video_url,
                'status': video.status,
                'created_at': video.created_at,
                'updated_at': video.updated_at,
                'pid': video.pid,
                'video_id': video.video_id,
                'slug': video.slug,
            }
            for video in videos
        ]

        response_data = {
            'code': 200,
            'data': {
                "list": video_data,
                'page': videos.number,
                'page_size': paginator.per_page,
                'total_pages': paginator.num_pages,
                'total_videos': paginator.count,
            },
        }
        return JsonResponse(response_data)
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Error fetching video list: {str(e)}")
        return JsonResponse({"code": 500, "message": str(e)}, status=500)
@require_http_methods(["GET"])
def get_plans(request):
    """
    API endpoint returning every plan; the payload carries ``code`` and
    ``message`` fields next to the ``data`` list.

    :param request: HTTP request object
    :return: JSON response with status 200 on success or 500 on error
    """
    try:
        plans_data = []
        # One dictionary per plan, built while walking the full queryset.
        for plan in Plan.objects.all():
            plans_data.append({
                "id": plan.id,
                "title": plan.title,
                "description": plan.description,
                # Decimal is converted to float before serialization.
                "price": float(plan.price),
                "credits_per_month": plan.credits_per_month,
                "created_at": plan.created_at,
                "updated_at": plan.updated_at,
                "is_promotional": plan.is_promotional,
                "unlimited_exports": plan.unlimited_exports,
                "smart_music_sync": plan.smart_music_sync,
            })
        success_payload = {"code": 200, "message": "获取成功", "data": plans_data}
        return JsonResponse(success_payload, status=200)
    except Exception as e:
        # Any failure is reported to the caller with the error text.
        error_payload = {"code": 500, "message": f"服务器错误: {str(e)}"}
        return JsonResponse(error_payload, status=500)
@csrf_exempt
@require_http_methods(["GET"])
def get_orders(request):
    """
    Return a paginated order list, newest first, with optional status filter.

    Query parameters: ``status`` (optional filter), ``page`` (default 1) and
    ``page_size`` (default 10). Staff users see every order; other users see
    only their own, mirroring the staff/non-staff split in
    ``get_video_list``.

    :param request: HTTP request object
    :return: JSON response with ``code``, ``message``, ``data``, ``page``,
        ``total_pages`` and ``total_orders``; the outcome is carried in the
        ``code`` field of the body.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        status = request.GET.get('status', None)
        page = request.GET.get('page', 1)

        # page_size arrives as an untrusted query-string value; a non-numeric
        # or non-positive value falls back to the default instead of making
        # the paginator raise.
        try:
            page_size = int(request.GET.get('page_size', 10))
        except (TypeError, ValueError):
            page_size = 10
        if page_size < 1:
            page_size = 10

        # select_related avoids one extra query per row for order.user below.
        orders = Order.objects.select_related('user').order_by('-created_at')
        # Being logged in is not enough to read other users' orders.
        if not request.user.is_staff:
            orders = orders.filter(user_id=request.user.id)
        if status:
            orders = orders.filter(status=status)

        paginator = Paginator(orders, page_size)
        try:
            orders_page = paginator.page(page)
        except PageNotAnInteger:
            orders_page = paginator.page(1)
        except EmptyPage:
            orders_page = paginator.page(paginator.num_pages)

        order_list = [
            {
                'order_id': order.order_id,
                'user_id': order.user.id,
                'username': order.user.username,
                'amount': float(order.amount),
                'payment_method': order.payment_method,
                'status': order.status,
                'created_at': order.created_at.strftime('%Y-%m-%d %H:%M:%S'),
                'updated_at': order.updated_at.strftime('%Y-%m-%d %H:%M:%S'),
            }
            for order in orders_page
        ]
        return JsonResponse({
            'code': 200,
            'message': '获取成功',
            'data': order_list,
            'page': orders_page.number,
            'total_pages': paginator.num_pages,
            'total_orders': paginator.count,
        })
    except Exception as e:
        # Record the failure; previously it was returned but never logged.
        logger.exception("Error fetching order list: %s", e)
        return JsonResponse({'code': 500, 'message': str(e)})
@csrf_exempt
@require_http_methods(["DELETE"])
def delete_order(request, order_id):
    """
    Delete the order identified by ``order_id``.

    Staff users may delete any order; other users may delete only their own.
    An order outside the caller's scope is reported as not found, so its
    existence is not revealed.

    :param request: HTTP request object
    :param order_id: value matched against ``Order.order_id``
    :return: JSON response whose ``code`` is 200 on success, 401 when not
        logged in, 404 when no matching order is found, 500 on an unexpected
        error.
    """
    # NOTE(review): this destructive endpoint is csrf_exempt while relying on
    # the request's authenticated user - confirm CSRF protection is handled
    # elsewhere.
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})

        candidates = Order.objects.all()
        # Being logged in is not enough to delete other users' orders.
        if not request.user.is_staff:
            candidates = candidates.filter(user_id=request.user.id)

        order = candidates.get(order_id=order_id)
        order.delete()
        return JsonResponse({'code': 200, 'message': '订单删除成功'})
    except Order.DoesNotExist:
        return JsonResponse({'code': 404, 'message': '订单不存在'})
    except Exception as e:
        # Record the failure; previously it was returned but never logged.
        logger.exception("Error deleting order %s: %s", order_id, e)
        return JsonResponse({'code': 500, 'message': str(e)})