first commit

This commit is contained in:
2025-11-17 12:49:59 +08:00
commit 89fd3c8bd9
238 changed files with 51533 additions and 0 deletions

764
generate_pdf.py Normal file
View File

@@ -0,0 +1,764 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
谛听多维报告 PDF 生成器
使用 WeasyPrint 将 HTML 模板转换为 PDF
"""
import json
import re
from pathlib import Path
from typing import Dict, Any, Optional
from jinja2 import Environment, FileSystemLoader, select_autoescape
# 尝试导入 WeasyPrint提供友好的错误提示
try:
from weasyprint import HTML, CSS
from weasyprint.text.fonts import FontConfiguration
WEASYPRINT_AVAILABLE = True
except OSError as e:
if 'libgobject' in str(e) or 'gobject' in str(e).lower():
WEASYPRINT_AVAILABLE = False
WEASYPRINT_ERROR = "WeasyPrint 需要 GTK+ 运行时库。\n\n" \
"Windows 用户推荐使用 Conda 安装(最简单):\n" \
" conda install -c conda-forge weasyprint\n\n" \
"或参考 install_weasyprint_windows.md 文件中的详细安装指南。"
else:
WEASYPRINT_AVAILABLE = False
WEASYPRINT_ERROR = f"WeasyPrint 导入失败: {e}\n\n请参考 install_weasyprint_windows.md 文件。"
except ImportError as e:
WEASYPRINT_AVAILABLE = False
WEASYPRINT_ERROR = f"WeasyPrint 未安装: {e}\n\n请运行: pip install weasyprint"
class ReportDataProcessor:
"""报告数据处理器"""
@staticmethod
def mask_name(name: Optional[str]) -> str:
"""姓名脱敏"""
if not name:
return ''
if len(name) == 1:
return '*'
if len(name) == 2:
return name[0] + '*'
return name[0] + '*' * (len(name) - 2) + name[-1]
@staticmethod
def mask_phone(phone: Optional[str]) -> str:
"""手机号脱敏"""
if not phone:
return ''
if len(phone) == 11:
return phone[:3] + '****' + phone[7:]
return phone
@staticmethod
def mask_id_card(id_card: Optional[str]) -> str:
"""身份证号脱敏"""
if not id_card:
return ''
return re.sub(r'^(.{6})(?:\d+)(.{4})$', r'\1****\2', id_card)
@staticmethod
def format_interval(interval: Optional[str], unit: str = "") -> str:
"""格式化区间表达式"""
if not interval or interval == "-" or interval == "0":
return interval or "-"
try:
# 处理特殊格式,如 "3,6(个月)"
if "(" in interval and ")" in interval:
match = re.match(r'^(\d+(?:,\d+)*)\((.+)\)$', interval)
if match:
numbers = [n.strip() for n in match.group(1).split(",")]
time_unit = match.group(2)
if len(numbers) == 2:
return f"{numbers[0]}-{numbers[1]}{time_unit}"
return f"{', '.join(numbers)}{time_unit}"
# 处理区间表达式
pattern = r'^([\[\(])(\d+(?:\.\d+)?),(\d+(?:\.\d+)?|\+)([\]\)])$'
match = re.match(pattern, interval)
if not match:
return interval
left_bracket, left_value, right_value, right_bracket = match.groups()
is_left_inclusive = left_bracket == "["
is_right_inclusive = right_bracket == "]"
is_right_infinity = right_value == "+"
if is_right_infinity:
if is_left_inclusive:
return f"{left_value}{unit}"
else:
return f">{left_value}{unit}"
else:
left_num = float(left_value)
right_num = float(right_value)
if left_num == right_num:
return f"{int(left_num)}{unit}"
else:
if is_left_inclusive and is_right_inclusive:
return f"{int(left_num)}-{int(right_num)}{unit}"
elif is_left_inclusive and not is_right_inclusive:
return f"{int(left_num)}-{int(right_num) - 1}{unit}"
elif not is_left_inclusive and is_right_inclusive:
return f"{int(left_num) + 1}-{int(right_num)}{unit}"
else:
return f"{int(left_num) + 1}-{int(right_num) - 1}{unit}"
except Exception as e:
print(f"区间格式化失败: {e}, 原数据: {interval}")
return interval
@staticmethod
def format_amount_interval(interval: Optional[str]) -> str:
"""格式化金额区间"""
return ReportDataProcessor.format_interval(interval, "")
@staticmethod
def format_institution_interval(interval: Optional[str]) -> str:
"""格式化机构数量区间"""
return ReportDataProcessor.format_interval(interval, "")
@staticmethod
def get_check_suggest_class(check_suggest: Optional[str]) -> str:
"""获取审核建议样式类"""
suggest = check_suggest or '建议拒绝'
if '拒绝' in suggest:
return 'pdf-value-danger'
elif '通过' in suggest:
return 'pdf-value-success'
else:
return 'pdf-value-warning'
@staticmethod
def get_fraud_risk_level(score: int) -> str:
"""获取反欺诈风险等级"""
if score == -1:
return '未评估'
if score >= 80:
return '高风险'
if score >= 60:
return '中风险'
return '低风险'
@staticmethod
def get_credit_level(score: int) -> str:
"""获取信用等级"""
if score == -1:
return '未评估'
if score >= 800:
return '信用较好'
if score >= 500:
return '信用良好'
return '信用一般'
@staticmethod
def get_fraud_score_bg_class(score: int) -> str:
"""获取反欺诈评分背景样式类"""
if score == -1:
return 'pdf-score-default'
if score >= 80:
return 'pdf-score-high'
if score >= 60:
return 'pdf-score-medium'
return 'pdf-score-low'
@staticmethod
def get_credit_score_bg_class(score: int) -> str:
"""获取信用评分背景样式类"""
if score == -1:
return 'pdf-score-default'
if score >= 800:
return 'pdf-score-low'
if score >= 500:
return 'pdf-score-info'
return 'pdf-score-medium'
@staticmethod
def get_risk_tag_class(level: str) -> str:
"""获取风险标签样式类"""
if level == '高风险':
return 'pdf-tag-danger'
if level == '中风险':
return 'pdf-tag-warning'
if level == '低风险':
return 'pdf-tag-success'
if level == '信用较好':
return 'pdf-tag-success'
if level == '信用良好':
return 'pdf-tag-info'
if level == '信用一般':
return 'pdf-tag-warning'
return 'pdf-tag-default'
@staticmethod
def get_risk_level_class(level: str) -> str:
"""获取风险等级样式类"""
if level == '高风险':
return 'pdf-score-high'
if level == '中风险':
return 'pdf-score-medium'
if level == '低风险':
return 'pdf-score-low'
return 'pdf-score-default'
@staticmethod
def get_risk_flag_text(flag: int) -> str:
"""获取风险标识文本"""
if flag == 1:
return '高风险'
if flag == 2:
return '低风险'
return '未查得'
@staticmethod
def get_risk_flag_tag_class(flag: int) -> str:
"""获取风险标识标签样式类"""
if flag == 1:
return 'pdf-tag-danger'
if flag == 2:
return 'pdf-tag-success'
return 'pdf-tag-default'
@staticmethod
def get_result_text(result: Optional[str]) -> str:
"""获取验证结果文本"""
if result == '一致':
return '核验一致'
if result == '不一致':
return '核验不一致'
return result or '未查得'
@staticmethod
def get_verification_result_class(result: Optional[str]) -> str:
"""获取验证结果样式类"""
if result == '一致':
return 'pdf-result-success'
if result == '不一致':
return 'pdf-result-danger'
return 'pdf-result-default'
@staticmethod
def get_high_risk_count(risk_warning: Dict[str, Any]) -> int:
"""获取高风险数量"""
high_risk_fields = [
'idCardTwoElementMismatch', 'phoneThreeElementMismatch',
'shortPhoneDuration', 'noPhoneDuration',
'hasCriminalRecord', 'isEconomyFront', 'isDisrupSocial', 'isKeyPerson', 'isTrafficRelated',
'hitHighRiskBankLastTwoYears', 'hitHighRiskNonBankLastTwoYears',
'hitCivilCase', 'hitCriminalRisk', 'hitAdministrativeCase', 'hitPreservationReview',
'hitExecutionCase', 'hitBankruptcyAndLiquidation', 'hitDirectlyUnderCase', 'hitCompensationCase',
'frequentApplicationRecent', 'frequentNonBankApplications', 'highDebtPressure', 'frequentBankApplications',
'frequentRentalApplications', 'veryFrequentRentalApplications'
]
return sum(risk_warning.get(field, 0) for field in high_risk_fields)
@staticmethod
def get_middle_risk_count(risk_warning: Dict[str, Any]) -> int:
"""获取中风险数量"""
middle_risk_fields = [
'idCardPhoneProvinceMismatch', 'isAntiFraudInfo',
'hitCurrentOverdue',
'moreFrequentNonBankApplications', 'highFraudGangLevel', 'moreFrequentBankApplications'
]
return sum(risk_warning.get(field, 0) for field in middle_risk_fields)
@staticmethod
def get_all_risks(risk_warning: Dict[str, Any]) -> list:
"""获取所有风险列表"""
risks = []
risk_mapping = {
'idCardTwoElementMismatch': {
'description': '身份证二要素信息对比结果不一致',
'detail': '身份证号与姓名信息不匹配',
'level': '高风险'
},
'phoneThreeElementMismatch': {
'description': '手机三要素简版不一致',
'detail': '手机号与身份证号、姓名信息不匹配',
'level': '高风险'
},
'shortPhoneDuration': {
'description': '手机在网时长极短',
'detail': '手机号在网时间过短,存在风险',
'level': '高风险'
},
'idCardPhoneProvinceMismatch': {
'description': '身份证号手机号归属省不一致',
'detail': '身份证归属地与手机号归属地不匹配',
'level': '中风险'
},
'hasCriminalRecord': {
'description': '该用户有前科',
'detail': '用户存在犯罪前科记录',
'level': '高风险'
},
'isKeyPerson': {
'description': '该用户为重点人员',
'detail': '用户被列为重点监管人员',
'level': '高风险'
},
'hitHighRiskBankLastTwoYears': {
'description': '近两年命中银行高风险',
'detail': '近两年在银行机构存在高风险记录',
'level': '高风险'
},
'hitCurrentOverdue': {
'description': '该用户命中当前逾期',
'detail': '用户当前存在逾期记录',
'level': '中风险'
},
'frequentApplicationRecent': {
'description': '近期申请机构极为频繁',
'detail': '近期在多个机构频繁申请贷款',
'level': '高风险'
}
}
for key, info in risk_mapping.items():
if risk_warning.get(key, 0):
badge_class = 'pdf-tag-danger' if info['level'] == '高风险' else 'pdf-tag-warning'
risks.append({
'key': key,
'description': info['description'],
'detail': info['detail'],
'level': info['level'],
'badge_class': badge_class
})
return risks
@staticmethod
def get_overdue_status_text(status: Optional[str]) -> str:
"""获取逾期状态文本"""
if status == '逾期':
return '逾期'
if status == '未逾期':
return '未逾期'
return '未知'
@staticmethod
def get_overdue_status_tag_class(status: Optional[str]) -> str:
"""获取逾期状态标签样式类"""
if status == '逾期':
return 'pdf-tag-danger'
if status == '未逾期':
return 'pdf-tag-success'
return 'pdf-tag-default'
@staticmethod
def get_overdue_time_text(status: Optional[str]) -> str:
"""获取逾期时间文本"""
if status == '逾期':
return '逾期'
if status == '未逾期':
return '正常'
return '未知'
@staticmethod
def get_overdue_time_class(status: Optional[str]) -> str:
"""获取逾期时间样式类"""
if status == '逾期':
return 'pdf-time-danger'
if status == '未逾期':
return 'pdf-time-success'
return 'pdf-time-default'
def process_judicial_data(judicial_data: Dict[str, Any]) -> Dict[str, Any]:
"""处理司法涉诉数据"""
processor = ReportDataProcessor()
# 处理不同的数据结构
lawsuit_stat = None
breach_case_list = []
consumption_restriction_list = []
# 如果数据在 judicial_data 下
if 'lawsuitStat' in judicial_data:
lawsuit_stat = judicial_data.get('lawsuitStat', {})
breach_case_list = judicial_data.get('breachCaseList', [])
consumption_restriction_list = judicial_data.get('consumptionRestrictionList', [])
# 如果数据在 entout.data 下
elif 'entout' in judicial_data:
entout_data = judicial_data.get('entout', {}).get('data', {})
lawsuit_stat = entout_data
breach_case_list = judicial_data.get('breachCaseList', [])
consumption_restriction_list = judicial_data.get('consumptionRestrictionList', [])
# 如果直接就是 lawsuitStat 结构
elif 'count' in judicial_data or 'civil' in judicial_data:
lawsuit_stat = judicial_data
if not lawsuit_stat:
lawsuit_stat = {}
# 处理案件统计
count = lawsuit_stat.get('count', {})
# 处理各类案件
civil = lawsuit_stat.get('civil', {})
criminal = lawsuit_stat.get('criminal', {})
administrative = lawsuit_stat.get('administrative', {})
implement = lawsuit_stat.get('implement', {})
preservation = lawsuit_stat.get('preservation', {})
bankrupt = lawsuit_stat.get('bankrupt', {})
# 法院曝光台数据(用于替代谛听报告的法院曝光台)
# 从 multCourtInfo 或司法涉诉数据中提取
court_exposure = {
'legal_cases': [],
'execution_cases': [],
'disin_cases': breach_case_list,
'limit_cases': consumption_restriction_list
}
# 如果有执行案件,添加到法院曝光台
if implement and implement.get('cases'):
court_exposure['execution_cases'] = implement.get('cases', [])
# 如果有民事案件,添加到法院曝光台
if civil and civil.get('cases'):
court_exposure['legal_cases'].extend(civil.get('cases', []))
# 如果有刑事案件,添加到法院曝光台
if criminal and criminal.get('cases'):
court_exposure['legal_cases'].extend(criminal.get('cases', []))
return {
'has_data': bool(count or civil or criminal or administrative or implement or preservation or bankrupt or breach_case_list or consumption_restriction_list),
'count': count,
'civil': civil,
'criminal': criminal,
'administrative': administrative,
'implement': implement,
'preservation': preservation,
'bankrupt': bankrupt,
'breach_case_list': breach_case_list,
'consumption_restriction_list': consumption_restriction_list,
'court_exposure': court_exposure
}
def process_report_data(data: Dict[str, Any], judicial_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""处理报告数据,准备模板变量"""
processor = ReportDataProcessor()
base_info = data.get('baseInfo', {})
check_suggest = data.get('checkSuggest', '')
fraud_score = data.get('fraudScore', -1)
credit_score = data.get('creditScore', -1)
verify_rule = data.get('verifyRule', '')
fraud_rule = data.get('fraudRule', '')
element_verification_detail = data.get('elementVerificationDetail', {})
risk_warning = data.get('riskWarning', {})
overdue_risk_product = data.get('overdueRiskProduct', {})
loan_evaluation_verification_detail = data.get('loanEvaluationVerificationDetail', {})
leasing_risk_assessment = data.get('leasingRiskAssessment', {})
risk_supervision = data.get('riskSupervision', {})
# 处理基本信息
base_info_processed = {
'name_masked': processor.mask_name(base_info.get('name')),
'age': base_info.get('age', ''),
'sex': base_info.get('sex', ''),
'phone_masked': processor.mask_phone(base_info.get('phone')),
'id_card_masked': processor.mask_id_card(base_info.get('idCard')),
'location': base_info.get('location', ''),
'phone_area': base_info.get('phoneArea', '')
}
# 处理要素核查
element_verification = None
if element_verification_detail:
sfzeys_flag = element_verification_detail.get('sfzeysFlag', 0)
sjsys_flag = element_verification_detail.get('sjsysFlag', 0)
person_check_details = element_verification_detail.get('personCheckDetails', {})
phone_check_details = element_verification_detail.get('phoneCheckDetails', {})
if person_check_details or phone_check_details:
element_verification = {
'sfzeys_flag': sfzeys_flag,
'sfzeys_flag_text': processor.get_risk_flag_text(sfzeys_flag),
'sfzeys_flag_tag_class': processor.get_risk_flag_tag_class(sfzeys_flag),
'sjsys_flag': sjsys_flag,
'sjsys_flag_text': processor.get_risk_flag_text(sjsys_flag),
'sjsys_flag_tag_class': processor.get_risk_flag_tag_class(sjsys_flag),
'person_check_details': person_check_details,
'phone_check_details': phone_check_details,
'person_result_text': processor.get_result_text(person_check_details.get('result')),
'person_result_class': processor.get_verification_result_class(person_check_details.get('result')),
'phone_result_text': processor.get_result_text(phone_check_details.get('result')),
'phone_result_class': processor.get_verification_result_class(phone_check_details.get('result'))
}
# 处理风险预警
risk_warning_processed = None
if risk_warning:
risks = processor.get_all_risks(risk_warning)
if risks or risk_warning.get('totalRiskCounts'):
risk_warning_processed = {
'has_data': True,
'total_risk_counts': risk_warning.get('totalRiskCounts', 0),
'high_risk_count': processor.get_high_risk_count(risk_warning),
'middle_risk_count': processor.get_middle_risk_count(risk_warning),
'level': risk_warning.get('level', '-'),
'risks': risks
}
# 处理逾期风险
overdue_risk_processed = None
if overdue_risk_product:
has_unsettled_overdue = overdue_risk_product.get('hasUnsettledOverdue')
overdue_risk_processed = {
'has_data': True,
'status_text': processor.get_overdue_status_text(has_unsettled_overdue),
'status_tag_class': processor.get_overdue_status_tag_class(has_unsettled_overdue),
'current_overdue_institution_count': processor.format_institution_interval(
overdue_risk_product.get('currentOverdueInstitutionCount')
),
'current_overdue_amount': processor.format_amount_interval(
overdue_risk_product.get('currentOverdueAmount')
),
'settled_institution_count': processor.format_institution_interval(
overdue_risk_product.get('settledInstitutionCount')
),
'total_loan_institutions': processor.format_institution_interval(
overdue_risk_product.get('totalLoanInstitutions')
),
'time_1day_text': processor.get_overdue_time_text(overdue_risk_product.get('overdueLast1Day')),
'time_1day_class': processor.get_overdue_time_class(overdue_risk_product.get('overdueLast1Day')),
'time_7days_text': processor.get_overdue_time_text(overdue_risk_product.get('overdueLast7Days')),
'time_7days_class': processor.get_overdue_time_class(overdue_risk_product.get('overdueLast7Days')),
'time_14days_text': processor.get_overdue_time_text(overdue_risk_product.get('overdueLast14Days')),
'time_14days_class': processor.get_overdue_time_class(overdue_risk_product.get('overdueLast14Days')),
'time_30days_text': processor.get_overdue_time_text(overdue_risk_product.get('overdueLast30Days')),
'time_30days_class': processor.get_overdue_time_class(overdue_risk_product.get('overdueLast30Days'))
}
# 处理借贷评估
loan_evaluation_processed = None
if loan_evaluation_verification_detail:
risk_flag = loan_evaluation_verification_detail.get('riskFlag', 0)
organ_loan_performances = loan_evaluation_verification_detail.get('organLoanPerformances', [])
if organ_loan_performances:
processed_performances = []
for item in organ_loan_performances:
apply_count = item.get('applyCount', '')
type_name = '银行机构' if apply_count == '银行' else '非银机构'
processed_performances.append({
'type_name': type_name,
'last7Day': item.get('last7Day', '0/0'),
'last15Day': item.get('last15Day', '0/0'),
'last1Month': item.get('last1Month', '0/0')
})
loan_evaluation_processed = {
'has_data': True,
'risk_flag': risk_flag,
'risk_flag_text': processor.get_risk_flag_text(risk_flag),
'risk_flag_tag_class': processor.get_risk_flag_tag_class(risk_flag),
'organ_loan_performances': processed_performances
}
# 处理租赁风险评估
leasing_risk_processed = None
if leasing_risk_assessment:
risk_flag = leasing_risk_assessment.get('riskFlag', 0)
leasing_risk_processed = {
'has_data': True,
'risk_flag': risk_flag,
'risk_flag_text': processor.get_risk_flag_text(risk_flag),
'risk_flag_tag_class': processor.get_risk_flag_tag_class(risk_flag),
'institution_total': leasing_risk_assessment.get('threeCInstitutionApplicationCountLast3Days', '0/0'),
'institution_weekend': leasing_risk_assessment.get('threeCInstitutionApplicationCountLast3DaysWeekend', '0/0'),
'institution_night': leasing_risk_assessment.get('threeCInstitutionApplicationCountLast3DaysNight', '0/0'),
'platform_total': leasing_risk_assessment.get('threeCPlatformApplicationCountLast3Days', '0/0'),
'platform_weekend': leasing_risk_assessment.get('threeCPlatformApplicationCountLast3DaysWeekend', '0/0'),
'platform_night': leasing_risk_assessment.get('threeCPlatformApplicationCountLast3DaysNight', '0/0')
}
# 处理运营商核验
operator_verification = None
if element_verification_detail:
online_risk_flag = element_verification_detail.get('onlineRiskFlag', 0)
online_risk_list = element_verification_detail.get('onlineRiskList', {})
phone_vail_risk_flag = element_verification_detail.get('phoneVailRiskFlag', 0)
phone_vail_risks = element_verification_detail.get('phoneVailRisks', {})
belong_risk_flag = element_verification_detail.get('belongRiskFlag', 0)
belong_risks = element_verification_detail.get('belongRisks', {})
if online_risk_list or phone_vail_risks or belong_risks:
operator_verification = {
'has_data': True,
'online_risk_flag': online_risk_flag,
'online_risk_flag_text': processor.get_risk_flag_text(online_risk_flag),
'online_risk_flag_tag_class': processor.get_risk_flag_tag_class(online_risk_flag),
'online_risk_list': online_risk_list,
'phone_vail_risk_flag': phone_vail_risk_flag,
'phone_vail_risk_flag_text': processor.get_risk_flag_text(phone_vail_risk_flag),
'phone_vail_risk_flag_tag_class': processor.get_risk_flag_tag_class(phone_vail_risk_flag),
'phone_vail_risks': phone_vail_risks,
'belong_risk_flag': belong_risk_flag,
'belong_risk_flag_text': processor.get_risk_flag_text(belong_risk_flag),
'belong_risk_flag_tag_class': processor.get_risk_flag_tag_class(belong_risk_flag),
'belong_risks': belong_risks
}
# 处理公安重点人员核验
key_person_verification = None
if element_verification_detail:
high_risk_flag = element_verification_detail.get('highRiskFlag', 0)
key_person_check_list = element_verification_detail.get('keyPersonCheckList', {})
anti_fraud_info = element_verification_detail.get('antiFraudInfo', {})
if key_person_check_list or anti_fraud_info:
key_person_verification = {
'has_data': True,
'high_risk_flag': high_risk_flag,
'high_risk_flag_text': processor.get_risk_flag_text(high_risk_flag),
'high_risk_flag_tag_class': processor.get_risk_flag_tag_class(high_risk_flag),
'key_person_check_list': key_person_check_list,
'anti_fraud_info': anti_fraud_info
}
# 处理法院曝光台(使用司法涉诉数据)
court_exposure_processed = None
if judicial_data:
judicial_processed = process_judicial_data(judicial_data)
if judicial_processed.get('has_data'):
court_exposure_processed = judicial_processed.get('court_exposure', {})
# 处理司法涉诉数据
judicial_processed = None
if judicial_data:
judicial_processed = process_judicial_data(judicial_data)
# 计算风险评分相关
fraud_risk_level = processor.get_fraud_risk_level(fraud_score)
credit_level = processor.get_credit_level(credit_score)
return {
'base_info': base_info_processed,
'check_suggest': check_suggest,
'check_suggest_class': processor.get_check_suggest_class(check_suggest),
'fraud_score': fraud_score,
'fraud_score_display': '未命中' if fraud_score == -1 else str(fraud_score),
'fraud_risk_level': fraud_risk_level,
'fraud_score_bg_class': processor.get_fraud_score_bg_class(fraud_score),
'fraud_risk_tag_class': processor.get_risk_tag_class(fraud_risk_level),
'credit_score': credit_score,
'credit_score_display': '未命中' if credit_score == -1 else str(credit_score),
'credit_level': credit_level,
'credit_score_bg_class': processor.get_credit_score_bg_class(credit_score),
'credit_risk_tag_class': processor.get_risk_tag_class(credit_level),
'verify_rule': verify_rule,
'verify_rule_class': processor.get_risk_level_class(verify_rule),
'verify_rule_tag_class': processor.get_risk_tag_class(verify_rule),
'fraud_rule': fraud_rule,
'fraud_rule_class': processor.get_risk_level_class(fraud_rule),
'fraud_rule_tag_class': processor.get_risk_tag_class(fraud_rule),
'element_verification': element_verification,
'operator_verification': operator_verification,
'key_person_verification': key_person_verification,
'overdue_risk': overdue_risk_processed,
'court_exposure': court_exposure_processed,
'loan_evaluation': loan_evaluation_processed,
'judicial_data': judicial_processed
}
def generate_pdf(data_file: str, output_file: str, template_dir: str = 'templates'):
"""生成 PDF 文件"""
# 检查 WeasyPrint 是否可用
if not WEASYPRINT_AVAILABLE:
print("=" * 60)
print("错误WeasyPrint 不可用")
print("=" * 60)
print(WEASYPRINT_ERROR)
print("=" * 60)
raise RuntimeError("WeasyPrint 未正确安装,请参考错误信息进行安装")
# 读取数据文件
with open(data_file, 'r', encoding='utf-8') as f:
if data_file.endswith('.json'):
json_data = json.load(f)
else:
raise ValueError("不支持的文件格式,请使用 JSON 文件")
# 如果是数组,查找 DWBG8B4D 和司法涉诉数据
report_data = None
judicial_data = None
if isinstance(json_data, list):
for item in json_data:
api_id = item.get('data', {}).get('apiID', '')
if api_id in ['DWBG8B4D', 'CDWBG8B4D']:
report_data = item.get('data', {}).get('data', {})
elif api_id in ['FLXG7E8F', 'FLXG0V4B', 'CFLXG0V4B']:
# 司法涉诉数据可能在 data.data.judicial_data 或 data.data.entout
data_content = item.get('data', {}).get('data', {})
if 'judicial_data' in data_content:
judicial_data = data_content.get('judicial_data', {})
elif 'entout' in data_content:
judicial_data = data_content
else:
judicial_data = data_content
else:
report_data = json_data
if not report_data:
raise ValueError("未找到 DWBG8B4D 数据")
# 处理数据
template_vars = process_report_data(report_data, judicial_data)
# 加载模板
env = Environment(
loader=FileSystemLoader(template_dir),
autoescape=select_autoescape(['html', 'xml'])
)
template = env.get_template('report_template.html')
# 渲染 HTML
html_content = template.render(**template_vars)
# 生成 PDF
font_config = FontConfiguration()
html_doc = HTML(string=html_content)
html_doc.write_pdf(output_file, font_config=font_config)
print(f"PDF 已生成: {output_file}")
if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
print("用法: python generate_pdf.py <数据文件> [输出文件]")
print("示例: python generate_pdf.py public/example.json output.pdf")
sys.exit(1)
data_file = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else 'report.pdf'
# 确保模板目录存在
template_dir = Path('templates')
if not template_dir.exists():
template_dir.mkdir()
print(f"已创建模板目录: {template_dir}")
try:
generate_pdf(data_file, output_file)
except Exception as e:
print(f"生成 PDF 失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)