import os import uuid import json import requests from django.conf import settings from django.http import JsonResponse from django.core.exceptions import ObjectDoesNotExist from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from azure.storage.blob import BlobServiceClient, BlobClient from .models import VideoGeneration, User, TransactionHistory from .utils import get_logger # 配置日志记录 log_file = 'API日志.log' logger = get_logger('API日志', log_file, when='midnight', backup_count=7) # API 配置 API_KEY = 'be13f9fbf56446bdbaa16e986bb0771b' REGION = 'eastus' ENDPOINT = f'https://{REGION}.tts.speech.microsoft.com/cognitiveservices/v1' @csrf_exempt def synthesize_speech(text, voice_name, style='general', rate=0): """ 生成语音文件并返回音频URL """ try: # 提取语言信息 language = voice_name.split('-')[0] + '-' + voice_name.split('-')[1] rate_str = f"{rate}%" output_format = 'audio-16khz-32kbitrate-mono-mp3' # 设置请求头 headers = { 'Ocp-Apim-Subscription-Key': API_KEY, 'Content-Type': 'application/ssml+xml', 'X-Microsoft-OutputFormat': output_format, 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0;' } # 构建SSML请求体 ssml = f""" {text} """ # 发送请求到 Microsoft 语音合成API response = requests.post(ENDPOINT, headers=headers, data=ssml) if response.status_code == 200: # 保存音频文件 audio_filename = f"{uuid.uuid4()}.mp3" audio_path = os.path.join(settings.MEDIA_ROOT, audio_filename) with open(audio_path, 'wb') as audio_file: audio_file.write(response.content) audio_url = f"{settings.DOMAIN}{settings.MEDIA_URL}{audio_filename}" return {"code": 200, "audio_url": audio_url} else: return {"code": response.status_code, "message": "无法合成语音"} except Exception as e: return {"code": 500, "message": str(e)} # 上传图片的接口 @csrf_exempt @require_http_methods(["POST"]) def upload_avatar(request): """ 上传图片的接口,返回图片URL """ try: if not request.user.is_authenticated: return JsonResponse({'code': 401, 'message': '用户未登录'}) if 'avatar' not in request.FILES: return JsonResponse({'code': 400, 'message': '没有上传图片'}) avatar_file = request.FILES['avatar'] # 保存图片到指定目录 avatars/ avatar_filename = f"{uuid.uuid4()}.{avatar_file.name.split('.')[-1]}" avatar_path = os.path.join(settings.MEDIA_ROOT, 'avatars', avatar_filename) with open(avatar_path, 'wb') as f: for chunk in avatar_file.chunks(): f.write(chunk) avatar_url = f"{settings.DOMAIN}{settings.MEDIA_URL}avatars/{avatar_filename}" return JsonResponse({'code': 200, "message": "上传成功",'data':{'avatar_url': avatar_url} }) except Exception as e: logger.error(f"上传头像时出错: {str(e)}") return JsonResponse({"code": 500, "message": str(e)}) @csrf_exempt @require_http_methods(["POST"]) def upload_audio(request): """ 上传音乐或视频文件的接口,返回文件URL(支持 .m4v, .mp4, .mp3, .wav 格式) """ try: if not request.user.is_authenticated: return JsonResponse({'code': 401, 'message': '用户未登录'}) if 'audio' not in request.FILES: return JsonResponse({'code': 400, 'message': '没有上传音频或视频文件'}) audio_file = request.FILES['audio'] file_extension = audio_file.name.split('.')[-1].lower() # 允许的文件格式 allowed_extensions = ['m4v', 'mp4', 'mp3', 'wav'] # 检查文件格式是否被允许 if file_extension not in allowed_extensions: return JsonResponse({'code': 400, 'message': '只支持 .m4v, .mp4, .mp3, .wav 格式的文件'}) # 生成唯一的文件名 audio_filename = f"{uuid.uuid4()}.{file_extension}" # 创建 Azure BlobServiceClient 实例 blob_service_client = BlobServiceClient.from_connection_string(settings.AZURE_CONNECTION_STRING) container_client = blob_service_client.get_container_client(settings.AZURE_CONTAINER_NAME) # 上传文件到 Azure Blob Storage blob_client = container_client.get_blob_client(audio_filename) # 使用 File 的 read 方法直接上传文件 blob_client.upload_blob(audio_file, overwrite=True) # 获取文件的 URL audio_url = blob_client.url return JsonResponse({'code': 200, 'message': '上传成功', 'data': {'audio_url': audio_url}}) except Exception as e: logger.error(f"上传文件时出错: {str(e)}") return JsonResponse({"code": 500, "message": str(e)}) import re # 功能费用配置 FEATURE_COSTS = { "text-to-video": {"base_cost": 10}, # 按字符收费 "img-to-video": {"base_cost": 10}, # 固定收费 "create-tiktok-video": {"base_cost": 20, "additional_cost_per_1000_chars": 10}, # 按字符收费 "create-music-video": {"base_cost": 20}, # 固定收费 "create-avatar-video": {"base_cost": 30, "additional_cost_per_1000_chars": 10}, # 按字符收费 } # 计算文本生成类功能的所需积分 def calculate_points_for_text(text, base_cost, additional_cost_per_1000_chars=0): """ 根据文本内容和定价策略计算所需积分。 """ # 计算中文字符和非中文字符数量 chinese_chars = re.findall(r'[\u4e00-\u9fff]', text) non_chinese_chars = re.findall(r'[^\u4e00-\u9fff]', text) chinese_char_count = len(chinese_chars) non_chinese_char_count = len(non_chinese_chars) # 中文字符算2个普通字符 total_char_count = chinese_char_count * 2 + non_chinese_char_count # 计算积分 total_points = base_cost + (total_char_count // 1000) * additional_cost_per_1000_chars return total_points # 扣费逻辑 def deduct_points(user, feature, text=None): """ 根据不同功能类型扣费,返回 True 或 JsonResponse 提示积分不足。 :param user: 用户对象 :param feature: 功能类型 (如: text_to_video, img_to_video) :param text: 可选的文本参数 :return: True 或者返回 JsonResponse """ # 检查功能类型是否在费用配置中 if feature not in FEATURE_COSTS: return JsonResponse({'code': 400, 'message': f'无效的功能类型: {feature}', 'data': {}}) # 获取费用配置 feature_config = FEATURE_COSTS[feature] base_cost = feature_config.get("base_cost", 0) additional_cost_per_1000_chars = feature_config.get("additional_cost_per_1000_chars", 0) # 如果有文本传入,计算所需积分和字符数量 char_count = 0 if text: # 计算中文字符和非中文字符数量 chinese_chars = re.findall(r'[\u4e00-\u9fff]', text) non_chinese_chars = re.findall(r'[^\u4e00-\u9fff]', text) chinese_char_count = len(chinese_chars) non_chinese_char_count = len(non_chinese_chars) # 中文字符算2个普通字符 total_char_count = chinese_char_count * 2 + non_chinese_char_count char_count = total_char_count # 计算积分 required_points = calculate_points_for_text(text, base_cost, additional_cost_per_1000_chars) else: # 固定收费的功能 required_points = base_cost # 检查用户积分是否足够 if user.points < required_points: return JsonResponse({'code': 402, 'message': '积分不足', 'data': {}}) # 扣费前记录用户积分 previous_points_balance = user.points # 扣除积分并保存 user.points -= required_points user.save() # 记录消费明细到数据库 TransactionHistory.objects.create( user=user, feature=feature, points_spent=required_points, char_count=char_count if text else None, # 记录字符数量 previous_points_balance=previous_points_balance, new_points_balance=user.points, description=f'扣费 {required_points} 积分用于 {feature}', success=True ) # 返回成功扣费 return True