# ai_admin/WebAdmin/base.py
# Snapshot: 2024-09-20 04:29:09 +00:00 (240 lines, 8.9 KiB, Python, executable file)
#
# NOTE: the hosting page flagged this file as containing ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# If that is intentional the warning can be ignored; otherwise reveal and
# review them (the web viewer offers an "Escape" button for this).

import os
import uuid
import json
import requests
from django.conf import settings
from django.http import JsonResponse
from django.core.exceptions import ObjectDoesNotExist
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from azure.storage.blob import BlobServiceClient, BlobClient
from .models import VideoGeneration, User, TransactionHistory
from .utils import get_logger
# Logging configuration. The handler options are forwarded to the project's
# get_logger helper (presumably daily rotation keeping 7 backups -- confirm
# against .utils.get_logger).
log_file = 'API日志.log'
logger = get_logger('API日志', log_file, when='midnight', backup_count=7)

# Microsoft text-to-speech API configuration.
# SECURITY: a subscription key should not live in source control. The key and
# region can now be supplied through the AZURE_SPEECH_KEY / AZURE_SPEECH_REGION
# environment variables; the literals below are kept only as a fallback so
# existing deployments keep working.
# TODO(review): rotate this key and remove the hard-coded fallback.
API_KEY = os.environ.get('AZURE_SPEECH_KEY') or 'be13f9fbf56446bdbaa16e986bb0771b'
REGION = os.environ.get('AZURE_SPEECH_REGION') or 'eastus'
ENDPOINT = f'https://{REGION}.tts.speech.microsoft.com/cognitiveservices/v1'
@csrf_exempt
def synthesize_speech(text, voice_name, style='general', rate=0):
    """
    Synthesize speech for ``text`` via the TTS REST endpoint and save it as an MP3.

    NOTE(review): the signature takes no ``request``, so this looks like a
    helper rather than a view; ``@csrf_exempt`` is kept only so the
    module-level name behaves exactly as before.

    :param text: Text to speak. It is XML-escaped before being placed in SSML.
    :param voice_name: Voice identifier whose first two dash-separated parts
        form the locale (e.g. ``"zh-CN-Something"`` -> ``"zh-CN"``).
    :param style: Value for the ``mstts:express-as`` ``style`` attribute.
    :param rate: Speaking-rate value; it is rendered as ``"<rate>%"``.
    :return: ``{"code": 200, "audio_url": ...}`` on success,
        ``{"code": <HTTP status>, "message": ...}`` when the service does not
        answer 200, or ``{"code": 500, "message": str(error)}`` when anything
        raises (including a malformed ``voice_name`` or a request timeout).
    """
    # Function-scope import: only this helper needs XML escaping.
    from xml.sax.saxutils import escape

    def _attr(value):
        # Escape a value for use inside a quoted XML attribute.
        return escape(str(value), {"'": "&apos;", '"': "&quot;"})

    try:
        # Derive the locale from the voice name; a name without a dash raises
        # IndexError here and is reported through the generic 500 path below.
        name_parts = voice_name.split('-')
        language = f"{name_parts[0]}-{name_parts[1]}"
        rate_str = f"{rate}%"
        output_format = 'audio-16khz-32kbitrate-mono-mp3'

        # Request headers.
        headers = {
            'Ocp-Apim-Subscription-Key': API_KEY,
            'Content-Type': 'application/ssml+xml',
            'X-Microsoft-OutputFormat': output_format,
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0;'
        }

        # Escape everything interpolated into the SSML so characters such as
        # '&', '<' or quotes cannot break the document or inject markup.
        safe_language = _attr(language)
        safe_voice = _attr(voice_name)
        safe_style = _attr(style)
        safe_rate = _attr(rate_str)
        safe_text = escape(str(text))

        # Build the SSML request body.
        ssml = f"""
<speak version='1.0' xmlns:mstts='http://www.w3.org/2001/mstts' xml:lang='{safe_language}'>
<voice xml:lang='{safe_language}' xml:gender='Female' name='{safe_voice}'>
<mstts:express-as style='{safe_style}'>
<prosody rate='{safe_rate}'>{safe_text}</prosody>
</mstts:express-as>
</voice>
</speak>
"""

        # Send the request. The body is encoded explicitly because a str body
        # may otherwise be encoded as ISO-8859-1 by the HTTP stack, which
        # fails for non-Latin text. The timeout keeps a stalled service from
        # blocking the caller forever.
        response = requests.post(
            ENDPOINT,
            headers=headers,
            data=ssml.encode('utf-8'),
            timeout=30,
        )
        if response.status_code != 200:
            return {"code": response.status_code, "message": "无法合成语音"}

        # Store the returned audio under MEDIA_ROOT with a random file name.
        audio_filename = f"{uuid.uuid4()}.mp3"
        os.makedirs(settings.MEDIA_ROOT, exist_ok=True)
        audio_path = os.path.join(settings.MEDIA_ROOT, audio_filename)
        with open(audio_path, 'wb') as audio_file:
            audio_file.write(response.content)

        audio_url = f"{settings.DOMAIN}{settings.MEDIA_URL}{audio_filename}"
        return {"code": 200, "audio_url": audio_url}
    except Exception as e:
        # Log like the upload endpoints do instead of failing silently.
        logger.error(f"合成语音时出错: {str(e)}")
        return {"code": 500, "message": str(e)}
