Files
Ai_Admin/WebAdmin/admin_views.py
Jane Doe 9992293b9f 2.0
2024-09-25 12:18:07 +08:00

532 lines
20 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import re
import uuid
from datetime import datetime
from django.contrib.auth import logout
from django.contrib.auth import authenticate, login as auth_login, update_session_auth_hash
from django.core.exceptions import ValidationError
from django.core.serializers.json import DjangoJSONEncoder
from django.db import IntegrityError
from django.db.models import Count, Sum
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render, redirect,get_object_or_404
from django.contrib.auth.decorators import login_required
from allauth.account.auth_backends import AuthenticationBackend
from django.views.decorators.http import require_http_methods
from django.views.generic import View
from django.core.cache import cache
from captcha.image import ImageCaptcha
import random
from WebSite import settings
from .models import User, WebsiteInfo, Article, Order, VideoGeneration
from django.http import JsonResponse, HttpResponse
import string
def generate_captcha(request):
    """Generate a 4-character captcha image and remember its text.

    The text is made of uppercase ASCII letters and digits, is stored in the
    cache under the key ``'captcha'`` for 300 seconds, and the rendered image
    is returned as an ``image/png`` response.
    """
    # Local import: the challenge text is security-sensitive, so it is drawn
    # from the OS CSPRNG instead of the predictable ``random`` generator.
    import secrets

    alphabet = string.ascii_uppercase + string.digits
    captcha_text = ''.join(secrets.choice(alphabet) for _ in range(4))
    # NOTE(review): this cache key is identical for every visitor, so two
    # visitors requesting a captcha at the same time overwrite each other's
    # value. Moving to a per-session key requires updating every reader of
    # this key as well.
    cache.set('captcha', captcha_text, timeout=300)
    image_data = ImageCaptcha().generate(captcha_text)
    return HttpResponse(image_data, content_type='image/png')
@csrf_exempt
def admin_login_view(request):
    """Render the admin login page (non-POST) or process a JSON login (POST).

    The POST body is a JSON object with ``username``, ``password`` and
    ``captcha``. The response is always a JSON object with ``code`` and
    ``message``. Already-authenticated users are redirected to ``admin_home``.
    """
    if request.user.is_authenticated:
        return redirect('admin_home')
    if request.method != 'POST':
        return render(request, 'admin/admin_login.html')

    try:
        body = json.loads(request.body)
        if not isinstance(body, dict):
            # Valid JSON but not an object (e.g. a list): nothing to read from.
            return JsonResponse({'code': 400, 'message': '无效的请求数据'})

        username = body.get('username')
        password = body.get('password')
        captcha_input = body.get('captcha')
        captcha_stored = cache.get('captcha')

        # Both values must be present. Without this guard, an expired or
        # never-generated captcha (None) compared to an omitted field (None)
        # is "equal" and the captcha check is bypassed entirely.
        if not captcha_input or not captcha_stored or captcha_input != captcha_stored:
            return JsonResponse({'code': 400, 'message': '验证码错误'})

        user = authenticate(request, username=username, password=password)
        if user is None:
            return JsonResponse({'code': 400, 'message': '用户名或密码错误'})

        auth_login(request, user)
        return JsonResponse({'code': 200, 'message': '登录成功'})
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Malformed JSON, or bytes that cannot be decoded as text.
        return JsonResponse({'code': 400, 'message': '无效的请求数据'})
    except Exception as e:
        return JsonResponse({'code': 500, 'message': f'服务器内部错误: {str(e)}'})
@login_required
def admin_home_view(request):
    """Render the admin home page for a logged-in user."""
    template_name = 'admin/home.html'
    return render(request, template_name)
@login_required
@require_http_methods(["GET", "POST"])
@csrf_exempt  # NOTE(review): CSRF checks are disabled for this view; confirm this is intended
def admin_article_list(request):
    """Show the article list page (GET) or return the articles as JSON (POST).

    The POST response follows the layui table format: ``code``, ``msg``,
    ``count`` and ``data``. ``published_at`` is rendered as
    ``YYYY-MM-DD HH:MM:SS``, or ``'未发布'`` when the article has no
    publication time.
    """
    # Query once; the same rows serve both the template and the JSON payload.
    articles = list(
        Article.objects.all().values(
            'id', 'chinese_title', 'english_title', 'published_at', 'status'
        )
    )

    if request.method == 'GET':
        return render(request, 'admin/article-list.html', {"articles": articles})

    # Only POST reaches this point: require_http_methods rejects everything else.
    for article in articles:
        published_at = article['published_at']
        article['published_at'] = (
            published_at.strftime('%Y-%m-%d %H:%M:%S') if published_at else '未发布'
        )

    return JsonResponse({
        "code": 200,
        "msg": "成功",
        "count": len(articles),
        "data": articles,
    })
