765 lines
32 KiB
Python
765 lines
32 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
谛听多维报告 PDF 生成器
|
||
使用 WeasyPrint 将 HTML 模板转换为 PDF
|
||
"""
|
||
|
||
import json
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Dict, Any, Optional
|
||
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
||
|
||
# 尝试导入 WeasyPrint,提供友好的错误提示
|
||
try:
|
||
from weasyprint import HTML, CSS
|
||
from weasyprint.text.fonts import FontConfiguration
|
||
WEASYPRINT_AVAILABLE = True
|
||
except OSError as e:
|
||
if 'libgobject' in str(e) or 'gobject' in str(e).lower():
|
||
WEASYPRINT_AVAILABLE = False
|
||
WEASYPRINT_ERROR = "WeasyPrint 需要 GTK+ 运行时库。\n\n" \
|
||
"Windows 用户推荐使用 Conda 安装(最简单):\n" \
|
||
" conda install -c conda-forge weasyprint\n\n" \
|
||
"或参考 install_weasyprint_windows.md 文件中的详细安装指南。"
|
||
else:
|
||
WEASYPRINT_AVAILABLE = False
|
||
WEASYPRINT_ERROR = f"WeasyPrint 导入失败: {e}\n\n请参考 install_weasyprint_windows.md 文件。"
|
||
except ImportError as e:
|
||
WEASYPRINT_AVAILABLE = False
|
||
WEASYPRINT_ERROR = f"WeasyPrint 未安装: {e}\n\n请运行: pip install weasyprint"
|
||
|
||
|
||
class ReportDataProcessor:
|
||
"""报告数据处理器"""
|
||
|
||
@staticmethod
|
||
def mask_name(name: Optional[str]) -> str:
|
||
"""姓名脱敏"""
|
||
if not name:
|
||
return ''
|
||
if len(name) == 1:
|
||
return '*'
|
||
if len(name) == 2:
|
||
return name[0] + '*'
|
||
return name[0] + '*' * (len(name) - 2) + name[-1]
|
||
|
||
@staticmethod
|
||
def mask_phone(phone: Optional[str]) -> str:
|
||
"""手机号脱敏"""
|
||
if not phone:
|
||
return ''
|
||
if len(phone) == 11:
|
||
return phone[:3] + '****' + phone[7:]
|
||
return phone
|
||
|
||
@staticmethod
|
||
def mask_id_card(id_card: Optional[str]) -> str:
|
||
"""身份证号脱敏"""
|
||
if not id_card:
|
||
return ''
|
||
return re.sub(r'^(.{6})(?:\d+)(.{4})$', r'\1****\2', id_card)
|
||
|
||
@staticmethod
|
||
def format_interval(interval: Optional[str], unit: str = "") -> str:
|
||
"""格式化区间表达式"""
|
||
if not interval or interval == "-" or interval == "0":
|
||
return interval or "-"
|
||
|
||
try:
|
||
# 处理特殊格式,如 "3,6(个月)"
|
||
if "(" in interval and ")" in interval:
|
||
match = re.match(r'^(\d+(?:,\d+)*)\((.+)\)$', interval)
|
||
if match:
|
||
numbers = [n.strip() for n in match.group(1).split(",")]
|
||
time_unit = match.group(2)
|
||
if len(numbers) == 2:
|
||
return f"{numbers[0]}-{numbers[1]}{time_unit}"
|
||
return f"{', '.join(numbers)}{time_unit}"
|
||
|
||
# 处理区间表达式
|
||
pattern = r'^([\[\(])(\d+(?:\.\d+)?),(\d+(?:\.\d+)?|\+)([\]\)])$'
|
||
match = re.match(pattern, interval)
|
||
|
||
if not match:
|
||
return interval
|
||
|
||
left_bracket, left_value, right_value, right_bracket = match.groups()
|
||
is_left_inclusive = left_bracket == "["
|
||
is_right_inclusive = right_bracket == "]"
|
||
is_right_infinity = right_value == "+"
|
||
|
||
if is_right_infinity:
|
||
if is_left_inclusive:
|
||
return f"≥{left_value}{unit}"
|
||
else:
|
||
return f">{left_value}{unit}"
|
||
else:
|
||
left_num = float(left_value)
|
||
right_num = float(right_value)
|
||
|
||
if left_num == right_num:
|
||
return f"{int(left_num)}{unit}"
|
||
else:
|
||
if is_left_inclusive and is_right_inclusive:
|
||
return f"{int(left_num)}-{int(right_num)}{unit}"
|
||
elif is_left_inclusive and not is_right_inclusive:
|
||
return f"{int(left_num)}-{int(right_num) - 1}{unit}"
|
||
elif not is_left_inclusive and is_right_inclusive:
|
||
return f"{int(left_num) + 1}-{int(right_num)}{unit}"
|
||
else:
|
||
return f"{int(left_num) + 1}-{int(right_num) - 1}{unit}"
|
||
except Exception as e:
|
||
print(f"区间格式化失败: {e}, 原数据: {interval}")
|
||
return interval
|
||
|
||
@staticmethod
|
||
def format_amount_interval(interval: Optional[str]) -> str:
|
||
"""格式化金额区间"""
|
||
return ReportDataProcessor.format_interval(interval, "元")
|
||
|
||
@staticmethod
|
||
def format_institution_interval(interval: Optional[str]) -> str:
|
||
"""格式化机构数量区间"""
|
||
return ReportDataProcessor.format_interval(interval, "家")
|
||
|
||
@staticmethod
|
||
def get_check_suggest_class(check_suggest: Optional[str]) -> str:
|
||
"""获取审核建议样式类"""
|
||
suggest = check_suggest or '建议拒绝'
|
||
if '拒绝' in suggest:
|
||
return 'pdf-value-danger'
|
||
elif '通过' in suggest:
|
||
return 'pdf-value-success'
|
||
else:
|
||
return 'pdf-value-warning'
|
||
|
||
@staticmethod
|
||
def get_fraud_risk_level(score: int) -> str:
|
||
"""获取反欺诈风险等级"""
|
||
if score == -1:
|
||
return '未评估'
|
||
if score >= 80:
|
||
return '高风险'
|
||
if score >= 60:
|
||
return '中风险'
|
||
return '低风险'
|
||
|
||
@staticmethod
|
||
def get_credit_level(score: int) -> str:
|
||
"""获取信用等级"""
|
||
if score == -1:
|
||
return '未评估'
|
||
if score >= 800:
|
||
return '信用较好'
|
||
if score >= 500:
|
||
return '信用良好'
|
||
return '信用一般'
|
||
|
||
@staticmethod
|
||
def get_fraud_score_bg_class(score: int) -> str:
|
||
"""获取反欺诈评分背景样式类"""
|
||
if score == -1:
|
||
return 'pdf-score-default'
|
||
if score >= 80:
|
||
return 'pdf-score-high'
|
||
if score >= 60:
|
||
return 'pdf-score-medium'
|
||
return 'pdf-score-low'
|
||
|
||
@staticmethod
|
||
def get_credit_score_bg_class(score: int) -> str:
|
||
"""获取信用评分背景样式类"""
|
||
if score == -1:
|
||
return 'pdf-score-default'
|
||
if score >= 800:
|
||
return 'pdf-score-low'
|
||
if score >= 500:
|
||
return 'pdf-score-info'
|
||
return 'pdf-score-medium'
|
||
|
||
@staticmethod
|
||
def get_risk_tag_class(level: str) -> str:
|
||
"""获取风险标签样式类"""
|
||
if level == '高风险':
|
||
return 'pdf-tag-danger'
|
||
if level == '中风险':
|
||
return 'pdf-tag-warning'
|
||
if level == '低风险':
|
||
return 'pdf-tag-success'
|
||
if level == '信用较好':
|
||
return 'pdf-tag-success'
|
||
if level == '信用良好':
|
||
return 'pdf-tag-info'
|
||
if level == '信用一般':
|
||
return 'pdf-tag-warning'
|
||
return 'pdf-tag-default'
|
||
|
||
@staticmethod
|
||
def get_risk_level_class(level: str) -> str:
|
||
"""获取风险等级样式类"""
|
||
if level == '高风险':
|
||
return 'pdf-score-high'
|
||
if level == '中风险':
|
||
return 'pdf-score-medium'
|
||
if level == '低风险':
|
||
return 'pdf-score-low'
|
||
return 'pdf-score-default'
|
||
|
||
@staticmethod
|
||
def get_risk_flag_text(flag: int) -> str:
|
||
"""获取风险标识文本"""
|
||
if flag == 1:
|
||
return '高风险'
|
||
if flag == 2:
|
||
return '低风险'
|
||
return '未查得'
|
||
|
||
@staticmethod
|
||
def get_risk_flag_tag_class(flag: int) -> str:
|
||
"""获取风险标识标签样式类"""
|
||
if flag == 1:
|
||
return 'pdf-tag-danger'
|
||
if flag == 2:
|
||
return 'pdf-tag-success'
|
||
return 'pdf-tag-default'
|
||
|
||
@staticmethod
|
||
def get_result_text(result: Optional[str]) -> str:
|
||
"""获取验证结果文本"""
|
||
if result == '一致':
|
||
return '核验一致'
|
||
if result == '不一致':
|
||
return '核验不一致'
|
||
return result or '未查得'
|
||
|
||
@staticmethod
|
||
def get_verification_result_class(result: Optional[str]) -> str:
|
||
"""获取验证结果样式类"""
|
||
if result == '一致':
|
||
return 'pdf-result-success'
|
||
if result == '不一致':
|
||
return 'pdf-result-danger'
|
||
return 'pdf-result-default'
|
||
|
||
@staticmethod
|
||
def get_high_risk_count(risk_warning: Dict[str, Any]) -> int:
|
||
"""获取高风险数量"""
|
||
high_risk_fields = [
|
||
'idCardTwoElementMismatch', 'phoneThreeElementMismatch',
|
||
'shortPhoneDuration', 'noPhoneDuration',
|
||
'hasCriminalRecord', 'isEconomyFront', 'isDisrupSocial', 'isKeyPerson', 'isTrafficRelated',
|
||
'hitHighRiskBankLastTwoYears', 'hitHighRiskNonBankLastTwoYears',
|
||
'hitCivilCase', 'hitCriminalRisk', 'hitAdministrativeCase', 'hitPreservationReview',
|
||
'hitExecutionCase', 'hitBankruptcyAndLiquidation', 'hitDirectlyUnderCase', 'hitCompensationCase',
|
||
'frequentApplicationRecent', 'frequentNonBankApplications', 'highDebtPressure', 'frequentBankApplications',
|
||
'frequentRentalApplications', 'veryFrequentRentalApplications'
|
||
]
|
||
return sum(risk_warning.get(field, 0) for field in high_risk_fields)
|
||
|
||
@staticmethod
|
||
def get_middle_risk_count(risk_warning: Dict[str, Any]) -> int:
|
||
"""获取中风险数量"""
|
||
middle_risk_fields = [
|
||
'idCardPhoneProvinceMismatch', 'isAntiFraudInfo',
|
||
'hitCurrentOverdue',
|
||
'moreFrequentNonBankApplications', 'highFraudGangLevel', 'moreFrequentBankApplications'
|
||
]
|
||
return sum(risk_warning.get(field, 0) for field in middle_risk_fields)
|
||
|
||
@staticmethod
|
||
def get_all_risks(risk_warning: Dict[str, Any]) -> list:
|
||
"""获取所有风险列表"""
|
||
risks = []
|
||
|
||
risk_mapping = {
|
||
'idCardTwoElementMismatch': {
|
||
'description': '身份证二要素信息对比结果不一致',
|
||
'detail': '身份证号与姓名信息不匹配',
|
||
'level': '高风险'
|
||
},
|
||
'phoneThreeElementMismatch': {
|
||
'description': '手机三要素简版不一致',
|
||
'detail': '手机号与身份证号、姓名信息不匹配',
|
||
'level': '高风险'
|
||
},
|
||
'shortPhoneDuration': {
|
||
'description': '手机在网时长极短',
|
||
'detail': '手机号在网时间过短,存在风险',
|
||
'level': '高风险'
|
||
},
|
||
'idCardPhoneProvinceMismatch': {
|
||
'description': '身份证号手机号归属省不一致',
|
||
'detail': '身份证归属地与手机号归属地不匹配',
|
||
'level': '中风险'
|
||
},
|
||
'hasCriminalRecord': {
|
||
'description': '该用户有前科',
|
||
'detail': '用户存在犯罪前科记录',
|
||
'level': '高风险'
|
||
},
|
||
'isKeyPerson': {
|
||
'description': '该用户为重点人员',
|
||
'detail': '用户被列为重点监管人员',
|
||
'level': '高风险'
|
||
},
|
||
'hitHighRiskBankLastTwoYears': {
|
||
'description': '近两年命中银行高风险',
|
||
'detail': '近两年在银行机构存在高风险记录',
|
||
'level': '高风险'
|
||
},
|
||
'hitCurrentOverdue': {
|
||
'description': '该用户命中当前逾期',
|
||
'detail': '用户当前存在逾期记录',
|
||
'level': '中风险'
|
||
},
|
||
'frequentApplicationRecent': {
|
||
'description': '近期申请机构极为频繁',
|
||
'detail': '近期在多个机构频繁申请贷款',
|
||
'level': '高风险'
|
||
}
|
||
}
|
||
|
||
for key, info in risk_mapping.items():
|
||
if risk_warning.get(key, 0):
|
||
badge_class = 'pdf-tag-danger' if info['level'] == '高风险' else 'pdf-tag-warning'
|
||
risks.append({
|
||
'key': key,
|
||
'description': info['description'],
|
||
'detail': info['detail'],
|
||
'level': info['level'],
|
||
'badge_class': badge_class
|
||
})
|
||
|
||
return risks
|
||
|
||
@staticmethod
|
||
def get_overdue_status_text(status: Optional[str]) -> str:
|
||
"""获取逾期状态文本"""
|
||
if status == '逾期':
|
||
return '逾期'
|
||
if status == '未逾期':
|
||
return '未逾期'
|
||
return '未知'
|
||
|
||
@staticmethod
|
||
def get_overdue_status_tag_class(status: Optional[str]) -> str:
|
||
"""获取逾期状态标签样式类"""
|
||
if status == '逾期':
|
||
return 'pdf-tag-danger'
|
||
if status == '未逾期':
|
||
return 'pdf-tag-success'
|
||
return 'pdf-tag-default'
|
||
|
||
@staticmethod
|
||
def get_overdue_time_text(status: Optional[str]) -> str:
|
||
"""获取逾期时间文本"""
|
||
if status == '逾期':
|
||
return '逾期'
|
||
if status == '未逾期':
|
||
return '正常'
|
||
return '未知'
|
||
|
||
@staticmethod
|
||
def get_overdue_time_class(status: Optional[str]) -> str:
|
||
"""获取逾期时间样式类"""
|
||
if status == '逾期':
|
||
return 'pdf-time-danger'
|
||
if status == '未逾期':
|
||
return 'pdf-time-success'
|
||
return 'pdf-time-default'
|
||
|
||
|
||
def process_judicial_data(judicial_data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""处理司法涉诉数据"""
|
||
processor = ReportDataProcessor()
|
||
|
||
# 处理不同的数据结构
|
||
lawsuit_stat = None
|
||
breach_case_list = []
|
||
consumption_restriction_list = []
|
||
|
||
# 如果数据在 judicial_data 下
|
||
if 'lawsuitStat' in judicial_data:
|
||
lawsuit_stat = judicial_data.get('lawsuitStat', {})
|
||
breach_case_list = judicial_data.get('breachCaseList', [])
|
||
consumption_restriction_list = judicial_data.get('consumptionRestrictionList', [])
|
||
# 如果数据在 entout.data 下
|
||
elif 'entout' in judicial_data:
|
||
entout_data = judicial_data.get('entout', {}).get('data', {})
|
||
lawsuit_stat = entout_data
|
||
breach_case_list = judicial_data.get('breachCaseList', [])
|
||
consumption_restriction_list = judicial_data.get('consumptionRestrictionList', [])
|
||
# 如果直接就是 lawsuitStat 结构
|
||
elif 'count' in judicial_data or 'civil' in judicial_data:
|
||
lawsuit_stat = judicial_data
|
||
|
||
if not lawsuit_stat:
|
||
lawsuit_stat = {}
|
||
|
||
# 处理案件统计
|
||
count = lawsuit_stat.get('count', {})
|
||
|
||
# 处理各类案件
|
||
civil = lawsuit_stat.get('civil', {})
|
||
criminal = lawsuit_stat.get('criminal', {})
|
||
administrative = lawsuit_stat.get('administrative', {})
|
||
implement = lawsuit_stat.get('implement', {})
|
||
preservation = lawsuit_stat.get('preservation', {})
|
||
bankrupt = lawsuit_stat.get('bankrupt', {})
|
||
|
||
# 法院曝光台数据(用于替代谛听报告的法院曝光台)
|
||
# 从 multCourtInfo 或司法涉诉数据中提取
|
||
court_exposure = {
|
||
'legal_cases': [],
|
||
'execution_cases': [],
|
||
'disin_cases': breach_case_list,
|
||
'limit_cases': consumption_restriction_list
|
||
}
|
||
|
||
# 如果有执行案件,添加到法院曝光台
|
||
if implement and implement.get('cases'):
|
||
court_exposure['execution_cases'] = implement.get('cases', [])
|
||
|
||
# 如果有民事案件,添加到法院曝光台
|
||
if civil and civil.get('cases'):
|
||
court_exposure['legal_cases'].extend(civil.get('cases', []))
|
||
|
||
# 如果有刑事案件,添加到法院曝光台
|
||
if criminal and criminal.get('cases'):
|
||
court_exposure['legal_cases'].extend(criminal.get('cases', []))
|
||
|
||
return {
|
||
'has_data': bool(count or civil or criminal or administrative or implement or preservation or bankrupt or breach_case_list or consumption_restriction_list),
|
||
'count': count,
|
||
'civil': civil,
|
||
'criminal': criminal,
|
||
'administrative': administrative,
|
||
'implement': implement,
|
||
'preservation': preservation,
|
||
'bankrupt': bankrupt,
|
||
'breach_case_list': breach_case_list,
|
||
'consumption_restriction_list': consumption_restriction_list,
|
||
'court_exposure': court_exposure
|
||
}
|
||
|
||
|
||
def process_report_data(data: Dict[str, Any], judicial_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
||
"""处理报告数据,准备模板变量"""
|
||
processor = ReportDataProcessor()
|
||
|
||
base_info = data.get('baseInfo', {})
|
||
check_suggest = data.get('checkSuggest', '')
|
||
fraud_score = data.get('fraudScore', -1)
|
||
credit_score = data.get('creditScore', -1)
|
||
verify_rule = data.get('verifyRule', '')
|
||
fraud_rule = data.get('fraudRule', '')
|
||
|
||
element_verification_detail = data.get('elementVerificationDetail', {})
|
||
risk_warning = data.get('riskWarning', {})
|
||
overdue_risk_product = data.get('overdueRiskProduct', {})
|
||
loan_evaluation_verification_detail = data.get('loanEvaluationVerificationDetail', {})
|
||
leasing_risk_assessment = data.get('leasingRiskAssessment', {})
|
||
risk_supervision = data.get('riskSupervision', {})
|
||
|
||
# 处理基本信息
|
||
base_info_processed = {
|
||
'name_masked': processor.mask_name(base_info.get('name')),
|
||
'age': base_info.get('age', ''),
|
||
'sex': base_info.get('sex', ''),
|
||
'phone_masked': processor.mask_phone(base_info.get('phone')),
|
||
'id_card_masked': processor.mask_id_card(base_info.get('idCard')),
|
||
'location': base_info.get('location', ''),
|
||
'phone_area': base_info.get('phoneArea', '')
|
||
}
|
||
|
||
# 处理要素核查
|
||
element_verification = None
|
||
if element_verification_detail:
|
||
sfzeys_flag = element_verification_detail.get('sfzeysFlag', 0)
|
||
sjsys_flag = element_verification_detail.get('sjsysFlag', 0)
|
||
person_check_details = element_verification_detail.get('personCheckDetails', {})
|
||
phone_check_details = element_verification_detail.get('phoneCheckDetails', {})
|
||
|
||
if person_check_details or phone_check_details:
|
||
element_verification = {
|
||
'sfzeys_flag': sfzeys_flag,
|
||
'sfzeys_flag_text': processor.get_risk_flag_text(sfzeys_flag),
|
||
'sfzeys_flag_tag_class': processor.get_risk_flag_tag_class(sfzeys_flag),
|
||
'sjsys_flag': sjsys_flag,
|
||
'sjsys_flag_text': processor.get_risk_flag_text(sjsys_flag),
|
||
'sjsys_flag_tag_class': processor.get_risk_flag_tag_class(sjsys_flag),
|
||
'person_check_details': person_check_details,
|
||
'phone_check_details': phone_check_details,
|
||
'person_result_text': processor.get_result_text(person_check_details.get('result')),
|
||
'person_result_class': processor.get_verification_result_class(person_check_details.get('result')),
|
||
'phone_result_text': processor.get_result_text(phone_check_details.get('result')),
|
||
'phone_result_class': processor.get_verification_result_class(phone_check_details.get('result'))
|
||
}
|
||
|
||
# 处理风险预警
|
||
risk_warning_processed = None
|
||
if risk_warning:
|
||
risks = processor.get_all_risks(risk_warning)
|
||
if risks or risk_warning.get('totalRiskCounts'):
|
||
risk_warning_processed = {
|
||
'has_data': True,
|
||
'total_risk_counts': risk_warning.get('totalRiskCounts', 0),
|
||
'high_risk_count': processor.get_high_risk_count(risk_warning),
|
||
'middle_risk_count': processor.get_middle_risk_count(risk_warning),
|
||
'level': risk_warning.get('level', '-'),
|
||
'risks': risks
|
||
}
|
||
|
||
# 处理逾期风险
|
||
overdue_risk_processed = None
|
||
if overdue_risk_product:
|
||
has_unsettled_overdue = overdue_risk_product.get('hasUnsettledOverdue')
|
||
overdue_risk_processed = {
|
||
'has_data': True,
|
||
'status_text': processor.get_overdue_status_text(has_unsettled_overdue),
|
||
'status_tag_class': processor.get_overdue_status_tag_class(has_unsettled_overdue),
|
||
'current_overdue_institution_count': processor.format_institution_interval(
|
||
overdue_risk_product.get('currentOverdueInstitutionCount')
|
||
),
|
||
'current_overdue_amount': processor.format_amount_interval(
|
||
overdue_risk_product.get('currentOverdueAmount')
|
||
),
|
||
'settled_institution_count': processor.format_institution_interval(
|
||
overdue_risk_product.get('settledInstitutionCount')
|
||
),
|
||
'total_loan_institutions': processor.format_institution_interval(
|
||
overdue_risk_product.get('totalLoanInstitutions')
|
||
),
|
||
'time_1day_text': processor.get_overdue_time_text(overdue_risk_product.get('overdueLast1Day')),
|
||
'time_1day_class': processor.get_overdue_time_class(overdue_risk_product.get('overdueLast1Day')),
|
||
'time_7days_text': processor.get_overdue_time_text(overdue_risk_product.get('overdueLast7Days')),
|
||
'time_7days_class': processor.get_overdue_time_class(overdue_risk_product.get('overdueLast7Days')),
|
||
'time_14days_text': processor.get_overdue_time_text(overdue_risk_product.get('overdueLast14Days')),
|
||
'time_14days_class': processor.get_overdue_time_class(overdue_risk_product.get('overdueLast14Days')),
|
||
'time_30days_text': processor.get_overdue_time_text(overdue_risk_product.get('overdueLast30Days')),
|
||
'time_30days_class': processor.get_overdue_time_class(overdue_risk_product.get('overdueLast30Days'))
|
||
}
|
||
|
||
# 处理借贷评估
|
||
loan_evaluation_processed = None
|
||
if loan_evaluation_verification_detail:
|
||
risk_flag = loan_evaluation_verification_detail.get('riskFlag', 0)
|
||
organ_loan_performances = loan_evaluation_verification_detail.get('organLoanPerformances', [])
|
||
|
||
if organ_loan_performances:
|
||
processed_performances = []
|
||
for item in organ_loan_performances:
|
||
apply_count = item.get('applyCount', '')
|
||
type_name = '银行机构' if apply_count == '银行' else '非银机构'
|
||
processed_performances.append({
|
||
'type_name': type_name,
|
||
'last7Day': item.get('last7Day', '0/0'),
|
||
'last15Day': item.get('last15Day', '0/0'),
|
||
'last1Month': item.get('last1Month', '0/0')
|
||
})
|
||
|
||
loan_evaluation_processed = {
|
||
'has_data': True,
|
||
'risk_flag': risk_flag,
|
||
'risk_flag_text': processor.get_risk_flag_text(risk_flag),
|
||
'risk_flag_tag_class': processor.get_risk_flag_tag_class(risk_flag),
|
||
'organ_loan_performances': processed_performances
|
||
}
|
||
|
||
# 处理租赁风险评估
|
||
leasing_risk_processed = None
|
||
if leasing_risk_assessment:
|
||
risk_flag = leasing_risk_assessment.get('riskFlag', 0)
|
||
leasing_risk_processed = {
|
||
'has_data': True,
|
||
'risk_flag': risk_flag,
|
||
'risk_flag_text': processor.get_risk_flag_text(risk_flag),
|
||
'risk_flag_tag_class': processor.get_risk_flag_tag_class(risk_flag),
|
||
'institution_total': leasing_risk_assessment.get('threeCInstitutionApplicationCountLast3Days', '0/0'),
|
||
'institution_weekend': leasing_risk_assessment.get('threeCInstitutionApplicationCountLast3DaysWeekend', '0/0'),
|
||
'institution_night': leasing_risk_assessment.get('threeCInstitutionApplicationCountLast3DaysNight', '0/0'),
|
||
'platform_total': leasing_risk_assessment.get('threeCPlatformApplicationCountLast3Days', '0/0'),
|
||
'platform_weekend': leasing_risk_assessment.get('threeCPlatformApplicationCountLast3DaysWeekend', '0/0'),
|
||
'platform_night': leasing_risk_assessment.get('threeCPlatformApplicationCountLast3DaysNight', '0/0')
|
||
}
|
||
|
||
# 处理运营商核验
|
||
operator_verification = None
|
||
if element_verification_detail:
|
||
online_risk_flag = element_verification_detail.get('onlineRiskFlag', 0)
|
||
online_risk_list = element_verification_detail.get('onlineRiskList', {})
|
||
phone_vail_risk_flag = element_verification_detail.get('phoneVailRiskFlag', 0)
|
||
phone_vail_risks = element_verification_detail.get('phoneVailRisks', {})
|
||
belong_risk_flag = element_verification_detail.get('belongRiskFlag', 0)
|
||
belong_risks = element_verification_detail.get('belongRisks', {})
|
||
|
||
if online_risk_list or phone_vail_risks or belong_risks:
|
||
operator_verification = {
|
||
'has_data': True,
|
||
'online_risk_flag': online_risk_flag,
|
||
'online_risk_flag_text': processor.get_risk_flag_text(online_risk_flag),
|
||
'online_risk_flag_tag_class': processor.get_risk_flag_tag_class(online_risk_flag),
|
||
'online_risk_list': online_risk_list,
|
||
'phone_vail_risk_flag': phone_vail_risk_flag,
|
||
'phone_vail_risk_flag_text': processor.get_risk_flag_text(phone_vail_risk_flag),
|
||
'phone_vail_risk_flag_tag_class': processor.get_risk_flag_tag_class(phone_vail_risk_flag),
|
||
'phone_vail_risks': phone_vail_risks,
|
||
'belong_risk_flag': belong_risk_flag,
|
||
'belong_risk_flag_text': processor.get_risk_flag_text(belong_risk_flag),
|
||
'belong_risk_flag_tag_class': processor.get_risk_flag_tag_class(belong_risk_flag),
|
||
'belong_risks': belong_risks
|
||
}
|
||
|
||
# 处理公安重点人员核验
|
||
key_person_verification = None
|
||
if element_verification_detail:
|
||
high_risk_flag = element_verification_detail.get('highRiskFlag', 0)
|
||
key_person_check_list = element_verification_detail.get('keyPersonCheckList', {})
|
||
anti_fraud_info = element_verification_detail.get('antiFraudInfo', {})
|
||
|
||
if key_person_check_list or anti_fraud_info:
|
||
key_person_verification = {
|
||
'has_data': True,
|
||
'high_risk_flag': high_risk_flag,
|
||
'high_risk_flag_text': processor.get_risk_flag_text(high_risk_flag),
|
||
'high_risk_flag_tag_class': processor.get_risk_flag_tag_class(high_risk_flag),
|
||
'key_person_check_list': key_person_check_list,
|
||
'anti_fraud_info': anti_fraud_info
|
||
}
|
||
|
||
# 处理法院曝光台(使用司法涉诉数据)
|
||
court_exposure_processed = None
|
||
if judicial_data:
|
||
judicial_processed = process_judicial_data(judicial_data)
|
||
if judicial_processed.get('has_data'):
|
||
court_exposure_processed = judicial_processed.get('court_exposure', {})
|
||
|
||
# 处理司法涉诉数据
|
||
judicial_processed = None
|
||
if judicial_data:
|
||
judicial_processed = process_judicial_data(judicial_data)
|
||
|
||
# 计算风险评分相关
|
||
fraud_risk_level = processor.get_fraud_risk_level(fraud_score)
|
||
credit_level = processor.get_credit_level(credit_score)
|
||
|
||
return {
|
||
'base_info': base_info_processed,
|
||
'check_suggest': check_suggest,
|
||
'check_suggest_class': processor.get_check_suggest_class(check_suggest),
|
||
'fraud_score': fraud_score,
|
||
'fraud_score_display': '未命中' if fraud_score == -1 else str(fraud_score),
|
||
'fraud_risk_level': fraud_risk_level,
|
||
'fraud_score_bg_class': processor.get_fraud_score_bg_class(fraud_score),
|
||
'fraud_risk_tag_class': processor.get_risk_tag_class(fraud_risk_level),
|
||
'credit_score': credit_score,
|
||
'credit_score_display': '未命中' if credit_score == -1 else str(credit_score),
|
||
'credit_level': credit_level,
|
||
'credit_score_bg_class': processor.get_credit_score_bg_class(credit_score),
|
||
'credit_risk_tag_class': processor.get_risk_tag_class(credit_level),
|
||
'verify_rule': verify_rule,
|
||
'verify_rule_class': processor.get_risk_level_class(verify_rule),
|
||
'verify_rule_tag_class': processor.get_risk_tag_class(verify_rule),
|
||
'fraud_rule': fraud_rule,
|
||
'fraud_rule_class': processor.get_risk_level_class(fraud_rule),
|
||
'fraud_rule_tag_class': processor.get_risk_tag_class(fraud_rule),
|
||
'element_verification': element_verification,
|
||
'operator_verification': operator_verification,
|
||
'key_person_verification': key_person_verification,
|
||
'overdue_risk': overdue_risk_processed,
|
||
'court_exposure': court_exposure_processed,
|
||
'loan_evaluation': loan_evaluation_processed,
|
||
'judicial_data': judicial_processed
|
||
}
|
||
|
||
|
||
def generate_pdf(data_file: str, output_file: str, template_dir: str = 'templates'):
|
||
"""生成 PDF 文件"""
|
||
# 检查 WeasyPrint 是否可用
|
||
if not WEASYPRINT_AVAILABLE:
|
||
print("=" * 60)
|
||
print("错误:WeasyPrint 不可用")
|
||
print("=" * 60)
|
||
print(WEASYPRINT_ERROR)
|
||
print("=" * 60)
|
||
raise RuntimeError("WeasyPrint 未正确安装,请参考错误信息进行安装")
|
||
|
||
# 读取数据文件
|
||
with open(data_file, 'r', encoding='utf-8') as f:
|
||
if data_file.endswith('.json'):
|
||
json_data = json.load(f)
|
||
else:
|
||
raise ValueError("不支持的文件格式,请使用 JSON 文件")
|
||
|
||
# 如果是数组,查找 DWBG8B4D 和司法涉诉数据
|
||
report_data = None
|
||
judicial_data = None
|
||
|
||
if isinstance(json_data, list):
|
||
for item in json_data:
|
||
api_id = item.get('data', {}).get('apiID', '')
|
||
if api_id in ['DWBG8B4D', 'CDWBG8B4D']:
|
||
report_data = item.get('data', {}).get('data', {})
|
||
elif api_id in ['FLXG7E8F', 'FLXG0V4B', 'CFLXG0V4B']:
|
||
# 司法涉诉数据可能在 data.data.judicial_data 或 data.data.entout
|
||
data_content = item.get('data', {}).get('data', {})
|
||
if 'judicial_data' in data_content:
|
||
judicial_data = data_content.get('judicial_data', {})
|
||
elif 'entout' in data_content:
|
||
judicial_data = data_content
|
||
else:
|
||
judicial_data = data_content
|
||
else:
|
||
report_data = json_data
|
||
|
||
if not report_data:
|
||
raise ValueError("未找到 DWBG8B4D 数据")
|
||
|
||
# 处理数据
|
||
template_vars = process_report_data(report_data, judicial_data)
|
||
|
||
# 加载模板
|
||
env = Environment(
|
||
loader=FileSystemLoader(template_dir),
|
||
autoescape=select_autoescape(['html', 'xml'])
|
||
)
|
||
template = env.get_template('report_template.html')
|
||
|
||
# 渲染 HTML
|
||
html_content = template.render(**template_vars)
|
||
|
||
# 生成 PDF
|
||
font_config = FontConfiguration()
|
||
html_doc = HTML(string=html_content)
|
||
html_doc.write_pdf(output_file, font_config=font_config)
|
||
|
||
print(f"PDF 已生成: {output_file}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
import sys
|
||
|
||
if len(sys.argv) < 2:
|
||
print("用法: python generate_pdf.py <数据文件> [输出文件]")
|
||
print("示例: python generate_pdf.py public/example.json output.pdf")
|
||
sys.exit(1)
|
||
|
||
data_file = sys.argv[1]
|
||
output_file = sys.argv[2] if len(sys.argv) > 2 else 'report.pdf'
|
||
|
||
# 确保模板目录存在
|
||
template_dir = Path('templates')
|
||
if not template_dir.exists():
|
||
template_dir.mkdir()
|
||
print(f"已创建模板目录: {template_dir}")
|
||
|
||
try:
|
||
generate_pdf(data_file, output_file)
|
||
except Exception as e:
|
||
print(f"生成 PDF 失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
sys.exit(1)
|