# Avatar image upload endpoint.
@csrf_exempt
@require_http_methods(["POST"])
def upload_avatar(request):
    """
    Store an uploaded avatar image under ``MEDIA_ROOT/avatars`` and return its URL.

    Expects the file in ``request.FILES['avatar']``. All outcomes are reported
    in the JSON body's ``code`` field:

    * 401 - the user is not authenticated
    * 400 - no file was sent, or its extension is not an accepted image type
    * 200 - success; ``data.avatar_url`` holds the public URL
    * 500 - any unexpected error (logged, message returned to the client)
    """
    # Only image extensions are accepted: the extension comes from the
    # client-supplied file name and the file is later served from the media
    # root, so arbitrary types (.html, .svg, ...) must not be stored.
    allowed_extensions = ('jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp')
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})
        if 'avatar' not in request.FILES:
            return JsonResponse({'code': 400, 'message': '没有上传图片'})

        avatar_file = request.FILES['avatar']
        file_extension = avatar_file.name.split('.')[-1].lower()
        if file_extension not in allowed_extensions:
            return JsonResponse({'code': 400, 'message': '只支持 .jpg, .jpeg, .png, .gif, .bmp, .webp 格式的图片'})

        # Save under avatars/ with a random name; create the directory on
        # first use instead of assuming it already exists.
        avatar_filename = f"{uuid.uuid4()}.{file_extension}"
        avatar_dir = os.path.join(settings.MEDIA_ROOT, 'avatars')
        os.makedirs(avatar_dir, exist_ok=True)
        avatar_path = os.path.join(avatar_dir, avatar_filename)
        with open(avatar_path, 'wb') as f:
            for chunk in avatar_file.chunks():
                f.write(chunk)

        avatar_url = f"{settings.DOMAIN}{settings.MEDIA_URL}avatars/{avatar_filename}"
        return JsonResponse({'code': 200, "message": "上传成功", 'data': {'avatar_url': avatar_url}})
    except Exception as e:
        logger.error(f"上传头像时出错: {str(e)}")
        return JsonResponse({"code": 500, "message": str(e)})
@csrf_exempt
@require_http_methods(["POST"])
def upload_audio(request):
    """
    Upload an audio or video file to Azure Blob Storage and return its URL.

    Expects the file in ``request.FILES['audio']``; only the extensions
    m4v, mp4, mp3 and wav (case-insensitive) are accepted. The JSON body's
    ``code`` field is 401 for an unauthenticated user, 400 for a missing file
    or a rejected extension, 200 on success (``data.audio_url`` is the blob
    URL) and 500 for any unexpected error, which is also logged.
    """
    try:
        if not request.user.is_authenticated:
            return JsonResponse({'code': 401, 'message': '用户未登录'})
        if 'audio' not in request.FILES:
            return JsonResponse({'code': 400, 'message': '没有上传音频或视频文件'})

        uploaded = request.FILES['audio']
        # Text after the last dot of the client file name (the whole name
        # when it contains no dot), lower-cased for the whitelist check.
        suffix = uploaded.name.rpartition('.')[2].lower()
        if suffix not in ('m4v', 'mp4', 'mp3', 'wav'):
            return JsonResponse({'code': 400, 'message': '只支持 .m4v, .mp4, .mp3, .wav 格式的文件'})

        # Random blob name so uploads never collide.
        blob_name = f"{uuid.uuid4()}.{suffix}"

        # Service client -> container client -> blob client for the new blob.
        target_blob = (
            BlobServiceClient
            .from_connection_string(settings.AZURE_CONNECTION_STRING)
            .get_container_client(settings.AZURE_CONTAINER_NAME)
            .get_blob_client(blob_name)
        )

        # The uploaded file object is handed to the SDK directly.
        target_blob.upload_blob(uploaded, overwrite=True)

        payload = {'code': 200, 'message': '上传成功', 'data': {'audio_url': target_blob.url}}
        return JsonResponse(payload)
    except Exception as exc:
        logger.error(f"上传文件时出错: {str(exc)}")
        return JsonResponse({"code": 500, "message": str(exc)})