@login_required
def ArticleDetailView(request, pk, lang):
    """Render one article in the requested language.

    Args:
        request: The incoming request; ``login_required`` guarantees an
            authenticated user, so no further login check is needed here.
        pk: Primary key of the article; a missing article yields a 404.
        lang: ``'en'`` selects the English fields, any other value selects
            the Chinese fields.
    """
    article = get_object_or_404(Article, pk=pk)

    if lang == 'en':
        title = article.english_title
        keywords = article.english_keywords
        content = article.english_content
    else:
        title = article.chinese_title
        keywords = article.chinese_keywords
        content = article.chinese_content

    context = {
        'title': title,
        'keywords': keywords,
        'content': content,
        'published_at': article.published_at,
    }
    return render(request, 'admin/article_detail.html', context)
@login_required
@csrf_exempt  # NOTE(review): CSRF checks are disabled for this view; confirm this is intended
def admin_site_info_view(request):
    """Show the site configuration page, or update it from a JSON POST body.

    The POST body is a JSON object with the keys ``domain_en``, ``title_en``,
    ``keywords_en``, ``description_en`` and their ``_zh`` counterparts. Both
    domains are required. Every POST response is a JSON object with ``code``,
    ``message`` and ``data``.
    """
    website_info = WebsiteInfo.objects.first()

    if request.method != 'POST':
        return render(request, 'admin/site-info.html', {"website_info": website_info})

    field_names = (
        'domain_en', 'title_en', 'keywords_en', 'description_en',
        'domain_zh', 'title_zh', 'keywords_zh', 'description_zh',
    )
    try:
        data = json.loads(request.body)
        if not isinstance(data, dict):
            # Valid JSON but not an object, so there are no fields to read.
            return JsonResponse({
                'code': 400,
                'message': '无效的 JSON 格式',
                'data': {}
            })

        values = {name: data.get(name, '') for name in field_names}

        # Validation: both domains are mandatory.
        if not values['domain_en'] or not values['domain_zh']:
            return JsonResponse({
                'code': 400,
                'message': '域名不能为空',
                'data': {}
            })

        # No configuration row exists yet: create one instead of failing on
        # attribute assignment against None.
        if website_info is None:
            website_info = WebsiteInfo()

        for name, value in values.items():
            setattr(website_info, name, value)
        website_info.save()

        return JsonResponse({
            'code': 200,
            'message': '保存成功',
            'data': {}
        })
    except json.JSONDecodeError:
        return JsonResponse({
            'code': 400,
            'message': '无效的 JSON 格式',
            'data': {}
        })
    except ValidationError as ve:
        return JsonResponse({
            'code': 400,
            'message': str(ve),
            'data': {}
        })
    except IntegrityError:
        return JsonResponse({
            'code': 500,
            'message': '数据库错误,请重试',
            'data': {}
        })
    except Exception as e:
        return JsonResponse({
            'code': 500,
            'message': '服务器错误,请重试',
            'data': {'error': str(e)}
        })
