240 lines
8.9 KiB
Python
240 lines
8.9 KiB
Python
![]() |
import os
|
|||
|
import uuid
|
|||
|
import json
|
|||
|
import requests
|
|||
|
from django.conf import settings
|
|||
|
from django.http import JsonResponse
|
|||
|
from django.core.exceptions import ObjectDoesNotExist
|
|||
|
from django.views.decorators.csrf import csrf_exempt
|
|||
|
from django.views.decorators.http import require_http_methods
|
|||
|
from azure.storage.blob import BlobServiceClient, BlobClient
|
|||
|
|
|||
|
from .models import VideoGeneration, User, TransactionHistory
|
|||
|
from .utils import get_logger
|
|||
|
|
|||
|
# 配置日志记录
|
|||
|
log_file = 'API日志.log'
|
|||
|
logger = get_logger('API日志', log_file, when='midnight', backup_count=7)
|
|||
|
|
|||
|
# API 配置
|
|||
|
API_KEY = 'be13f9fbf56446bdbaa16e986bb0771b'
|
|||
|
REGION = 'eastus'
|
|||
|
ENDPOINT = f'https://{REGION}.tts.speech.microsoft.com/cognitiveservices/v1'
|
|||
|
|
|||
|
@csrf_exempt
|
|||
|
def synthesize_speech(text, voice_name, style='general', rate=0):
|
|||
|
"""
|
|||
|
生成语音文件并返回音频URL
|
|||
|
"""
|
|||
|
try:
|
|||
|
# 提取语言信息
|
|||
|
language = voice_name.split('-')[0] + '-' + voice_name.split('-')[1]
|
|||
|
rate_str = f"{rate}%"
|
|||
|
output_format = 'audio-16khz-32kbitrate-mono-mp3'
|
|||
|
|
|||
|
# 设置请求头
|
|||
|
headers = {
|
|||
|
'Ocp-Apim-Subscription-Key': API_KEY,
|
|||
|
'Content-Type': 'application/ssml+xml',
|
|||
|
'X-Microsoft-OutputFormat': output_format,
|
|||
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0;'
|
|||
|
}
|
|||
|
|
|||
|
# 构建SSML请求体
|
|||
|
ssml = f"""
|
|||
|
<speak version='1.0' xmlns:mstts='http://www.w3.org/2001/mstts' xml:lang='{language}'>
|
|||
|
<voice xml:lang='{language}' xml:gender='Female' name='{voice_name}'>
|
|||
|
<mstts:express-as style='{style}'>
|
|||
|
<prosody rate='{rate_str}'>{text}</prosody>
|
|||
|
</mstts:express-as>
|
|||
|
</voice>
|
|||
|
</speak>
|
|||
|
"""
|
|||
|
|
|||
|
# 发送请求到 Microsoft 语音合成API
|
|||
|
response = requests.post(ENDPOINT, headers=headers, data=ssml)
|
|||
|
if response.status_code == 200:
|
|||
|
# 保存音频文件
|
|||
|
audio_filename = f"{uuid.uuid4()}.mp3"
|
|||
|
audio_path = os.path.join(settings.MEDIA_ROOT, audio_filename)
|
|||
|
with open(audio_path, 'wb') as audio_file:
|
|||
|
audio_file.write(response.content)
|
|||
|
audio_url = f"{settings.DOMAIN}{settings.MEDIA_URL}{audio_filename}"
|
|||
|
return {"code": 200, "audio_url": audio_url}
|
|||
|
else:
|
|||
|
return {"code": response.status_code, "message": "无法合成语音"}
|
|||
|
except Exception as e:
|
|||
|
return {"code": 500, "message": str(e)}
|
|||
|
|
|||
|
# 上传图片的接口
|
|||
|
@csrf_exempt
|
|||
|
@require_http_methods(["POST"])
|
|||
|
def upload_avatar(request):
|
|||
|
"""
|
|||
|
上传图片的接口,返回图片URL
|
|||
|
"""
|
|||
|
try:
|
|||
|
if not request.user.is_authenticated:
|
|||
|
return JsonResponse({'code': 401, 'message': '用户未登录'})
|
|||
|
|
|||
|
if 'avatar' not in request.FILES:
|
|||
|
return JsonResponse({'code': 400, 'message': '没有上传图片'})
|
|||
|
|
|||
|
avatar_file = request.FILES['avatar']
|
|||
|
|
|||
|
# 保存图片到指定目录 avatars/
|
|||
|
avatar_filename = f"{uuid.uuid4()}.{avatar_file.name.split('.')[-1]}"
|
|||
|
avatar_path = os.path.join(settings.MEDIA_ROOT, 'avatars', avatar_filename)
|
|||
|
with open(avatar_path, 'wb') as f:
|
|||
|
for chunk in avatar_file.chunks():
|
|||
|
f.write(chunk)
|
|||
|
|
|||
|
avatar_url = f"{settings.DOMAIN}{settings.MEDIA_URL}avatars/{avatar_filename}"
|
|||
|
|
|||
|
return JsonResponse({'code': 200, "message": "上传成功",'data':{'avatar_url': avatar_url} })
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"上传头像时出错: {str(e)}")
|
|||
|
return JsonResponse({"code": 500, "message": str(e)})
|
|||
|
|
|||
|
@csrf_exempt
|
|||
|
@require_http_methods(["POST"])
|
|||
|
def upload_audio(request):
|
|||
|
"""
|
|||
|
上传音乐或视频文件的接口,返回文件URL(支持 .m4v, .mp4, .mp3, .wav 格式)
|
|||
|
"""
|
|||
|
try:
|
|||
|
if not request.user.is_authenticated:
|
|||
|
return JsonResponse({'code': 401, 'message': '用户未登录'})
|
|||
|
|
|||
|
if 'audio' not in request.FILES:
|
|||
|
return JsonResponse({'code': 400, 'message': '没有上传音频或视频文件'})
|
|||
|
|
|||
|
audio_file = request.FILES['audio']
|
|||
|
file_extension = audio_file.name.split('.')[-1].lower()
|
|||
|
|
|||
|
# 允许的文件格式
|
|||
|
allowed_extensions = ['m4v', 'mp4', 'mp3', 'wav']
|
|||
|
|
|||
|
# 检查文件格式是否被允许
|
|||
|
if file_extension not in allowed_extensions:
|
|||
|
return JsonResponse({'code': 400, 'message': '只支持 .m4v, .mp4, .mp3, .wav 格式的文件'})
|
|||
|
|
|||
|
# 生成唯一的文件名
|
|||
|
audio_filename = f"{uuid.uuid4()}.{file_extension}"
|
|||
|
|
|||
|
# 创建 Azure BlobServiceClient 实例
|
|||
|
blob_service_client = BlobServiceClient.from_connection_string(settings.AZURE_CONNECTION_STRING)
|
|||
|
container_client = blob_service_client.get_container_client(settings.AZURE_CONTAINER_NAME)
|
|||
|
|
|||
|
# 上传文件到 Azure Blob Storage
|
|||
|
blob_client = container_client.get_blob_client(audio_filename)
|
|||
|
|
|||
|
# 使用 File 的 read 方法直接上传文件
|
|||
|
blob_client.upload_blob(audio_file, overwrite=True)
|
|||
|
|
|||
|
# 获取文件的 URL
|
|||
|
audio_url = blob_client.url
|
|||
|
|
|||
|
return JsonResponse({'code': 200, 'message': '上传成功', 'data': {'audio_url': audio_url}})
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
logger.error(f"上传文件时出错: {str(e)}")
|
|||
|
return JsonResponse({"code": 500, "message": str(e)})
|
|||
|
import re
|
|||
|
# 功能费用配置
|
|||
|
FEATURE_COSTS = {
|
|||
|
"text-to-video": {"base_cost": 10}, # 按字符收费
|
|||
|
"img-to-video": {"base_cost": 10}, # 固定收费
|
|||
|
"create-tiktok-video": {"base_cost": 20, "additional_cost_per_1000_chars": 10}, # 按字符收费
|
|||
|
"create-music-video": {"base_cost": 20}, # 固定收费
|
|||
|
"create-avatar-video": {"base_cost": 30, "additional_cost_per_1000_chars": 10}, # 按字符收费
|
|||
|
}
|
|||
|
|
|||
|
# 计算文本生成类功能的所需积分
|
|||
|
def calculate_points_for_text(text, base_cost, additional_cost_per_1000_chars=0):
|
|||
|
"""
|
|||
|
根据文本内容和定价策略计算所需积分。
|
|||
|
"""
|
|||
|
# 计算中文字符和非中文字符数量
|
|||
|
chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)
|
|||
|
non_chinese_chars = re.findall(r'[^\u4e00-\u9fff]', text)
|
|||
|
|
|||
|
chinese_char_count = len(chinese_chars)
|
|||
|
non_chinese_char_count = len(non_chinese_chars)
|
|||
|
|
|||
|
# 中文字符算2个普通字符
|
|||
|
total_char_count = chinese_char_count * 2 + non_chinese_char_count
|
|||
|
|
|||
|
# 计算积分
|
|||
|
total_points = base_cost + (total_char_count // 1000) * additional_cost_per_1000_chars
|
|||
|
return total_points
|
|||
|
|
|||
|
# 扣费逻辑
|
|||
|
|
|||
|
def deduct_points(user, feature, text=None):
|
|||
|
"""
|
|||
|
根据不同功能类型扣费,返回 True 或 JsonResponse 提示积分不足。
|
|||
|
:param user: 用户对象
|
|||
|
:param feature: 功能类型 (如: text_to_video, img_to_video)
|
|||
|
:param text: 可选的文本参数
|
|||
|
:return: True 或者返回 JsonResponse
|
|||
|
"""
|
|||
|
# 检查功能类型是否在费用配置中
|
|||
|
if feature not in FEATURE_COSTS:
|
|||
|
return JsonResponse({'code': 400, 'message': f'无效的功能类型: {feature}', 'data': {}})
|
|||
|
|
|||
|
# 获取费用配置
|
|||
|
feature_config = FEATURE_COSTS[feature]
|
|||
|
base_cost = feature_config.get("base_cost", 0)
|
|||
|
additional_cost_per_1000_chars = feature_config.get("additional_cost_per_1000_chars", 0)
|
|||
|
|
|||
|
# 如果有文本传入,计算所需积分和字符数量
|
|||
|
char_count = 0
|
|||
|
if text:
|
|||
|
# 计算中文字符和非中文字符数量
|
|||
|
chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)
|
|||
|
non_chinese_chars = re.findall(r'[^\u4e00-\u9fff]', text)
|
|||
|
|
|||
|
chinese_char_count = len(chinese_chars)
|
|||
|
non_chinese_char_count = len(non_chinese_chars)
|
|||
|
|
|||
|
# 中文字符算2个普通字符
|
|||
|
total_char_count = chinese_char_count * 2 + non_chinese_char_count
|
|||
|
char_count = total_char_count
|
|||
|
|
|||
|
# 计算积分
|
|||
|
required_points = calculate_points_for_text(text, base_cost, additional_cost_per_1000_chars)
|
|||
|
else:
|
|||
|
# 固定收费的功能
|
|||
|
required_points = base_cost
|
|||
|
|
|||
|
# 检查用户积分是否足够
|
|||
|
if user.points < required_points:
|
|||
|
return JsonResponse({'code': 402, 'message': '积分不足', 'data': {}})
|
|||
|
|
|||
|
# 扣费前记录用户积分
|
|||
|
previous_points_balance = user.points
|
|||
|
|
|||
|
# 扣除积分并保存
|
|||
|
user.points -= required_points
|
|||
|
user.save()
|
|||
|
|
|||
|
|
|||
|
# 记录消费明细到数据库
|
|||
|
TransactionHistory.objects.create(
|
|||
|
user=user,
|
|||
|
feature=feature,
|
|||
|
points_spent=required_points,
|
|||
|
char_count=char_count if text else None, # 记录字符数量
|
|||
|
previous_points_balance=previous_points_balance,
|
|||
|
new_points_balance=user.points,
|
|||
|
description=f'扣费 {required_points} 积分用于 {feature}',
|
|||
|
success=True
|
|||
|
)
|
|||
|
|
|||
|
# 返回成功扣费
|
|||
|
return True
|
|||
|
|
|||
|
|
|||
|
|