240 lines
8.9 KiB
Python
Executable File
240 lines
8.9 KiB
Python
Executable File
import os
|
||
import uuid
|
||
import json
|
||
import requests
|
||
from django.conf import settings
|
||
from django.http import JsonResponse
|
||
from django.core.exceptions import ObjectDoesNotExist
|
||
from django.views.decorators.csrf import csrf_exempt
|
||
from django.views.decorators.http import require_http_methods
|
||
from azure.storage.blob import BlobServiceClient, BlobClient
|
||
|
||
from .models import VideoGeneration, User, TransactionHistory
|
||
from .utils import get_logger
|
||
|
||
# 配置日志记录
|
||
log_file = 'API日志.log'
|
||
logger = get_logger('API日志', log_file, when='midnight', backup_count=7)
|
||
|
||
# API 配置
|
||
API_KEY = 'be13f9fbf56446bdbaa16e986bb0771b'
|
||
REGION = 'eastus'
|
||
ENDPOINT = f'https://{REGION}.tts.speech.microsoft.com/cognitiveservices/v1'
|
||
|
||
@csrf_exempt
|
||
def synthesize_speech(text, voice_name, style='general', rate=0):
|
||
"""
|
||
生成语音文件并返回音频URL
|
||
"""
|
||
try:
|
||
# 提取语言信息
|
||
language = voice_name.split('-')[0] + '-' + voice_name.split('-')[1]
|
||
rate_str = f"{rate}%"
|
||
output_format = 'audio-16khz-32kbitrate-mono-mp3'
|
||
|
||
# 设置请求头
|
||
headers = {
|
||
'Ocp-Apim-Subscription-Key': API_KEY,
|
||
'Content-Type': 'application/ssml+xml',
|
||
'X-Microsoft-OutputFormat': output_format,
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0;'
|
||
}
|
||
|
||
# 构建SSML请求体
|
||
ssml = f"""
|
||
<speak version='1.0' xmlns:mstts='http://www.w3.org/2001/mstts' xml:lang='{language}'>
|
||
<voice xml:lang='{language}' xml:gender='Female' name='{voice_name}'>
|
||
<mstts:express-as style='{style}'>
|
||
<prosody rate='{rate_str}'>{text}</prosody>
|
||
</mstts:express-as>
|
||
</voice>
|
||
</speak>
|
||
"""
|
||
|
||
# 发送请求到 Microsoft 语音合成API
|
||
response = requests.post(ENDPOINT, headers=headers, data=ssml)
|
||
if response.status_code == 200:
|
||
# 保存音频文件
|
||
audio_filename = f"{uuid.uuid4()}.mp3"
|
||
audio_path = os.path.join(settings.MEDIA_ROOT, audio_filename)
|
||
with open(audio_path, 'wb') as audio_file:
|
||
audio_file.write(response.content)
|
||
audio_url = f"{settings.DOMAIN}{settings.MEDIA_URL}{audio_filename}"
|
||
return {"code": 200, "audio_url": audio_url}
|
||
else:
|
||
return {"code": response.status_code, "message": "无法合成语音"}
|
||
except Exception as e:
|
||
return {"code": 500, "message": str(e)}
|
||
|
||
# 上传图片的接口
|
||
@csrf_exempt
|
||
@require_http_methods(["POST"])
|
||
def upload_avatar(request):
|
||
"""
|
||
上传图片的接口,返回图片URL
|
||
"""
|
||
try:
|
||
if not request.user.is_authenticated:
|
||
return JsonResponse({'code': 401, 'message': '用户未登录'})
|
||
|
||
if 'avatar' not in request.FILES:
|
||
return JsonResponse({'code': 400, 'message': '没有上传图片'})
|
||
|
||
avatar_file = request.FILES['avatar']
|
||
|
||
# 保存图片到指定目录 avatars/
|
||
avatar_filename = f"{uuid.uuid4()}.{avatar_file.name.split('.')[-1]}"
|
||
avatar_path = os.path.join(settings.MEDIA_ROOT, 'avatars', avatar_filename)
|
||
with open(avatar_path, 'wb') as f:
|
||
for chunk in avatar_file.chunks():
|
||
f.write(chunk)
|
||
|
||
avatar_url = f"{settings.DOMAIN}{settings.MEDIA_URL}avatars/{avatar_filename}"
|
||
|
||
return JsonResponse({'code': 200, "message": "上传成功",'data':{'avatar_url': avatar_url} })
|
||
except Exception as e:
|
||
logger.error(f"上传头像时出错: {str(e)}")
|
||
return JsonResponse({"code": 500, "message": str(e)})
|
||
|
||
@csrf_exempt
|
||
@require_http_methods(["POST"])
|
||
def upload_audio(request):
|
||
"""
|
||
上传音乐或视频文件的接口,返回文件URL(支持 .m4v, .mp4, .mp3, .wav 格式)
|
||
"""
|
||
try:
|
||
if not request.user.is_authenticated:
|
||
return JsonResponse({'code': 401, 'message': '用户未登录'})
|
||
|
||
if 'audio' not in request.FILES:
|
||
return JsonResponse({'code': 400, 'message': '没有上传音频或视频文件'})
|
||
|
||
audio_file = request.FILES['audio']
|
||
file_extension = audio_file.name.split('.')[-1].lower()
|
||
|
||
# 允许的文件格式
|
||
allowed_extensions = ['m4v', 'mp4', 'mp3', 'wav']
|
||
|
||
# 检查文件格式是否被允许
|
||
if file_extension not in allowed_extensions:
|
||
return JsonResponse({'code': 400, 'message': '只支持 .m4v, .mp4, .mp3, .wav 格式的文件'})
|
||
|
||
# 生成唯一的文件名
|
||
audio_filename = f"{uuid.uuid4()}.{file_extension}"
|
||
|
||
# 创建 Azure BlobServiceClient 实例
|
||
blob_service_client = BlobServiceClient.from_connection_string(settings.AZURE_CONNECTION_STRING)
|
||
container_client = blob_service_client.get_container_client(settings.AZURE_CONTAINER_NAME)
|
||
|
||
# 上传文件到 Azure Blob Storage
|
||
blob_client = container_client.get_blob_client(audio_filename)
|
||
|
||
# 使用 File 的 read 方法直接上传文件
|
||
blob_client.upload_blob(audio_file, overwrite=True)
|
||
|
||
# 获取文件的 URL
|
||
audio_url = blob_client.url
|
||
|
||
return JsonResponse({'code': 200, 'message': '上传成功', 'data': {'audio_url': audio_url}})
|
||
|
||
except Exception as e:
|
||
logger.error(f"上传文件时出错: {str(e)}")
|
||
return JsonResponse({"code": 500, "message": str(e)})
|
||
import re
|
||
# 功能费用配置
|
||
FEATURE_COSTS = {
|
||
"text-to-video": {"base_cost": 10}, # 按字符收费
|
||
"img-to-video": {"base_cost": 10}, # 固定收费
|
||
"create-tiktok-video": {"base_cost": 20, "additional_cost_per_1000_chars": 10}, # 按字符收费
|
||
"create-music-video": {"base_cost": 20}, # 固定收费
|
||
"create-avatar-video": {"base_cost": 30, "additional_cost_per_1000_chars": 10}, # 按字符收费
|
||
}
|
||
|
||
# 计算文本生成类功能的所需积分
|
||
def calculate_points_for_text(text, base_cost, additional_cost_per_1000_chars=0):
|
||
"""
|
||
根据文本内容和定价策略计算所需积分。
|
||
"""
|
||
# 计算中文字符和非中文字符数量
|
||
chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)
|
||
non_chinese_chars = re.findall(r'[^\u4e00-\u9fff]', text)
|
||
|
||
chinese_char_count = len(chinese_chars)
|
||
non_chinese_char_count = len(non_chinese_chars)
|
||
|
||
# 中文字符算2个普通字符
|
||
total_char_count = chinese_char_count * 2 + non_chinese_char_count
|
||
|
||
# 计算积分
|
||
total_points = base_cost + (total_char_count // 1000) * additional_cost_per_1000_chars
|
||
return total_points
|
||
|
||
# 扣费逻辑
|
||
|
||
def deduct_points(user, feature, text=None):
|
||
"""
|
||
根据不同功能类型扣费,返回 True 或 JsonResponse 提示积分不足。
|
||
:param user: 用户对象
|
||
:param feature: 功能类型 (如: text_to_video, img_to_video)
|
||
:param text: 可选的文本参数
|
||
:return: True 或者返回 JsonResponse
|
||
"""
|
||
# 检查功能类型是否在费用配置中
|
||
if feature not in FEATURE_COSTS:
|
||
return JsonResponse({'code': 400, 'message': f'无效的功能类型: {feature}', 'data': {}})
|
||
|
||
# 获取费用配置
|
||
feature_config = FEATURE_COSTS[feature]
|
||
base_cost = feature_config.get("base_cost", 0)
|
||
additional_cost_per_1000_chars = feature_config.get("additional_cost_per_1000_chars", 0)
|
||
|
||
# 如果有文本传入,计算所需积分和字符数量
|
||
char_count = 0
|
||
if text:
|
||
# 计算中文字符和非中文字符数量
|
||
chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)
|
||
non_chinese_chars = re.findall(r'[^\u4e00-\u9fff]', text)
|
||
|
||
chinese_char_count = len(chinese_chars)
|
||
non_chinese_char_count = len(non_chinese_chars)
|
||
|
||
# 中文字符算2个普通字符
|
||
total_char_count = chinese_char_count * 2 + non_chinese_char_count
|
||
char_count = total_char_count
|
||
|
||
# 计算积分
|
||
required_points = calculate_points_for_text(text, base_cost, additional_cost_per_1000_chars)
|
||
else:
|
||
# 固定收费的功能
|
||
required_points = base_cost
|
||
|
||
# 检查用户积分是否足够
|
||
if user.points < required_points:
|
||
return JsonResponse({'code': 402, 'message': '积分不足', 'data': {}})
|
||
|
||
# 扣费前记录用户积分
|
||
previous_points_balance = user.points
|
||
|
||
# 扣除积分并保存
|
||
user.points -= required_points
|
||
user.save()
|
||
|
||
|
||
# 记录消费明细到数据库
|
||
TransactionHistory.objects.create(
|
||
user=user,
|
||
feature=feature,
|
||
points_spent=required_points,
|
||
char_count=char_count if text else None, # 记录字符数量
|
||
previous_points_balance=previous_points_balance,
|
||
new_points_balance=user.points,
|
||
description=f'扣费 {required_points} 积分用于 {feature}',
|
||
success=True
|
||
)
|
||
|
||
# 返回成功扣费
|
||
return True
|
||
|
||
|
||
|