@login_required
@csrf_exempt  # NOTE(review): CSRF checks are disabled for this view; confirm this is intended
def save_article(request, pk=None):
    """Render the article form (GET) or create/update an article (POST).

    Args:
        request: The incoming request. For POST the body is a JSON object with
            ``chinese_title``, ``english_title``, ``chinese_content``,
            ``english_content``, ``image_url`` and ``status`` (defaults to
            ``'pending'``).
        pk: Primary key of the article to edit; when falsy a new article is
            created.

    Returns:
        The rendered form for GET; otherwise a JSON object with ``code`` and
        ``message`` (plus ``article_id`` on success).
    """
    if request.method == "GET":
        # An article ID means edit mode; otherwise this is add mode.
        article = get_object_or_404(Article, pk=pk) if pk else None
        return render(request, 'admin/add-article.html', {
            'article': article,
            'page_title': '修改文章' if article else '添加文章',
        })

    if request.method != "POST":
        # Without this the view would return None and Django would raise.
        return JsonResponse({'code': 405, 'message': '不支持的请求方法'}, status=405)

    try:
        data = json.loads(request.body)
        if not isinstance(data, dict):
            return JsonResponse({'code': 400, 'message': '无效的请求数据'})

        status = data.get('status', 'pending')

        # Look the article up directly so that a missing row is reported as
        # "not found" rather than being swallowed by the generic handler below.
        article = Article.objects.get(pk=pk) if pk else Article()

        article.chinese_title = data.get('chinese_title', '')
        article.english_title = data.get('english_title', '')
        article.chinese_content = data.get('chinese_content', '')
        article.english_content = data.get('english_content', '')
        article.image_url = data.get('image_url', '')
        article.status = status
        article.published_at = datetime.now() if status == 'published' else None
        article.save()

        return JsonResponse({'code': 200, 'message': '文章保存成功', 'article_id': article.id})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JsonResponse({'code': 400, 'message': '无效的请求数据'})
    except Article.DoesNotExist:
        return JsonResponse({'code': 404, 'message': '文章未找到'})
    except Exception as e:
        return JsonResponse({'code': 500, 'message': str(e)})
# Image upload endpoint (dedicated to the rich-text editor)
@login_required
@csrf_exempt  # NOTE(review): CSRF checks are disabled for this view; confirm this is intended
@require_http_methods(["POST"])
def upload_editor_image(request):
    """Store an uploaded image and return its URL for the rich-text editor.

    The file is read from ``request.FILES['file']``, saved under
    ``MEDIA_ROOT/editor_images`` with a UUID-based name, and its public URL
    is returned as ``data.src``. Authentication is enforced by
    ``login_required``.

    Returns:
        A JSON object with ``code`` 0 on success, or ``code`` 400/500 and a
        ``message`` on failure.
    """
    # Only these extensions are written to disk: the extension comes from the
    # client-supplied filename and is otherwise untrusted.
    allowed_extensions = {'jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp'}

    try:
        if 'file' not in request.FILES:
            return JsonResponse({'code': 400, 'message': '没有上传图片'})

        image_file = request.FILES['file']

        _, dotted_extension = os.path.splitext(image_file.name or '')
        file_extension = dotted_extension[1:].lower()
        if file_extension not in allowed_extensions:
            return JsonResponse({'code': 400, 'message': '不支持的图片格式'})

        # A UUID gives the stored file a unique name.
        filename = f"{uuid.uuid4()}.{file_extension}"
        target_dir = os.path.join(settings.MEDIA_ROOT, 'editor_images')
        # The directory may not exist yet on a fresh deployment.
        os.makedirs(target_dir, exist_ok=True)
        file_path = os.path.join(target_dir, filename)

        with open(file_path, 'wb') as f:
            for chunk in image_file.chunks():
                f.write(chunk)

        image_url = f"{settings.DOMAIN}{settings.MEDIA_URL}editor_images/{filename}"
        return JsonResponse({
            "code": 0,
            "msg": "图片上传成功",
            "data": {
                "src": image_url,
                "title": None,
            },
        })
    except Exception as e:
        return JsonResponse({'code': 500, 'message': str(e)})
class ArticleManagementView(View):
    """Staff-only article actions: DELETE removes an article, POST publishes it.

    Both handlers answer with a JSON object holding ``code`` and ``message``.
    """

    def delete(self, request, article_id):
        """Delete the article identified by ``article_id``."""
        requester = request.user
        if not (requester.is_authenticated and requester.is_staff):
            return JsonResponse({'code': 403, 'message': '没有权限执行此操作'})

        try:
            target = Article.objects.get(id=article_id)
            target.delete()
        except Article.DoesNotExist:
            return JsonResponse({'code': 404, 'message': '文章未找到'})
        else:
            return JsonResponse({'code': 200, 'message': '删除成功'})

    def post(self, request, article_id):
        """Approve the article: mark it published and stamp the current time."""
        requester = request.user
        if not (requester.is_authenticated and requester.is_staff):
            return JsonResponse({'code': 403, 'message': '没有权限执行此操作'})

        try:
            target = Article.objects.get(id=article_id)
            # Approval sets the status to "published" and records the time.
            target.status = 'published'
            target.published_at = datetime.now()
            target.save()
        except Article.DoesNotExist:
            return JsonResponse({'code': 404, 'message': '文章未找到'})
        else:
            return JsonResponse({'code': 200, 'message': '审核成功'})
