Files
report_viewer/generate_pdf.py
2025-11-17 12:49:59 +08:00

765 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
谛听多维报告 PDF 生成器
使用 WeasyPrint 将 HTML 模板转换为 PDF
"""
import json
import re
from pathlib import Path
from typing import Dict, Any, Optional
from jinja2 import Environment, FileSystemLoader, select_autoescape
# Import WeasyPrint defensively so that a missing package or missing native
# libraries produce a readable installation hint instead of a crash at import
# time. After this block:
#   WEASYPRINT_AVAILABLE -- True only when both imports succeeded.
#   WEASYPRINT_ERROR     -- None on success, otherwise a user-facing message.
WEASYPRINT_ERROR = None
try:
    from weasyprint import HTML, CSS
    from weasyprint.text.fonts import FontConfiguration
except OSError as e:
    # The package is installed but loading it failed at the OS level.
    WEASYPRINT_AVAILABLE = False
    if 'gobject' in str(e).lower():
        # A message mentioning (lib)gobject is reported as a missing GTK+ runtime.
        WEASYPRINT_ERROR = "WeasyPrint 需要 GTK+ 运行时库。\n\n" \
            "Windows 用户推荐使用 Conda 安装(最简单):\n" \
            " conda install -c conda-forge weasyprint\n\n" \
            "或参考 install_weasyprint_windows.md 文件中的详细安装指南。"
    else:
        WEASYPRINT_ERROR = f"WeasyPrint 导入失败: {e}\n\n请参考 install_weasyprint_windows.md 文件。"
except ImportError as e:
    # The Python package itself could not be imported.
    WEASYPRINT_AVAILABLE = False
    WEASYPRINT_ERROR = f"WeasyPrint 未安装: {e}\n\n请运行: pip install weasyprint"
else:
    WEASYPRINT_AVAILABLE = True
class ReportDataProcessor:
    """Stateless helpers that turn raw report fields into display values.

    Every method is a ``staticmethod``: they mask personal data, format
    interval expressions and map scores, flags and statuses to the display
    text or CSS class name used by the PDF template.
    """

    # "3,6(个月)" style: comma-separated integers followed by a unit in parentheses.
    _UNIT_LIST_RE = re.compile(r'^(\d+(?:,\d+)*)\((.+)\)$')
    # "[a,b]" / "(a,b)" / "[a,+)" style: bracketed numeric interval, "+" means unbounded.
    _INTERVAL_RE = re.compile(r'^([\[\(])(\d+(?:\.\d+)?),(\d+(?:\.\d+)?|\+)([\]\)])$')

    # Keys of the risk-warning dict whose values are summed as high-risk hits.
    _HIGH_RISK_FIELDS = (
        'idCardTwoElementMismatch', 'phoneThreeElementMismatch',
        'shortPhoneDuration', 'noPhoneDuration',
        'hasCriminalRecord', 'isEconomyFront', 'isDisrupSocial', 'isKeyPerson', 'isTrafficRelated',
        'hitHighRiskBankLastTwoYears', 'hitHighRiskNonBankLastTwoYears',
        'hitCivilCase', 'hitCriminalRisk', 'hitAdministrativeCase', 'hitPreservationReview',
        'hitExecutionCase', 'hitBankruptcyAndLiquidation', 'hitDirectlyUnderCase', 'hitCompensationCase',
        'frequentApplicationRecent', 'frequentNonBankApplications', 'highDebtPressure', 'frequentBankApplications',
        'frequentRentalApplications', 'veryFrequentRentalApplications',
    )
    # Keys of the risk-warning dict whose values are summed as medium-risk hits.
    _MIDDLE_RISK_FIELDS = (
        'idCardPhoneProvinceMismatch', 'isAntiFraudInfo',
        'hitCurrentOverdue',
        'moreFrequentNonBankApplications', 'highFraudGangLevel', 'moreFrequentBankApplications',
    )

    @staticmethod
    def mask_name(name: Optional[str]) -> str:
        """Mask a name: keep the first (and, for 3+ chars, the last) character."""
        if not name:
            return ''
        if len(name) == 1:
            return '*'
        if len(name) == 2:
            return name[0] + '*'
        return name[0] + '*' * (len(name) - 2) + name[-1]

    @staticmethod
    def mask_phone(phone: Optional[str]) -> str:
        """Mask the middle four characters of an 11-character phone number.

        NOTE(review): values whose length is not 11 are returned unchanged,
        i.e. unmasked -- confirm this is acceptable for the report.
        """
        if not phone:
            return ''
        if len(phone) == 11:
            return phone[:3] + '****' + phone[7:]
        return phone

    @staticmethod
    def mask_id_card(id_card: Optional[str]) -> str:
        """Mask an ID number, keeping the first six and last four characters.

        NOTE(review): when the pattern does not match (e.g. non-digits in the
        middle), ``re.sub`` returns the input unchanged, i.e. unmasked.
        """
        if not id_card:
            return ''
        return re.sub(r'^(.{6})(?:\d+)(.{4})$', r'\1****\2', id_card)

    @staticmethod
    def format_interval(interval: Optional[str], unit: str = "") -> str:
        """Render an interval expression as human-readable text.

        Args:
            interval: Raw expression, e.g. ``"[1,3)"``, ``"(3,+)"`` or
                ``"3,6(个月)"``.
            unit: Suffix appended to bracketed-interval results.

        Returns:
            ``"-"`` for empty input; ``"-"``/``"0"`` unchanged; the formatted
            text for recognised expressions; otherwise the input as given.
        """
        if not interval or interval == "-" or interval == "0":
            return interval or "-"
        try:
            # Special form such as "3,6(个月)".
            if "(" in interval and ")" in interval:
                unit_match = ReportDataProcessor._UNIT_LIST_RE.match(interval)
                if unit_match:
                    numbers = [n.strip() for n in unit_match.group(1).split(",")]
                    time_unit = unit_match.group(2)
                    if len(numbers) == 2:
                        return f"{numbers[0]}-{numbers[1]}{time_unit}"
                    return f"{', '.join(numbers)}{time_unit}"

            # Bracketed interval form.
            match = ReportDataProcessor._INTERVAL_RE.match(interval)
            if not match:
                return interval
            left_bracket, left_value, right_value, right_bracket = match.groups()
            left_inclusive = left_bracket == "["
            right_inclusive = right_bracket == "]"

            if right_value == "+":
                # Unbounded on the right: only the lower bound is shown.
                if left_inclusive:
                    return f"{left_value}{unit}"
                return f">{left_value}{unit}"

            left_num = float(left_value)
            right_num = float(right_value)
            if left_num == right_num:
                return f"{int(left_num)}{unit}"
            # Exclusive bounds are converted to the nearest included integer.
            low = int(left_num) + (0 if left_inclusive else 1)
            high = int(right_num) - (0 if right_inclusive else 1)
            return f"{low}-{high}{unit}"
        except (TypeError, ValueError, OverflowError) as e:
            # Best effort: a non-string or out-of-range value is echoed back.
            print(f"区间格式化失败: {e}, 原数据: {interval}")
            return interval

    @staticmethod
    def format_amount_interval(interval: Optional[str]) -> str:
        """Format an amount interval (no unit suffix)."""
        return ReportDataProcessor.format_interval(interval, "")

    @staticmethod
    def format_institution_interval(interval: Optional[str]) -> str:
        """Format an institution-count interval (no unit suffix)."""
        return ReportDataProcessor.format_interval(interval, "")

    @staticmethod
    def get_check_suggest_class(check_suggest: Optional[str]) -> str:
        """Return the CSS class for a review suggestion; empty input counts as rejection."""
        suggest = check_suggest or '建议拒绝'
        if '拒绝' in suggest:
            return 'pdf-value-danger'
        if '通过' in suggest:
            return 'pdf-value-success'
        return 'pdf-value-warning'

    @staticmethod
    def get_fraud_risk_level(score: Optional[int]) -> str:
        """Map an anti-fraud score to a risk-level label; ``None``/-1 mean not evaluated."""
        if score is None or score == -1:
            return '未评估'
        if score >= 80:
            return '高风险'
        if score >= 60:
            return '中风险'
        return '低风险'

    @staticmethod
    def get_credit_level(score: Optional[int]) -> str:
        """Map a credit score to a credit-level label; ``None``/-1 mean not evaluated."""
        if score is None or score == -1:
            return '未评估'
        if score >= 800:
            return '信用较好'
        if score >= 500:
            return '信用良好'
        return '信用一般'

    @staticmethod
    def get_fraud_score_bg_class(score: Optional[int]) -> str:
        """Return the background CSS class for an anti-fraud score."""
        if score is None or score == -1:
            return 'pdf-score-default'
        if score >= 80:
            return 'pdf-score-high'
        if score >= 60:
            return 'pdf-score-medium'
        return 'pdf-score-low'

    @staticmethod
    def get_credit_score_bg_class(score: Optional[int]) -> str:
        """Return the background CSS class for a credit score."""
        if score is None or score == -1:
            return 'pdf-score-default'
        if score >= 800:
            return 'pdf-score-low'
        if score >= 500:
            return 'pdf-score-info'
        return 'pdf-score-medium'

    @staticmethod
    def get_risk_tag_class(level: str) -> str:
        """Return the tag CSS class for a risk-level or credit-level label."""
        if level == '高风险':
            return 'pdf-tag-danger'
        if level == '中风险':
            return 'pdf-tag-warning'
        if level == '低风险':
            return 'pdf-tag-success'
        if level == '信用较好':
            return 'pdf-tag-success'
        if level == '信用良好':
            return 'pdf-tag-info'
        if level == '信用一般':
            return 'pdf-tag-warning'
        return 'pdf-tag-default'

    @staticmethod
    def get_risk_level_class(level: str) -> str:
        """Return the score CSS class for a risk-level label."""
        if level == '高风险':
            return 'pdf-score-high'
        if level == '中风险':
            return 'pdf-score-medium'
        if level == '低风险':
            return 'pdf-score-low'
        return 'pdf-score-default'

    @staticmethod
    def get_risk_flag_text(flag: int) -> str:
        """Return the label for a risk flag: 1 is high risk, 2 is low risk, else not found."""
        if flag == 1:
            return '高风险'
        if flag == 2:
            return '低风险'
        return '未查得'

    @staticmethod
    def get_risk_flag_tag_class(flag: int) -> str:
        """Return the tag CSS class for a risk flag (same mapping as the flag text)."""
        if flag == 1:
            return 'pdf-tag-danger'
        if flag == 2:
            return 'pdf-tag-success'
        return 'pdf-tag-default'

    @staticmethod
    def get_result_text(result: Optional[str]) -> str:
        """Return the display text for a verification result."""
        if result == '一致':
            return '核验一致'
        if result == '不一致':
            return '核验不一致'
        return result or '未查得'

    @staticmethod
    def get_verification_result_class(result: Optional[str]) -> str:
        """Return the CSS class for a verification result."""
        if result == '一致':
            return 'pdf-result-success'
        if result == '不一致':
            return 'pdf-result-danger'
        return 'pdf-result-default'

    @staticmethod
    def get_high_risk_count(risk_warning: Dict[str, Any]) -> int:
        """Sum the high-risk flags in ``risk_warning``; missing or null flags count as 0."""
        return sum(
            risk_warning.get(field) or 0
            for field in ReportDataProcessor._HIGH_RISK_FIELDS
        )

    @staticmethod
    def get_middle_risk_count(risk_warning: Dict[str, Any]) -> int:
        """Sum the medium-risk flags in ``risk_warning``; missing or null flags count as 0."""
        return sum(
            risk_warning.get(field) or 0
            for field in ReportDataProcessor._MIDDLE_RISK_FIELDS
        )

    @staticmethod
    def get_all_risks(risk_warning: Dict[str, Any]) -> list:
        """List the known risks whose flag is truthy in ``risk_warning``.

        Returns:
            One dict per hit with ``key``, ``description``, ``detail``,
            ``level`` and ``badge_class``, in the order of the mapping below.
        """
        risk_mapping = {
            'idCardTwoElementMismatch': {
                'description': '身份证二要素信息对比结果不一致',
                'detail': '身份证号与姓名信息不匹配',
                'level': '高风险'
            },
            'phoneThreeElementMismatch': {
                'description': '手机三要素简版不一致',
                'detail': '手机号与身份证号、姓名信息不匹配',
                'level': '高风险'
            },
            'shortPhoneDuration': {
                'description': '手机在网时长极短',
                'detail': '手机号在网时间过短,存在风险',
                'level': '高风险'
            },
            'idCardPhoneProvinceMismatch': {
                'description': '身份证号手机号归属省不一致',
                'detail': '身份证归属地与手机号归属地不匹配',
                'level': '中风险'
            },
            'hasCriminalRecord': {
                'description': '该用户有前科',
                'detail': '用户存在犯罪前科记录',
                'level': '高风险'
            },
            'isKeyPerson': {
                'description': '该用户为重点人员',
                'detail': '用户被列为重点监管人员',
                'level': '高风险'
            },
            'hitHighRiskBankLastTwoYears': {
                'description': '近两年命中银行高风险',
                'detail': '近两年在银行机构存在高风险记录',
                'level': '高风险'
            },
            'hitCurrentOverdue': {
                'description': '该用户命中当前逾期',
                'detail': '用户当前存在逾期记录',
                'level': '中风险'
            },
            'frequentApplicationRecent': {
                'description': '近期申请机构极为频繁',
                'detail': '近期在多个机构频繁申请贷款',
                'level': '高风险'
            }
        }
        return [
            {
                'key': key,
                'description': info['description'],
                'detail': info['detail'],
                'level': info['level'],
                'badge_class': 'pdf-tag-danger' if info['level'] == '高风险' else 'pdf-tag-warning',
            }
            for key, info in risk_mapping.items()
            if risk_warning.get(key, 0)
        ]

    @staticmethod
    def get_overdue_status_text(status: Optional[str]) -> str:
        """Return the display text for an overdue status; unrecognised values read as unknown."""
        if status == '逾期':
            return '逾期'
        if status == '未逾期':
            return '未逾期'
        return '未知'

    @staticmethod
    def get_overdue_status_tag_class(status: Optional[str]) -> str:
        """Return the tag CSS class for an overdue status."""
        if status == '逾期':
            return 'pdf-tag-danger'
        if status == '未逾期':
            return 'pdf-tag-success'
        return 'pdf-tag-default'

    @staticmethod
    def get_overdue_time_text(status: Optional[str]) -> str:
        """Return the display text for an overdue time window."""
        if status == '逾期':
            return '逾期'
        if status == '未逾期':
            return '正常'
        return '未知'

    @staticmethod
    def get_overdue_time_class(status: Optional[str]) -> str:
        """Return the CSS class for an overdue time window."""
        if status == '逾期':
            return 'pdf-time-danger'
        if status == '未逾期':
            return 'pdf-time-success'
        return 'pdf-time-default'
def process_judicial_data(judicial_data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalise judicial-litigation data into the structure the template reads.

    Three input layouts are recognised, checked in this order:

    1. ``{'lawsuitStat': {...}, 'breachCaseList': [...], ...}``
    2. ``{'entout': {'data': {...}}, 'breachCaseList': [...], ...}``
    3. the statistics dict itself (it has a ``count`` or ``civil`` key).

    Missing or null sections become empty dicts/lists, so callers never
    receive ``None`` for a section.

    Args:
        judicial_data: Raw judicial payload; ``None`` is treated as empty.

    Returns:
        A dict with ``has_data``, one entry per case category (``count``,
        ``civil``, ``criminal``, ``administrative``, ``implement``,
        ``preservation``, ``bankrupt``), the two case lists, and a
        ``court_exposure`` summary built from them.
    """
    judicial_data = judicial_data or {}

    lawsuit_stat = None
    breach_case_list = []
    consumption_restriction_list = []

    if 'lawsuitStat' in judicial_data:
        lawsuit_stat = judicial_data.get('lawsuitStat')
    elif 'entout' in judicial_data:
        # Statistics are nested under entout.data in this layout.
        lawsuit_stat = (judicial_data.get('entout') or {}).get('data')
    elif 'count' in judicial_data or 'civil' in judicial_data:
        # The payload already is the statistics structure.
        lawsuit_stat = judicial_data

    # The case lists are only read for the two wrapped layouts.
    if 'lawsuitStat' in judicial_data or 'entout' in judicial_data:
        breach_case_list = judicial_data.get('breachCaseList') or []
        consumption_restriction_list = judicial_data.get('consumptionRestrictionList') or []

    lawsuit_stat = lawsuit_stat or {}

    section_names = (
        'count', 'civil', 'criminal', 'administrative',
        'implement', 'preservation', 'bankrupt',
    )
    sections = {name: lawsuit_stat.get(name) or {} for name in section_names}
    civil = sections['civil']
    criminal = sections['criminal']
    implement = sections['implement']

    # Court-exposure view assembled from the litigation data.
    court_exposure = {
        'legal_cases': [],
        'execution_cases': [],
        'disin_cases': breach_case_list,
        'limit_cases': consumption_restriction_list
    }
    if implement.get('cases'):
        court_exposure['execution_cases'] = implement.get('cases', [])
    # Civil cases first, then criminal cases, in one combined list.
    if civil.get('cases'):
        court_exposure['legal_cases'].extend(civil.get('cases', []))
    if criminal.get('cases'):
        court_exposure['legal_cases'].extend(criminal.get('cases', []))

    has_data = bool(
        any(sections[name] for name in section_names)
        or breach_case_list
        or consumption_restriction_list
    )

    result = {'has_data': has_data}
    result.update(sections)
    result['breach_case_list'] = breach_case_list
    result['consumption_restriction_list'] = consumption_restriction_list
    result['court_exposure'] = court_exposure
    return result
