Ai_Admin/MyWeb/views.py
2024-06-05 05:10:50 +08:00

610 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import json
import random
import time
import uuid
from datetime import datetime
from django.db.models import Q
from django.http import JsonResponse, HttpResponseRedirect, HttpResponse
from django.contrib.auth.hashers import check_password
from MyApi.models import Administrator, Copywriting, User, FriendRequest, ApiCallLog, MembershipType, RedemptionCard, \
TransactionLog
from django.contrib.auth import login as django_login, logout as django_logout, login
from django.shortcuts import render, redirect
from .decorators import admin_login_required
from django.contrib.auth import logout
from django.core.paginator import Paginator, PageNotAnInteger, EmptyPage
from django.db import transaction
import Levenshtein
from urllib.parse import unquote, quote
def admin_login(request):
# 检查是否已登录
if 'admin_id' in request.session:
return redirect('admin_dashboard') # 使用视图的名称执行重定向
if request.method == 'POST':
data = json.loads(request.body)
username = data.get('username')
password = data.get('password')
try:
user = Administrator.objects.get(username=username)
except Administrator.DoesNotExist:
return JsonResponse({'status': 'error', 'message': '用户不存在'})
# 检查密码是否正确
if user and check_password(password, user.password):
if user.is_active:
# 登录成功,设置会话
request.session['role'] = user.role
request.session['admin_id'] = user.id
return JsonResponse({'status': 'success', 'message': '登录成功'})
else:
return JsonResponse({'status': 'error', 'message': '用户已被禁用'})
else:
return JsonResponse({'status': 'error', 'message': '密码错误'})
else:
# 非POST请求返回登录页面
return render(request, 'login.html')
@admin_login_required
def admin_dashboard(request):
# 从session获取管理员ID
admin_id = request.session.get('admin_id')
# 使用管理员ID查询管理员信息
try:
admin = Administrator.objects.get(id=admin_id)
except Administrator.DoesNotExist:
# 如果没有找到管理员,可以重定向到登录页面或显示错误信息
return HttpResponse('管理员不存在', status=404)
# 将管理员信息添加到上下文中
# context = {
# 'admin_username': admin.username,
# 'admin_role': admin.role,
# # 添加更多管理员信息到上下文中,根据需要
# }
return render(request, 'index.html', {'username':admin.username})
def admin_logout(request):
# 登出用户
logout(request)
# 重定向到登录页面,或者你希望用户登出后看到的页面
return redirect('admin_login')
@admin_login_required
# 小程序用户管理页面
def miniapp_users(request):
if request.method == "POST":
data = json.loads(request.body)
action = data.get('action')
if action == 'fetch':
# 分页获取小程序用户数据
users_list = User.objects.all().order_by('-created_at')
page = data.get('page', 1)
paginator = Paginator(users_list, 10) # 每页显示10条记录
try:
users = paginator.page(page)
except PageNotAnInteger:
users = paginator.page(1)
except EmptyPage:
users = paginator.page(paginator.num_pages)
users_data = list(users.object_list.values(
'openid', 'wxid','wechat_number', 'nickname', 'gender', 'region', 'email',
'phone', 'scene', 'is_member', 'member_start_time',
'member_end_time', 'is_active', 'usage_count', 'created_at'
))
return JsonResponse({
'data': users_data,
'code': 0,
'count': paginator.count,
'msg': ''
})
elif action == 'add':
print('添加')
elif action == 'edit':
# 编辑用户信息
user_id = data.get('id')
try:
user = User.objects.get(id=user_id)
user.wxid = data.get('wxid', user.wxid)
user.nickname = data.get('nickname', user.nickname)
user.gender = data.get('gender', user.gender)
user.region = data.get('region', user.region)
user.email = data.get('email', user.email)
user.phone = data.get('phone', user.phone)
user.scene = data.get('scene', user.scene)
user.is_member = data.get('is_member', user.is_member)
user.member_start_time = data.get('member_start_time', user.member_start_time)
user.member_end_time = data.get('member_end_time', user.member_end_time)
user.is_active = data.get('is_active', user.is_active)
user.usage_count = data.get('usage_count', user.usage_count)
user.save()
return JsonResponse({'status': 'success', 'msg': '用户信息更新成功'})
except User.DoesNotExist:
return JsonResponse({'status': 'error', 'msg': '用户不存在'})
elif action == 'delete':
# 删除用户
user_id = data.get('id')
User.objects.filter(id=user_id).delete()
return JsonResponse({'status': 'success', 'msg': '用户删除成功'})
return render(request, 'miniapp_users.html')
@admin_login_required
# 微信用户管理页面
def wechat_users(request):
if request.method == 'POST':
data = json.loads(request.body)
action = data.get('action')
if action == 'fetch':
# 处理获取好友请求记录的逻辑
page_number = data.get('page', 1)
limit = data.get('limit', 10)
search_keyword = data.get('search', '')
friend_requests_query = FriendRequest.objects.filter(
Q(fromnickname__icontains=search_keyword) |
Q(alias__icontains=search_keyword) |
Q(fromusername__icontains=search_keyword)
).order_by('-time')
paginator = Paginator(friend_requests_query, limit)
try:
friend_requests_page = paginator.page(page_number)
except PageNotAnInteger:
friend_requests_page = paginator.page(1)
except EmptyPage:
friend_requests_page = paginator.page(paginator.num_pages)
friend_requests_list = list(friend_requests_page.object_list.values(
'id', 'fromusername', 'alias', 'fromnickname', 'country',
'province', 'city', 'sex', 'scene', 'time', 'towxid'
))
return JsonResponse({
'code': 0,
'msg': '',
'count': paginator.count,
'data': friend_requests_list
})
elif action == 'add':
# 处理添加好友请求记录的逻辑
pass # 实现添加逻辑
elif action == 'edit':
# 处理编辑好友请求记录的逻辑
pass # 实现编辑逻辑
elif action == 'delete':
# 处理删除好友请求记录的逻辑
fr_id = data.get('id')
FriendRequest.objects.filter(id=fr_id).delete()
return JsonResponse({'status': 'success', 'msg': '记录删除成功'})
return render(request, 'wechat_users.html')
@admin_login_required
# 其他来源用户管理页面
def other_sources(request):
# 实现具体逻辑
return render(request, 'other_sources.html')
@admin_login_required
# 网站管理员管理页面
def site_admins(request):
# 实现具体逻辑
return render(request, 'site_admins.html')
@admin_login_required
# 视频提取记录管理页面
def video_extraction_records(request):
# 实现具体逻辑
return render(request, 'video_extraction_records.html')
@admin_login_required
# 文案提取记录管理页面
def copy_extraction_records(request):
# 实现具体逻辑
return render(request, 'copy_extraction_records.html')
@admin_login_required
# AI接口使用情况页面
def ai_api_usage(request):
# 实现具体逻辑
return render(request, 'ai_api_usage.html')
@admin_login_required
# 网站设置页面
def website_settings(request):
# 实现具体逻辑
return render(request, 'website_settings.html')
@admin_login_required
# 小程序设置页面
def miniapp_settings(request):
# 实现具体逻辑
return render(request, 'miniapp_settings.html')
@admin_login_required
# 后台主页
def main(request):
# 实现具体逻辑
return render(request, 'main.html')
# 文案库页面
@admin_login_required
def copywriting_library(request):
if request.method == "POST":
print(request.body)
data = json.loads(request.body)
action = data.get('action', '')
print(data)
if action == 'fetch':
# 分页参数
page = data.get('page', 1)
limit = data.get('limit', 10)
search = data.get('search', '')
search = quote(search)
# 过滤和排序Q(text_content__icontains=search_query)
queryset = Copywriting.objects.filter(Q(text_content__icontains=search)).order_by('-added_time')
paginator = Paginator(queryset, limit)
try:
copywritings = paginator.page(page)
except PageNotAnInteger:
copywritings = paginator.page(1)
except EmptyPage:
copywritings = paginator.page(paginator.num_pages)
copywriting_list = list(
copywritings.object_list.values('id', 'text_content', 'source', 'popularity','tag','is_approved', 'added_time'))
return JsonResponse({
"code": 0,
"msg": "",
"count": paginator.count,
"data": copywriting_list
})
elif action == 'add':
# 添加文案
text_content = data.get('text_content', '')
source = data.get('source', '')
tag = data.get('tag', '')
popularity = data.get('popularity', 0)
copywriting = Copywriting(
text_content=text_content,
source=source,
tag=tag,
popularity=popularity
)
copywriting.save()
return JsonResponse({"status": "success", "msg": "文案添加成功"})
elif action == 'batch-delete':
# 批量删除文案
ids = data.get('ids', [])
Copywriting.objects.filter(id__in=ids).delete()
return JsonResponse({"status": "success", "msg": "文案批量删除成功"})
elif action == 'delete':
# 批量删除文案
id = data.get('id')
Copywriting.objects.filter(id=id).delete()
return JsonResponse({"status": "success", "msg": "文案批量删除成功"})
elif action == 'edit':
# 获取请求中的数据
copywriting_id = data.get('id')
text_content = data.get('text_content')
source = data.get('source')
tag = data.get('tag')
try:
# 根据ID查找要编辑的文案实例
copywriting = Copywriting.objects.get(id=copywriting_id)
# 更新文案实例的字段
copywriting.text_content = text_content
copywriting.source = source
copywriting.tag = tag
copywriting.save() # 保存更改到数据库
return JsonResponse({'status': 'success', 'message': '文案更新成功'})
except Copywriting.DoesNotExist:
return JsonResponse({'status': 'error', 'message': '文案未找到'})
elif action == 'approve':
try:
copywriting_id = data.get('id')
copywriting = Copywriting.objects.get(id=copywriting_id)
copywriting.is_approved = 1
copywriting.save() # 保存更改到数据库
return JsonResponse({'status': 'success', 'message': '文案审核成功'})
except Copywriting.DoesNotExist:
return JsonResponse({'status': 'error', 'message': '文案未找到'})
# 对于非POST请求或者没有指定操作的POST请求返回页面
return render(request, 'copywriting_library.html')
@admin_login_required
# 文章记录页面
def article_records(request):
# 这里实现具体的视图逻辑
return render(request, 'article_records.html')
@admin_login_required
# 素材库页面
def material_library(request):
# 这里实现具体的视图逻辑
return render(request, 'material_library.html')
#删除重复文案
def get_similar_texts(threshold=0.8):
# 假设threshold为相似度阈值
all_texts = Copywriting.objects.filter(is_approved=0) # 仅考虑已审核的文案
print(len(all_texts))
to_delete = []
# 将查询到的文案内容解码
decoded_texts = [(text, unquote(text.text_content)) for text in all_texts]
for i, (text1, decoded_content1) in enumerate(decoded_texts):
for text2, decoded_content2 in decoded_texts[i + 1:]:
# 计算解码后的文本之间的相似度
if Levenshtein.ratio(decoded_content1, decoded_content2) > threshold:
# 如果相似度大于阈值,选择热度较低的文案删除
if text1.popularity <= text2.popularity:
to_delete.append(text1)
else:
to_delete.append(text2)
return set(to_delete) # 使用集合避免重复
@transaction.atomic
def delete_similar_texts():
to_delete = get_similar_texts()
deleted_count = len(to_delete) # 计算删除对象的数量
for text in to_delete:
text.delete() # 删除操作
return deleted_count # 返回删除的行数
def delete_similar_texts_view(request):
deleted_count = delete_similar_texts()
print(f"Deleted {deleted_count} similar texts.")
# 返回结果
return JsonResponse({'message': f'Deleted {deleted_count} similar texts.'})
def generate_redemption_code(prefix):
# 生成唯一的 UUID
unique_id = uuid.uuid4()
# 获取当前时间戳
current_timestamp = int(time.time())
# 使用 MD5 算法生成散列值
hash_object = hashlib.md5(unique_id.bytes + str(current_timestamp).encode())
hash_hex = hash_object.hexdigest()
# 将前缀和散列值的前8位拼接起来作为卡密
redemption_code = f"{prefix}{hash_hex[:8]}"
return redemption_code
#卡密管理
@admin_login_required
def redemption_card_list(request):
if request.method == "POST":
data = json.loads(request.body)
action = data.get('action', '')
if action == 'fetch':
# 获取分页参数
page = int(data.get('page', 1))
limit = int(data.get('limit', 10))
search = data.get('search', '')
search = quote(search)
# 过滤和排序
queryset = RedemptionCard.objects.filter().order_by('-card_creation_time')
paginator = Paginator(queryset, limit)
try:
redemptionCard = paginator.page(page)
except PageNotAnInteger:
redemptionCard = paginator.page(1)
except EmptyPage:
redemptionCard = paginator.page(paginator.num_pages)
copywriting_list = list(
redemptionCard.object_list.values('id', 'code', 'card_type', 'created_date', 'expiry_date', 'is_used',
'used_by_openid','used_by_nickname','card_creation_time','validity_period'))
return JsonResponse({
"code": 0,
"message": "ok",
"count": paginator.count,
"data": copywriting_list
})
elif action == 'edit':
try:
redemptionCard_id = data.get('id')
redemptionCard = RedemptionCard.objects.get(id=redemptionCard_id)
# 更新文案
# 这里添加更新文案的逻辑
return JsonResponse({'status': 'success', 'message': '更新成功'})
except RedemptionCard.DoesNotExist:
return JsonResponse({'status': 'error', 'message': '卡密未找到'})
elif action == 'add':
try:
# 解析请求数据
code_prefix = data.get('code_prefix', '')
card_type = data.get('card_type', '')
quantity = int(data.get('quantity', 0))
expiry_date = data.get('expiry_date', '')
validity_period = int(data.get('validity_period',0))
# 添加卡密逻辑
for _ in range(quantity):
card = RedemptionCard.objects.create(
code=generate_redemption_code(code_prefix),
card_type=card_type,
expiry_date=expiry_date,
validity_period=validity_period
)
return JsonResponse({'status': 'success', 'message': '卡密添加成功'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)})
elif action == 'delete':
try:
redemptionCard_id = data.get('id')
redemptionCard = RedemptionCard.objects.get(id=redemptionCard_id)
redemptionCard.delete() # 删除单个卡密记录
return JsonResponse({'status': 'success', 'message': '卡密删除成功'})
except RedemptionCard.DoesNotExist:
return JsonResponse({'status': 'error', 'message': '卡密未找到'})
elif action == 'batch_delete':
try:
ids = data.get('ids', [])
RedemptionCard.objects.filter(id__in=ids).delete() # 批量删除卡密记录
return JsonResponse({'status': 'success', 'message': '批量删除卡密成功'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)})
else:
return JsonResponse({
"code": 400,
"msg": "Invalid action parameter.",
})
return render(request, 'redemption_card.html')
# 会员配置
@admin_login_required
def membership_type_list(request):
if request.method == 'POST':
data = json.loads(request.body)
action = data['action']
if action == 'fetch': # 读取会员卡类型列表
is_quota = data.get('is_quota', None)
if is_quota is not None:
membership_types = MembershipType.objects.filter(is_quota=is_quota)
else:
membership_types = MembershipType.objects.all()
# 将会员卡类型列表序列化为JSON格式
membership_types_data = [
{
'id': m.id,
'type': m.type,
'title': m.title,
'description': m.description,
'price': m.price,
'duration_days': m.duration_days,
'coins': m.coins,
'is_quota': m.is_quota
} for m in membership_types
]
return JsonResponse({"code": 0, 'data': membership_types_data})
elif action == 'add': # 添加会员卡类型
# 获取前端传递过来的数据
membership_type = MembershipType(
type=data.get('type'),
title=data.get('title'),
description=data.get('description'),
price=data.get('price'),
duration_days=data.get('duration_days'),
coins=data.get('coins'),
is_quota=data.get('is_quota')
)
membership_type.save()
return JsonResponse({'status': 'success', 'message': '会员卡类型添加成功'})
elif action == 'edit': # 编辑会员卡类型
id = int(data['id'])
# 根据id获取要修改的MembershipType对象
membership_type = MembershipType.objects.get(id=id)
# 获取前端传递过来的数据
membership_type.title = data.get('title')
membership_type.description = data.get('description')
membership_type.price = data.get('price')
membership_type.duration_days = data.get('duration_days')
membership_type.coins = data.get('coins')
membership_type.is_quota = data.get('is_quota')
membership_type.save()
return JsonResponse({'status': 'success', 'message': '会员卡类型修改成功'})
elif action == 'delete': # 删除会员卡类型
id = int(data['id'])
# 根据id删除对应的MembershipType对象
MembershipType.objects.filter(id=id).delete()
return JsonResponse({'status': 'success', 'message': '会员卡类型删除成功'})
return render(request, 'membership-types.html')
# 订单记录
@admin_login_required
def transaction_log_list(request):
if request.method == "POST":
data = json.loads(request.body)
action = data.get('action', '')
if action == 'fetch':
page = data.get('page', 1)
limit = data.get('limit', 10)
queryset = TransactionLog.objects.all().order_by('-created_at')
paginator = Paginator(queryset, limit)
try:
transaction_logs = paginator.page(page)
except PageNotAnInteger:
transaction_logs = paginator.page(1)
except EmptyPage:
transaction_logs = paginator.page(paginator.num_pages)
transaction_log_list = list(
transaction_logs.object_list.values(
'id',
'transaction_no',
'transaction_status',
'user_openid',
'transaction_type',
'transaction_amount',
'remark',
'created_at',
'updated_at'
)
)
return JsonResponse({
"code": 0,
"msg": "",
"count": paginator.count,
"data": transaction_log_list
})
elif action == 'delete':
try:
id = data.get('id')
transaction_log = TransactionLog.objects.get(id=id)
transaction_log.delete()
return JsonResponse({'status': 'success', 'message': '订单记录删除成功'})
except TransactionLog.DoesNotExist:
return JsonResponse({'status': 'error', 'message': '订单记录未找到'})
elif action == 'batch_delete':
try:
ids = data.get('ids', [])
TransactionLog.objects.filter(id__in=ids).delete()
return JsonResponse({'status': 'success', 'message': '批量删除订单记录成功'})
except Exception as e:
return JsonResponse({'status': 'error', 'message': str(e)})
return render(request, 'transaction-logs.html')