from django.core.paginator import Paginator, EmptyPage, InvalidPage
@login_required
@require_http_methods(["GET", "POST"])
@csrf_exempt  # NOTE(review): CSRF checks are disabled for this view; confirm this is intended
def admin_user_list(request):
    """Show the user list page (GET) or return one page of users as JSON (POST).

    POST reads the form fields ``page`` (default 1) and ``limit`` (default
    10). Invalid values fall back to the defaults, and an out-of-range page
    falls back to page 1. The response follows the layui table format:
    ``code``, ``msg``, ``count`` (total rows) and ``data``.
    """
    if request.method == 'GET':
        return render(request, 'admin/user-list.html')

    # Parse paging parameters defensively: they come straight from the client.
    try:
        page = int(request.POST.get('page', 1))
    except (TypeError, ValueError):
        page = 1
    try:
        limit = int(request.POST.get('limit', 10))
    except (TypeError, ValueError):
        limit = 10
    if limit < 1:
        # A non-positive page size makes the paginator fail.
        limit = 10

    # Newest registrations first.
    users_queryset = User.objects.all().order_by('-created_at').values(
        'id', 'username', 'email', 'phone', 'is_member', 'points', 'created_at',
        'login_count', 'last_login_ip', 'source'
    )

    paginator = Paginator(users_queryset, limit)
    try:
        current_page = paginator.page(page)
    except (EmptyPage, InvalidPage):
        current_page = paginator.page(1)

    users = list(current_page.object_list)
    for user in users:
        if user['created_at']:
            user['created_at'] = user['created_at'].strftime('%Y-%m-%d %H:%M:%S')

    return JsonResponse({
        "code": 0,
        "msg": "成功",
        "count": paginator.count,
        "data": users,
    })
@login_required
@require_http_methods(["GET", "POST"])
@csrf_exempt  # NOTE(review): CSRF checks are disabled for this view; confirm this is intended
def admin_task_list(request):
    """Show the task list page (GET) or return one page of tasks as JSON (POST).

    POST reads the form fields ``page`` (default 1) and ``limit`` (default
    10). Besides the layui table fields (``code``, ``msg``, ``count``,
    ``data``) the response carries ``video_type_count`` (rows per ``slug``)
    and ``status_count`` (rows per status).
    """
    if request.method == 'GET':
        return render(request, 'admin/task-list.html')

    # Parse each paging parameter on its own so that one bad value does not
    # reset the other.
    try:
        page = int(request.POST.get('page', 1))
    except (TypeError, ValueError):
        page = 1
    try:
        limit = int(request.POST.get('limit', 10))
    except (TypeError, ValueError):
        limit = 10
    if limit < 1:
        # A non-positive page size makes the paginator fail.
        limit = 10

    # All tasks, newest first.
    tasks_queryset = VideoGeneration.objects.all().order_by('-created_at')

    # Counts per video type and per status.
    video_type_count = VideoGeneration.objects.values('slug').annotate(count=Count('slug'))
    status_count = {
        'completed': tasks_queryset.filter(status='completed').count(),
        'in_progress': tasks_queryset.filter(status='in_progress').count(),
        'failed': tasks_queryset.filter(status='failed').count(),
    }

    paginator = Paginator(tasks_queryset, limit)
    try:
        current_page = paginator.page(page)
    except (EmptyPage, InvalidPage):
        current_page = paginator.page(1)

    # Build the rows, formatting the timestamp and filling in placeholders.
    tasks = [
        {
            'id': task.id,
            'video_id': task.video_id,
            'user_id': task.user_id,
            'text': task.text if task.text else '无生成文本',
            'slug': task.slug,
            'time_duration': task.time_duration if task.time_duration else '无时长',
            'video_url': task.video_url,
            'created_at': task.created_at.strftime('%Y-%m-%d %H:%M:%S'),
            'status': task.status,
        }
        for task in current_page.object_list
    ]

    return JsonResponse({
        "code": 0,
        "msg": "成功",
        "count": paginator.count,
        "data": tasks,
        "video_type_count": list(video_type_count),
        "status_count": status_count,
    })