def _flag_fields(prefix: str, flag: Any) -> Dict[str, Any]:
    """Build the ``<prefix>``, ``<prefix>_text``, ``<prefix>_tag_class`` triple for a risk flag."""
    return {
        prefix: flag,
        f'{prefix}_text': ReportDataProcessor.get_risk_flag_text(flag),
        f'{prefix}_tag_class': ReportDataProcessor.get_risk_flag_tag_class(flag),
    }


def process_report_data(data: Dict[str, Any], judicial_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Convert the raw report payload into the variables the template renders.

    Args:
        data: Report payload; the keys read are ``baseInfo``, ``checkSuggest``,
            ``fraudScore``, ``creditScore``, ``verifyRule``, ``fraudRule``,
            ``elementVerificationDetail``, ``riskWarning``,
            ``overdueRiskProduct``, ``loanEvaluationVerificationDetail`` and
            ``leasingRiskAssessment``.
        judicial_data: Optional judicial-litigation payload, passed to
            ``process_judicial_data``.

    Returns:
        A dict of template variables. A section with no usable input is
        ``None``. ``risk_warning`` and ``leasing_risk`` expose sections that
        were previously computed but left out of the result.
    """
    proc = ReportDataProcessor

    # ``or {}`` guards against keys that are present but null in the JSON.
    base_info = data.get('baseInfo') or {}
    check_suggest = data.get('checkSuggest', '')
    fraud_score = data.get('fraudScore', -1)
    if fraud_score is None:
        fraud_score = -1  # a null score is treated like the -1 "no score" sentinel
    credit_score = data.get('creditScore', -1)
    if credit_score is None:
        credit_score = -1
    verify_rule = data.get('verifyRule', '')
    fraud_rule = data.get('fraudRule', '')
    element_detail = data.get('elementVerificationDetail') or {}
    risk_warning = data.get('riskWarning') or {}
    overdue_product = data.get('overdueRiskProduct') or {}
    loan_detail = data.get('loanEvaluationVerificationDetail') or {}
    leasing = data.get('leasingRiskAssessment') or {}

    # Basic information, with personal data masked.
    base_info_processed = {
        'name_masked': proc.mask_name(base_info.get('name')),
        'age': base_info.get('age', ''),
        'sex': base_info.get('sex', ''),
        'phone_masked': proc.mask_phone(base_info.get('phone')),
        'id_card_masked': proc.mask_id_card(base_info.get('idCard')),
        'location': base_info.get('location', ''),
        'phone_area': base_info.get('phoneArea', '')
    }

    # The three sections below all read the element-verification detail.
    element_verification = None
    operator_verification = None
    key_person_verification = None
    if element_detail:
        # Element (identity / phone) check.
        person_check_details = element_detail.get('personCheckDetails') or {}
        phone_check_details = element_detail.get('phoneCheckDetails') or {}
        if person_check_details or phone_check_details:
            person_result = person_check_details.get('result')
            phone_result = phone_check_details.get('result')
            element_verification = {
                **_flag_fields('sfzeys_flag', element_detail.get('sfzeysFlag', 0)),
                **_flag_fields('sjsys_flag', element_detail.get('sjsysFlag', 0)),
                'person_check_details': person_check_details,
                'phone_check_details': phone_check_details,
                'person_result_text': proc.get_result_text(person_result),
                'person_result_class': proc.get_verification_result_class(person_result),
                'phone_result_text': proc.get_result_text(phone_result),
                'phone_result_class': proc.get_verification_result_class(phone_result)
            }

        # Carrier (operator) verification.
        online_risk_list = element_detail.get('onlineRiskList', {})
        phone_vail_risks = element_detail.get('phoneVailRisks', {})
        belong_risks = element_detail.get('belongRisks', {})
        if online_risk_list or phone_vail_risks or belong_risks:
            operator_verification = {
                'has_data': True,
                **_flag_fields('online_risk_flag', element_detail.get('onlineRiskFlag', 0)),
                'online_risk_list': online_risk_list,
                **_flag_fields('phone_vail_risk_flag', element_detail.get('phoneVailRiskFlag', 0)),
                'phone_vail_risks': phone_vail_risks,
                **_flag_fields('belong_risk_flag', element_detail.get('belongRiskFlag', 0)),
                'belong_risks': belong_risks
            }

        # Public-security key-person verification.
        key_person_check_list = element_detail.get('keyPersonCheckList', {})
        anti_fraud_info = element_detail.get('antiFraudInfo', {})
        if key_person_check_list or anti_fraud_info:
            key_person_verification = {
                'has_data': True,
                **_flag_fields('high_risk_flag', element_detail.get('highRiskFlag', 0)),
                'key_person_check_list': key_person_check_list,
                'anti_fraud_info': anti_fraud_info
            }

    # Risk warnings.
    risk_warning_processed = None
    if risk_warning:
        risks = proc.get_all_risks(risk_warning)
        if risks or risk_warning.get('totalRiskCounts'):
            risk_warning_processed = {
                'has_data': True,
                'total_risk_counts': risk_warning.get('totalRiskCounts', 0),
                'high_risk_count': proc.get_high_risk_count(risk_warning),
                'middle_risk_count': proc.get_middle_risk_count(risk_warning),
                'level': risk_warning.get('level', '-'),
                'risks': risks
            }

    # Overdue risk.
    overdue_risk_processed = None
    if overdue_product:
        unsettled = overdue_product.get('hasUnsettledOverdue')
        overdue_risk_processed = {
            'has_data': True,
            'status_text': proc.get_overdue_status_text(unsettled),
            'status_tag_class': proc.get_overdue_status_tag_class(unsettled),
            'current_overdue_institution_count': proc.format_institution_interval(
                overdue_product.get('currentOverdueInstitutionCount')
            ),
            'current_overdue_amount': proc.format_amount_interval(
                overdue_product.get('currentOverdueAmount')
            ),
            'settled_institution_count': proc.format_institution_interval(
                overdue_product.get('settledInstitutionCount')
            ),
            'total_loan_institutions': proc.format_institution_interval(
                overdue_product.get('totalLoanInstitutions')
            ),
        }
        # One text/class pair per look-back window.
        windows = (
            ('1day', 'overdueLast1Day'),
            ('7days', 'overdueLast7Days'),
            ('14days', 'overdueLast14Days'),
            ('30days', 'overdueLast30Days'),
        )
        for label, source_key in windows:
            window_status = overdue_product.get(source_key)
            overdue_risk_processed[f'time_{label}_text'] = proc.get_overdue_time_text(window_status)
            overdue_risk_processed[f'time_{label}_class'] = proc.get_overdue_time_class(window_status)

    # Loan evaluation: only produced when per-institution rows exist.
    loan_evaluation_processed = None
    if loan_detail:
        organ_loan_performances = loan_detail.get('organLoanPerformances', [])
        if organ_loan_performances:
            processed_performances = [
                {
                    # 'applyCount' carries the institution type in the source data.
                    'type_name': '银行机构' if item.get('applyCount', '') == '银行' else '非银机构',
                    'last7Day': item.get('last7Day', '0/0'),
                    'last15Day': item.get('last15Day', '0/0'),
                    'last1Month': item.get('last1Month', '0/0')
                }
                for item in organ_loan_performances
            ]
            loan_evaluation_processed = {
                'has_data': True,
                **_flag_fields('risk_flag', loan_detail.get('riskFlag', 0)),
                'organ_loan_performances': processed_performances
            }

    # Leasing risk assessment.
    leasing_risk_processed = None
    if leasing:
        leasing_risk_processed = {
            'has_data': True,
            **_flag_fields('risk_flag', leasing.get('riskFlag', 0)),
            'institution_total': leasing.get('threeCInstitutionApplicationCountLast3Days', '0/0'),
            'institution_weekend': leasing.get('threeCInstitutionApplicationCountLast3DaysWeekend', '0/0'),
            'institution_night': leasing.get('threeCInstitutionApplicationCountLast3DaysNight', '0/0'),
            'platform_total': leasing.get('threeCPlatformApplicationCountLast3Days', '0/0'),
            'platform_weekend': leasing.get('threeCPlatformApplicationCountLast3DaysWeekend', '0/0'),
            'platform_night': leasing.get('threeCPlatformApplicationCountLast3DaysNight', '0/0')
        }

    # Judicial data is processed once; the court-exposure view is taken from it.
    judicial_processed = process_judicial_data(judicial_data) if judicial_data else None
    court_exposure_processed = None
    if judicial_processed and judicial_processed.get('has_data'):
        court_exposure_processed = judicial_processed.get('court_exposure', {})

    # Score-derived labels.
    fraud_risk_level = proc.get_fraud_risk_level(fraud_score)
    credit_level = proc.get_credit_level(credit_score)

    return {
        'base_info': base_info_processed,
        'check_suggest': check_suggest,
        'check_suggest_class': proc.get_check_suggest_class(check_suggest),
        'fraud_score': fraud_score,
        'fraud_score_display': '未命中' if fraud_score == -1 else str(fraud_score),
        'fraud_risk_level': fraud_risk_level,
        'fraud_score_bg_class': proc.get_fraud_score_bg_class(fraud_score),
        'fraud_risk_tag_class': proc.get_risk_tag_class(fraud_risk_level),
        'credit_score': credit_score,
        'credit_score_display': '未命中' if credit_score == -1 else str(credit_score),
        'credit_level': credit_level,
        'credit_score_bg_class': proc.get_credit_score_bg_class(credit_score),
        'credit_risk_tag_class': proc.get_risk_tag_class(credit_level),
        'verify_rule': verify_rule,
        'verify_rule_class': proc.get_risk_level_class(verify_rule),
        'verify_rule_tag_class': proc.get_risk_tag_class(verify_rule),
        'fraud_rule': fraud_rule,
        'fraud_rule_class': proc.get_risk_level_class(fraud_rule),
        'fraud_rule_tag_class': proc.get_risk_tag_class(fraud_rule),
        'element_verification': element_verification,
        'operator_verification': operator_verification,
        'key_person_verification': key_person_verification,
        'overdue_risk': overdue_risk_processed,
        'court_exposure': court_exposure_processed,
        'loan_evaluation': loan_evaluation_processed,
        'judicial_data': judicial_processed,
        # Previously computed but never returned; extra keys are harmless to
        # templates that do not reference them.
        'risk_warning': risk_warning_processed,
        'leasing_risk': leasing_risk_processed
    }
def _extract_report_payloads(json_data: Any):
    """Return ``(report_data, judicial_data)`` selected from the parsed input JSON."""
    if not isinstance(json_data, list):
        # A non-list document is used as the report payload itself.
        return json_data, None

    report_data = None
    judicial_data = None
    for item in json_data:
        if not isinstance(item, dict):
            continue
        envelope = item.get('data') or {}
        if not isinstance(envelope, dict):
            continue
        api_id = envelope.get('apiID', '')
        payload = envelope.get('data') or {}
        if api_id in ('DWBG8B4D', 'CDWBG8B4D'):
            report_data = payload
        elif api_id in ('FLXG7E8F', 'FLXG0V4B', 'CFLXG0V4B'):
            # Judicial data is either nested under 'judicial_data' or is the
            # payload itself (e.g. when it carries an 'entout' key).
            if 'judicial_data' in payload:
                judicial_data = payload.get('judicial_data', {})
            else:
                judicial_data = payload
    return report_data, judicial_data


def generate_pdf(data_file: str, output_file: str, template_dir: str = 'templates'):
    """Render ``report_template.html`` with the data in ``data_file`` and write a PDF.

    Args:
        data_file: Path to a ``.json`` file. It is either the report payload
            itself or a list of API responses, from which the entries with a
            known ``apiID`` are picked.
        output_file: Destination path of the generated PDF.
        template_dir: Directory containing ``report_template.html``. It is
            also passed to WeasyPrint as the base URL, so relative asset
            references in the template resolve against it.

    Raises:
        RuntimeError: WeasyPrint could not be imported.
        ValueError: ``data_file`` is not a ``.json`` file, or no report
            payload was found in it.
    """
    # Fail early with the installation hint when WeasyPrint is unusable.
    if not WEASYPRINT_AVAILABLE:
        print("=" * 60)
        print("错误WeasyPrint 不可用")
        print("=" * 60)
        print(WEASYPRINT_ERROR)
        print("=" * 60)
        raise RuntimeError("WeasyPrint 未正确安装,请参考错误信息进行安装")

    # Read the input document.
    with open(data_file, 'r', encoding='utf-8') as f:
        if not str(data_file).endswith('.json'):
            raise ValueError("不支持的文件格式,请使用 JSON 文件")
        json_data = json.load(f)

    report_data, judicial_data = _extract_report_payloads(json_data)
    if not report_data:
        raise ValueError("未找到 DWBG8B4D 数据")

    template_vars = process_report_data(report_data, judicial_data)

    # Render the HTML with autoescaping enabled for html/xml templates.
    env = Environment(
        loader=FileSystemLoader(template_dir),
        autoescape=select_autoescape(['html', 'xml'])
    )
    template = env.get_template('report_template.html')
    html_content = template.render(**template_vars)

    # An HTML string has no location of its own, so relative URLs in it can
    # only be resolved when a base URL is supplied.
    font_config = FontConfiguration()
    html_doc = HTML(string=html_content, base_url=str(Path(template_dir).resolve()))
    html_doc.write_pdf(output_file, font_config=font_config)
    print(f"PDF 已生成: {output_file}")
if __name__ == '__main__':
    # Command-line entry point: generate_pdf.py <data file> [output file]
    import sys
    import traceback

    cli_args = sys.argv[1:]
    if not cli_args:
        print("用法: python generate_pdf.py <数据文件> [输出文件]")
        print("示例: python generate_pdf.py public/example.json output.pdf")
        sys.exit(1)

    data_file = cli_args[0]
    # The output path is optional and defaults to report.pdf.
    output_file = cli_args[1] if len(cli_args) > 1 else 'report.pdf'

    # Create the template directory if it does not exist yet.
    template_dir = Path('templates')
    if not template_dir.exists():
        template_dir.mkdir()
        print(f"已创建模板目录: {template_dir}")

    try:
        generate_pdf(data_file, output_file)
    except Exception as e:
        # Top-level boundary: report the failure with a traceback, exit non-zero.
        print(f"生成 PDF 失败: {e}")
        traceback.print_exc()
        sys.exit(1)