import re
# Point pricing per feature.
# "base_cost" is always charged; "additional_cost_per_1000_chars", when
# present, is added for every full 1000 weighted characters of input text.
FEATURE_COSTS = {
    # Originally annotated as charged by character count, but no
    # per-1000-character surcharge is configured for it here.
    "text-to-video": {"base_cost": 10},
    # Flat fee.
    "img-to-video": {"base_cost": 10},
    # Charged by character count.
    "create-tiktok-video": {
        "base_cost": 20,
        "additional_cost_per_1000_chars": 10,
    },
    # Flat fee.
    "create-music-video": {"base_cost": 20},
    # Charged by character count.
    "create-avatar-video": {
        "base_cost": 30,
        "additional_cost_per_1000_chars": 10,
    },
}
# Point calculation for text-driven features.
def calculate_points_for_text(text, base_cost, additional_cost_per_1000_chars=0):
    """
    Return the points needed for ``text`` under the given pricing.

    Characters in the range U+4E00..U+9FFF count as two characters and every
    other character (whitespace and newlines included) counts as one. The
    result is ``base_cost`` plus ``additional_cost_per_1000_chars`` for each
    full 1000 weighted characters.
    """
    # One scan for the double-weight characters; each of them adds one extra
    # unit on top of the plain length of the text.
    wide_count = len(re.findall(r'[\u4e00-\u9fff]', text))
    weighted_length = len(text) + wide_count

    # Only complete blocks of 1000 weighted characters are surcharged.
    full_blocks = weighted_length // 1000
    return base_cost + full_blocks * additional_cost_per_1000_chars
# Point deduction logic.
def deduct_points(user, feature, text=None):
    """
    Charge ``user`` for one use of ``feature`` and record the transaction.

    The balance update and the ``TransactionHistory`` row are written inside
    one database transaction, so a failed history insert cannot leave points
    deducted without a ledger entry.

    :param user: User object; ``user.points`` is read, decremented and saved.
    :param feature: A key of ``FEATURE_COSTS`` (e.g. ``"text-to-video"``,
        ``"img-to-video"``).
    :param text: Optional text; when non-empty the price is computed by
        ``calculate_points_for_text`` and the weighted character count is
        stored with the transaction. Otherwise only the base cost is charged.
    :return: ``True`` when the points were deducted; otherwise a
        ``JsonResponse`` with code 400 (unknown feature) or 402 (insufficient
        points).
    :raises Exception: any database error is re-raised after the in-memory
        balance has been restored.
    """
    # Function-scope import; Django is already a dependency of this module.
    from django.db import transaction

    # Reject feature types that have no pricing entry.
    if feature not in FEATURE_COSTS:
        return JsonResponse({'code': 400, 'message': f'无效的功能类型: {feature}', 'data': {}})

    # Look up the pricing for this feature.
    feature_config = FEATURE_COSTS[feature]
    base_cost = feature_config.get("base_cost", 0)
    additional_cost_per_1000_chars = feature_config.get("additional_cost_per_1000_chars", 0)

    if text:
        # Weighted length: characters in U+4E00..U+9FFF count twice, all
        # others once (the same weighting calculate_points_for_text applies).
        char_count = len(text) + len(re.findall(r'[\u4e00-\u9fff]', text))
        required_points = calculate_points_for_text(text, base_cost, additional_cost_per_1000_chars)
    else:
        # Flat-fee use: no character count is recorded.
        char_count = None
        required_points = base_cost

    # Refuse when the balance is too low.
    if user.points < required_points:
        return JsonResponse({'code': 402, 'message': '积分不足', 'data': {}})

    # Remember the balance before charging, both for the ledger row and to
    # restore the in-memory object if the write fails.
    previous_points_balance = user.points
    try:
        with transaction.atomic():
            user.points = previous_points_balance - required_points
            user.save()
            # Ledger entry describing this charge.
            TransactionHistory.objects.create(
                user=user,
                feature=feature,
                points_spent=required_points,
                char_count=char_count,
                previous_points_balance=previous_points_balance,
                new_points_balance=user.points,
                description=f'扣费 {required_points} 积分用于 {feature}',
                success=True
            )
    except Exception:
        # The database work was rolled back; keep the object consistent.
        user.points = previous_points_balance
        raise

    # Charge succeeded.
    return True