@login_required
@require_http_methods(["GET", "POST"])
@csrf_exempt  # NOTE(review): CSRF checks are disabled for this view; confirm this is intended
def admin_order_list(request):
    """Show the order list page (GET) or return one page of orders as JSON (POST).

    POST reads the form fields ``page`` (default 1) and ``limit`` (default
    10). Besides the layui table fields (``code``, ``msg``, ``count``,
    ``data``) the response carries ``payment_method_count``,
    ``status_count`` and ``total_income_by_payment_method`` (completed
    orders only).
    """
    if request.method == 'GET':
        return render(request, 'admin/order-list.html')

    status_labels = {
        'pending': '待处理',
        'completed': '已完成',
        'canceled': '已取消',
        'failed': '失败',
    }

    def payment_label(method):
        # Any method other than 'alipay' is shown with the second label.
        return '支付宝' if method == 'alipay' else '贝宝'

    order_queryset = Order.objects.all().order_by('-created_at')

    # The page size comes from the client: fall back to the default when it
    # is not a positive integer instead of failing the request.
    try:
        limit = int(request.POST.get('limit', 10))
    except (TypeError, ValueError):
        limit = 10
    if limit < 1:
        limit = 10

    paginator = Paginator(order_queryset, limit)
    # get_page tolerates non-numeric and out-of-range page values itself.
    orders = paginator.get_page(request.POST.get('page', 1))

    order_data = list(orders.object_list.values(
        'order_id', 'username', 'plan__title', 'amount', 'payment_method', 'status', 'created_at'
    ))
    for order in order_data:
        order['created_at'] = order['created_at'].strftime('%Y-%m-%d %H:%M:%S')
        order['payment_method'] = payment_label(order['payment_method'])
        order['status'] = status_labels.get(order['status'], order['status'])

    # Orders per payment method.
    payment_method_count = Order.objects.values('payment_method').annotate(
        count=Count('payment_method')
    )

    # Orders per status.
    status_count = {
        'completed': Order.objects.filter(status='completed').count(),
        'failed': Order.objects.filter(status='failed').count(),
        'canceled': Order.objects.filter(status='canceled').count(),
        'pending': Order.objects.filter(status='pending').count(),
    }

    # Income per payment method, completed orders only.
    income_rows = Order.objects.filter(status='completed').values('payment_method').annotate(
        total_income=Sum('amount')
    )
    total_income_by_payment_method = [
        {
            'payment_method': payment_label(row['payment_method']),
            'total_income': row['total_income'],
        }
        for row in income_rows
    ]

    return JsonResponse({
        "code": 0,
        "msg": "成功",
        "count": paginator.count,
        "data": order_data,
        "payment_method_count": list(payment_method_count),
        "status_count": status_count,
        "total_income_by_payment_method": total_income_by_payment_method,
    })
@login_required
@require_http_methods(["POST"])
@csrf_exempt  # NOTE(review): CSRF checks are disabled for this view; confirm this is intended
def admin_change_password(request):
    """Change the logged-in user's password from a JSON POST body.

    The body is a JSON object with ``old_password`` and ``new_password``.
    The response is a JSON object with ``code`` and ``msg``. The session is
    refreshed after the change so the user is not logged out.
    """
    try:
        data = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Malformed body: answer with a client error instead of crashing.
        return JsonResponse({'code': 400, 'msg': '无效的请求数据'})
    if not isinstance(data, dict):
        return JsonResponse({'code': 400, 'msg': '无效的请求数据'})

    user = request.user
    old_password = data.get('old_password')
    new_password = data.get('new_password')

    # The old password must match.
    if not user.check_password(old_password):
        return JsonResponse({'code': 400, 'msg': '旧密码错误'})

    # The new password must not be empty.
    if not new_password:
        return JsonResponse({'code': 400, 'msg': '新密码不能为空'})

    user.set_password(new_password)
    user.save()

    # Keep the current session valid after the password hash changes.
    update_session_auth_hash(request, user)

    return JsonResponse({'code': 200, 'msg': '密码修改成功'})
@login_required
@require_http_methods(["POST", "GET"])
def admin_logout(request):
    """Log the current user out and send them to the admin login page."""
    logout(request)
    login_route = 'admin_login'
    return redirect(login_route)