#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Diting multi-dimensional report PDF generator.

Loads a JSON report, converts it into template variables, renders a Jinja2
HTML template and writes the result to PDF with WeasyPrint.
"""

import json
import re
import sys
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Jinja2 is only needed for rendering. Guard the import (as is done for
# WeasyPrint below) so the data-processing helpers stay importable without it.
JINJA2_ERROR = ""
try:
    from jinja2 import Environment, FileSystemLoader, select_autoescape
    JINJA2_AVAILABLE = True
except ImportError as e:
    JINJA2_AVAILABLE = False
    JINJA2_ERROR = f"Jinja2 未安装: {e}\n\n请运行: pip install jinja2"

# Try to import WeasyPrint and keep a friendly error message for later.
WEASYPRINT_ERROR = ""
try:
    from weasyprint import HTML, CSS
    from weasyprint.text.fonts import FontConfiguration
    WEASYPRINT_AVAILABLE = True
except OSError as e:
    WEASYPRINT_AVAILABLE = False
    # A missing GTK+/gobject shared library surfaces as OSError at import time.
    if 'gobject' in str(e).lower():
        WEASYPRINT_ERROR = "WeasyPrint 需要 GTK+ 运行时库。\n\n" \
                           "Windows 用户推荐使用 Conda 安装(最简单):\n" \
                           " conda install -c conda-forge weasyprint\n\n" \
                           "或参考 install_weasyprint_windows.md 文件中的详细安装指南。"
    else:
        WEASYPRINT_ERROR = f"WeasyPrint 导入失败: {e}\n\n请参考 install_weasyprint_windows.md 文件。"
except ImportError as e:
    WEASYPRINT_AVAILABLE = False
    WEASYPRINT_ERROR = f"WeasyPrint 未安装: {e}\n\n请运行: pip install weasyprint"


class ReportDataProcessor:
    """Stateless helpers that mask, format and classify report values."""

    # "3,6(个月)" style values: comma-separated integers followed by a unit.
    _COUNT_WITH_UNIT_RE = re.compile(r'^(\d+(?:,\d+)*)\((.+)\)$')
    # Interval notation such as "[1,3)", "(0,5]" or "[10,+)".
    _INTERVAL_RE = re.compile(
        r'^([\[\(])(\d+(?:\.\d+)?),(\d+(?:\.\d+)?|\+)([\]\)])$'
    )

    # riskWarning keys whose values are summed into the high-risk count.
    _HIGH_RISK_FIELDS = (
        'idCardTwoElementMismatch', 'phoneThreeElementMismatch',
        'shortPhoneDuration', 'noPhoneDuration', 'hasCriminalRecord',
        'isEconomyFront', 'isDisrupSocial', 'isKeyPerson', 'isTrafficRelated',
        'hitHighRiskBankLastTwoYears', 'hitHighRiskNonBankLastTwoYears',
        'hitCivilCase', 'hitCriminalRisk', 'hitAdministrativeCase',
        'hitPreservationReview', 'hitExecutionCase',
        'hitBankruptcyAndLiquidation', 'hitDirectlyUnderCase',
        'hitCompensationCase', 'frequentApplicationRecent',
        'frequentNonBankApplications', 'highDebtPressure',
        'frequentBankApplications', 'frequentRentalApplications',
        'veryFrequentRentalApplications',
    )

    # riskWarning keys whose values are summed into the medium-risk count.
    _MIDDLE_RISK_FIELDS = (
        'idCardPhoneProvinceMismatch', 'isAntiFraudInfo', 'hitCurrentOverdue',
        'moreFrequentNonBankApplications', 'highFraudGangLevel',
        'moreFrequentBankApplications',
    )

    # Display text for the riskWarning keys that are listed individually.
    # Only these keys produce an entry in get_all_risks().
    _RISK_MAPPING = {
        'idCardTwoElementMismatch': {
            'description': '身份证二要素信息对比结果不一致',
            'detail': '身份证号与姓名信息不匹配',
            'level': '高风险',
        },
        'phoneThreeElementMismatch': {
            'description': '手机三要素简版不一致',
            'detail': '手机号与身份证号、姓名信息不匹配',
            'level': '高风险',
        },
        'shortPhoneDuration': {
            'description': '手机在网时长极短',
            'detail': '手机号在网时间过短,存在风险',
            'level': '高风险',
        },
        'idCardPhoneProvinceMismatch': {
            'description': '身份证号手机号归属省不一致',
            'detail': '身份证归属地与手机号归属地不匹配',
            'level': '中风险',
        },
        'hasCriminalRecord': {
            'description': '该用户有前科',
            'detail': '用户存在犯罪前科记录',
            'level': '高风险',
        },
        'isKeyPerson': {
            'description': '该用户为重点人员',
            'detail': '用户被列为重点监管人员',
            'level': '高风险',
        },
        'hitHighRiskBankLastTwoYears': {
            'description': '近两年命中银行高风险',
            'detail': '近两年在银行机构存在高风险记录',
            'level': '高风险',
        },
        'hitCurrentOverdue': {
            'description': '该用户命中当前逾期',
            'detail': '用户当前存在逾期记录',
            'level': '中风险',
        },
        'frequentApplicationRecent': {
            'description': '近期申请机构极为频繁',
            'detail': '近期在多个机构频繁申请贷款',
            'level': '高风险',
        },
    }

    # Level label -> tag CSS class (fraud levels and credit levels share it).
    _RISK_TAG_CLASSES = {
        '高风险': 'pdf-tag-danger',
        '中风险': 'pdf-tag-warning',
        '低风险': 'pdf-tag-success',
        '信用较好': 'pdf-tag-success',
        '信用良好': 'pdf-tag-info',
        '信用一般': 'pdf-tag-warning',
    }

    # Level label -> score background CSS class.
    _RISK_LEVEL_CLASSES = {
        '高风险': 'pdf-score-high',
        '中风险': 'pdf-score-medium',
        '低风险': 'pdf-score-low',
    }

    @staticmethod
    def mask_name(name: Optional[str]) -> str:
        """Mask a personal name, keeping the first (and, if 3+ chars, last) character."""
        if not name:
            return ''
        if len(name) == 1:
            return '*'
        if len(name) == 2:
            return name[0] + '*'
        return name[0] + '*' * (len(name) - 2) + name[-1]

    @staticmethod
    def mask_phone(phone: Optional[str]) -> str:
        """Mask a phone number, keeping the first 3 and last 4 characters.

        An 11-character number yields the classic ``138****5678`` form. Values
        of any other length are masked as well (the original returned them in
        clear text): longer than 7 characters keeps the same prefix/suffix,
        anything shorter is masked completely.
        """
        if not phone:
            return ''
        phone = str(phone)
        if len(phone) > 7:
            return phone[:3] + '*' * (len(phone) - 7) + phone[-4:]
        return '*' * len(phone)

    @staticmethod
    def mask_id_card(id_card: Optional[str]) -> str:
        """Mask an ID-card number as ``<first 6>****<last 4>``.

        Values too short to keep 6 + 4 visible characters around a hidden
        middle are masked completely instead of being returned unmasked.
        """
        if not id_card:
            return ''
        id_card = str(id_card)
        if len(id_card) > 10:
            return id_card[:6] + '****' + id_card[-4:]
        return '*' * len(id_card)

    @staticmethod
    def _format_bound(value: str) -> str:
        """Render an interval bound: integral values without decimals, others as given."""
        number = float(value)
        return str(int(number)) if number.is_integer() else value

    @staticmethod
    def format_interval(interval: Optional[str], unit: str = "") -> str:
        """Convert interval notation into human-readable text.

        Supported inputs:
          * ``"3,6(个月)"`` -> ``"3-6个月"`` (the embedded unit wins over *unit*)
          * ``"[1,3]"`` / ``"[1,3)"`` / ``"(1,3]"`` -> inclusive integer range
          * ``"[3,+)"`` / ``"(3,+)"`` -> ``"≥3"`` / ``">3"``

        Empty values return ``"-"``; ``"-"``, ``"0"`` and anything that does
        not match a known shape are returned unchanged (as a string).
        """
        if not interval or interval == "-" or interval == "0":
            return interval or "-"
        # Numeric JSON values would otherwise break the string operations.
        interval = str(interval)

        fmt = ReportDataProcessor._format_bound

        counted = ReportDataProcessor._COUNT_WITH_UNIT_RE.match(interval)
        if counted:
            numbers = [n.strip() for n in counted.group(1).split(",")]
            time_unit = counted.group(2)
            if len(numbers) == 2:
                return f"{numbers[0]}-{numbers[1]}{time_unit}"
            return f"{', '.join(numbers)}{time_unit}"

        match = ReportDataProcessor._INTERVAL_RE.match(interval)
        if not match:
            return interval

        left_bracket, left_value, right_value, right_bracket = match.groups()
        left_inclusive = left_bracket == "["
        right_inclusive = right_bracket == "]"

        if right_value == "+":
            sign = "≥" if left_inclusive else ">"
            return f"{sign}{left_value}{unit}"

        left_num = float(left_value)
        right_num = float(right_value)
        if left_num == right_num:
            return f"{fmt(left_value)}{unit}"

        if not (left_num.is_integer() and right_num.is_integer()):
            # Shifting an exclusive bound by 1 only makes sense for integers;
            # show decimal bounds as given instead of truncating them.
            return f"{fmt(left_value)}-{fmt(right_value)}{unit}"

        low = int(left_num) + (0 if left_inclusive else 1)
        high = int(right_num) - (0 if right_inclusive else 1)
        if low == high:
            # e.g. "[3,4)" contains the single integer 3.
            return f"{low}{unit}"
        if low > high:
            # e.g. "(3,4)" contains no integer; leave the raw notation.
            return interval
        return f"{low}-{high}{unit}"

    @staticmethod
    def format_amount_interval(interval: Optional[str]) -> str:
        """Format an amount interval using the unit "元"."""
        return ReportDataProcessor.format_interval(interval, "元")

    @staticmethod
    def format_institution_interval(interval: Optional[str]) -> str:
        """Format an institution-count interval using the unit "家"."""
        return ReportDataProcessor.format_interval(interval, "家")

    @staticmethod
    def get_check_suggest_class(check_suggest: Optional[str]) -> str:
        """Return the CSS class for a review suggestion (empty counts as reject)."""
        suggest = check_suggest or '建议拒绝'
        if '拒绝' in suggest:
            return 'pdf-value-danger'
        if '通过' in suggest:
            return 'pdf-value-success'
        return 'pdf-value-warning'

    @staticmethod
    def get_fraud_risk_level(score: Optional[int]) -> str:
        """Map an anti-fraud score to a level label (-1 or None means not rated)."""
        if score is None or score == -1:
            return '未评估'
        if score >= 80:
            return '高风险'
        if score >= 60:
            return '中风险'
        return '低风险'

    @staticmethod
    def get_credit_level(score: Optional[int]) -> str:
        """Map a credit score to a level label (-1 or None means not rated)."""
        if score is None or score == -1:
            return '未评估'
        if score >= 800:
            return '信用较好'
        if score >= 500:
            return '信用良好'
        return '信用一般'

    @staticmethod
    def get_fraud_score_bg_class(score: Optional[int]) -> str:
        """Return the background CSS class for an anti-fraud score."""
        if score is None or score == -1:
            return 'pdf-score-default'
        if score >= 80:
            return 'pdf-score-high'
        if score >= 60:
            return 'pdf-score-medium'
        return 'pdf-score-low'

    @staticmethod
    def get_credit_score_bg_class(score: Optional[int]) -> str:
        """Return the background CSS class for a credit score."""
        if score is None or score == -1:
            return 'pdf-score-default'
        if score >= 800:
            return 'pdf-score-low'
        if score >= 500:
            return 'pdf-score-info'
        return 'pdf-score-medium'

    @staticmethod
    def get_risk_tag_class(level: str) -> str:
        """Return the tag CSS class for a risk or credit level label."""
        return ReportDataProcessor._RISK_TAG_CLASSES.get(level, 'pdf-tag-default')

    @staticmethod
    def get_risk_level_class(level: str) -> str:
        """Return the score CSS class for a risk level label."""
        return ReportDataProcessor._RISK_LEVEL_CLASSES.get(level, 'pdf-score-default')

    @staticmethod
    def get_risk_flag_text(flag: int) -> str:
        """Return the label for a risk flag (1 = high risk, 2 = low risk, else not found)."""
        if flag == 1:
            return '高风险'
        if flag == 2:
            return '低风险'
        return '未查得'

    @staticmethod
    def get_risk_flag_tag_class(flag: int) -> str:
        """Return the tag CSS class for a risk flag."""
        if flag == 1:
            return 'pdf-tag-danger'
        if flag == 2:
            return 'pdf-tag-success'
        return 'pdf-tag-default'

    @staticmethod
    def get_result_text(result: Optional[str]) -> str:
        """Return the display text for a verification result."""
        if result == '一致':
            return '核验一致'
        if result == '不一致':
            return '核验不一致'
        return result or '未查得'

    @staticmethod
    def get_verification_result_class(result: Optional[str]) -> str:
        """Return the CSS class for a verification result."""
        if result == '一致':
            return 'pdf-result-success'
        if result == '不一致':
            return 'pdf-result-danger'
        return 'pdf-result-default'

    @staticmethod
    def _sum_flags(risk_warning: Dict[str, Any], fields: Tuple[str, ...]) -> int:
        """Sum the values of *fields*, treating missing/None/empty values as 0."""
        return sum(risk_warning.get(field) or 0 for field in fields)

    @staticmethod
    def get_high_risk_count(risk_warning: Dict[str, Any]) -> int:
        """Return the sum of all high-risk flags in *risk_warning*."""
        return ReportDataProcessor._sum_flags(
            risk_warning, ReportDataProcessor._HIGH_RISK_FIELDS
        )

    @staticmethod
    def get_middle_risk_count(risk_warning: Dict[str, Any]) -> int:
        """Return the sum of all medium-risk flags in *risk_warning*."""
        return ReportDataProcessor._sum_flags(
            risk_warning, ReportDataProcessor._MIDDLE_RISK_FIELDS
        )

    @staticmethod
    def get_all_risks(risk_warning: Dict[str, Any]) -> list:
        """Return display entries for every mapped risk whose flag is truthy.

        Each entry has ``key``, ``description``, ``detail``, ``level`` and
        ``badge_class``; entries follow the order of the internal mapping.
        """
        return [
            {
                'key': key,
                'description': info['description'],
                'detail': info['detail'],
                'level': info['level'],
                'badge_class': (
                    'pdf-tag-danger' if info['level'] == '高风险'
                    else 'pdf-tag-warning'
                ),
            }
            for key, info in ReportDataProcessor._RISK_MAPPING.items()
            if risk_warning.get(key, 0)
        ]

    @staticmethod
    def get_overdue_status_text(status: Optional[str]) -> str:
        """Return the display text for an overdue status."""
        if status == '逾期':
            return '逾期'
        if status == '未逾期':
            return '未逾期'
        return '未知'

    @staticmethod
    def get_overdue_status_tag_class(status: Optional[str]) -> str:
        """Return the tag CSS class for an overdue status."""
        if status == '逾期':
            return 'pdf-tag-danger'
        if status == '未逾期':
            return 'pdf-tag-success'
        return 'pdf-tag-default'

    @staticmethod
    def get_overdue_time_text(status: Optional[str]) -> str:
        """Return the display text for an overdue time-window status."""
        if status == '逾期':
            return '逾期'
        if status == '未逾期':
            return '正常'
        return '未知'

    @staticmethod
    def get_overdue_time_class(status: Optional[str]) -> str:
        """Return the CSS class for an overdue time-window status."""
        if status == '逾期':
            return 'pdf-time-danger'
        if status == '未逾期':
            return 'pdf-time-success'
        return 'pdf-time-default'


def _dict_section(data: Dict[str, Any], key: str) -> Dict[str, Any]:
    """Return ``data[key]`` when it is a dict, otherwise an empty dict."""
    value = data.get(key)
    return value if isinstance(value, dict) else {}


def _flag_entry(name: str, flag: Any) -> Dict[str, Any]:
    """Return the ``<name>``, ``<name>_text`` and ``<name>_tag_class`` trio for a risk flag."""
    return {
        name: flag,
        f'{name}_text': ReportDataProcessor.get_risk_flag_text(flag),
        f'{name}_tag_class': ReportDataProcessor.get_risk_flag_tag_class(flag),
    }


def process_judicial_data(judicial_data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalise judicial/litigation data into template variables.

    Three input layouts are recognised: statistics under ``lawsuitStat``,
    statistics under ``entout.data``, or the statistics dict itself (detected
    by a ``count`` or ``civil`` key). Missing or ``None`` sections become
    empty containers.

    Returns:
        A dict with ``has_data``, the per-category sections, the breach and
        consumption-restriction lists, and a ``court_exposure`` summary.
    """
    judicial_data = judicial_data or {}

    lawsuit_stat = None
    breach_case_list = []
    consumption_restriction_list = []

    if 'lawsuitStat' in judicial_data:
        lawsuit_stat = judicial_data.get('lawsuitStat')
        breach_case_list = judicial_data.get('breachCaseList') or []
        consumption_restriction_list = (
            judicial_data.get('consumptionRestrictionList') or []
        )
    elif 'entout' in judicial_data:
        lawsuit_stat = (judicial_data.get('entout') or {}).get('data')
        breach_case_list = judicial_data.get('breachCaseList') or []
        consumption_restriction_list = (
            judicial_data.get('consumptionRestrictionList') or []
        )
    elif 'count' in judicial_data or 'civil' in judicial_data:
        # The payload is already the statistics structure.
        lawsuit_stat = judicial_data

    lawsuit_stat = lawsuit_stat or {}

    count = lawsuit_stat.get('count') or {}
    civil = lawsuit_stat.get('civil') or {}
    criminal = lawsuit_stat.get('criminal') or {}
    administrative = lawsuit_stat.get('administrative') or {}
    implement = lawsuit_stat.get('implement') or {}
    preservation = lawsuit_stat.get('preservation') or {}
    bankrupt = lawsuit_stat.get('bankrupt') or {}

    # Court-exposure view: civil + criminal cases are listed as legal cases,
    # enforcement cases separately, plus the breach / restriction lists.
    legal_cases = []
    for section in (civil, criminal):
        legal_cases.extend(section.get('cases') or [])
    court_exposure = {
        'legal_cases': legal_cases,
        'execution_cases': implement.get('cases') or [],
        'disin_cases': breach_case_list,
        'limit_cases': consumption_restriction_list,
    }

    return {
        'has_data': bool(
            count or civil or criminal or administrative or implement
            or preservation or bankrupt or breach_case_list
            or consumption_restriction_list
        ),
        'count': count,
        'civil': civil,
        'criminal': criminal,
        'administrative': administrative,
        'implement': implement,
        'preservation': preservation,
        'bankrupt': bankrupt,
        'breach_case_list': breach_case_list,
        'consumption_restriction_list': consumption_restriction_list,
        'court_exposure': court_exposure,
    }


def _build_base_info(base_info: Dict[str, Any]) -> Dict[str, Any]:
    """Build the masked basic-information section."""
    processor = ReportDataProcessor
    return {
        'name_masked': processor.mask_name(base_info.get('name')),
        'age': base_info.get('age', ''),
        'sex': base_info.get('sex', ''),
        'phone_masked': processor.mask_phone(base_info.get('phone')),
        'id_card_masked': processor.mask_id_card(base_info.get('idCard')),
        'location': base_info.get('location', ''),
        'phone_area': base_info.get('phoneArea', ''),
    }


def _build_element_verification(detail: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build the identity/phone element check section, or None without check details."""
    processor = ReportDataProcessor
    person = detail.get('personCheckDetails') or {}
    phone = detail.get('phoneCheckDetails') or {}
    if not (person or phone):
        return None
    section = {}
    section.update(_flag_entry('sfzeys_flag', detail.get('sfzeysFlag', 0)))
    section.update(_flag_entry('sjsys_flag', detail.get('sjsysFlag', 0)))
    section.update({
        'person_check_details': person,
        'phone_check_details': phone,
        'person_result_text': processor.get_result_text(person.get('result')),
        'person_result_class': processor.get_verification_result_class(person.get('result')),
        'phone_result_text': processor.get_result_text(phone.get('result')),
        'phone_result_class': processor.get_verification_result_class(phone.get('result')),
    })
    return section


def _build_risk_warning(risk_warning: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build the risk-warning section, or None when nothing was flagged."""
    processor = ReportDataProcessor
    if not risk_warning:
        return None
    risks = processor.get_all_risks(risk_warning)
    if not (risks or risk_warning.get('totalRiskCounts')):
        return None
    return {
        'has_data': True,
        'total_risk_counts': risk_warning.get('totalRiskCounts', 0),
        'high_risk_count': processor.get_high_risk_count(risk_warning),
        'middle_risk_count': processor.get_middle_risk_count(risk_warning),
        'level': risk_warning.get('level', '-'),
        'risks': risks,
    }


def _build_overdue_risk(product: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build the overdue-risk section, or None when the source section is empty."""
    processor = ReportDataProcessor
    if not product:
        return None
    status = product.get('hasUnsettledOverdue')
    section = {
        'has_data': True,
        'status_text': processor.get_overdue_status_text(status),
        'status_tag_class': processor.get_overdue_status_tag_class(status),
        'current_overdue_institution_count': processor.format_institution_interval(
            product.get('currentOverdueInstitutionCount')
        ),
        'current_overdue_amount': processor.format_amount_interval(
            product.get('currentOverdueAmount')
        ),
        'settled_institution_count': processor.format_institution_interval(
            product.get('settledInstitutionCount')
        ),
        'total_loan_institutions': processor.format_institution_interval(
            product.get('totalLoanInstitutions')
        ),
    }
    # (template key prefix, source key) for each look-back window.
    windows = (
        ('time_1day', 'overdueLast1Day'),
        ('time_7days', 'overdueLast7Days'),
        ('time_14days', 'overdueLast14Days'),
        ('time_30days', 'overdueLast30Days'),
    )
    for prefix, source_key in windows:
        window_status = product.get(source_key)
        section[f'{prefix}_text'] = processor.get_overdue_time_text(window_status)
        section[f'{prefix}_class'] = processor.get_overdue_time_class(window_status)
    return section


def _build_loan_evaluation(detail: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build the loan-evaluation section, or None without performance rows."""
    performances = detail.get('organLoanPerformances') or []
    if not performances:
        return None
    rows = []
    for item in performances:
        # NOTE(review): 'applyCount' is compared against the label '银行';
        # despite its name it appears to carry the institution type - confirm.
        is_bank = item.get('applyCount', '') == '银行'
        rows.append({
            'type_name': '银行机构' if is_bank else '非银机构',
            'last7Day': item.get('last7Day', '0/0'),
            'last15Day': item.get('last15Day', '0/0'),
            'last1Month': item.get('last1Month', '0/0'),
        })
    section = {'has_data': True}
    section.update(_flag_entry('risk_flag', detail.get('riskFlag', 0)))
    section['organ_loan_performances'] = rows
    return section


def _build_leasing_risk(assessment: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build the leasing-risk section, or None when the source section is empty."""
    if not assessment:
        return None
    section = {'has_data': True}
    section.update(_flag_entry('risk_flag', assessment.get('riskFlag', 0)))
    # (template key, source key) pairs; every value defaults to '0/0'.
    fields = (
        ('institution_total', 'threeCInstitutionApplicationCountLast3Days'),
        ('institution_weekend', 'threeCInstitutionApplicationCountLast3DaysWeekend'),
        ('institution_night', 'threeCInstitutionApplicationCountLast3DaysNight'),
        ('platform_total', 'threeCPlatformApplicationCountLast3Days'),
        ('platform_weekend', 'threeCPlatformApplicationCountLast3DaysWeekend'),
        ('platform_night', 'threeCPlatformApplicationCountLast3DaysNight'),
    )
    for target_key, source_key in fields:
        section[target_key] = assessment.get(source_key, '0/0')
    return section


def _build_operator_verification(detail: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build the carrier verification section, or None without any risk details."""
    online_risk_list = detail.get('onlineRiskList') or {}
    phone_vail_risks = detail.get('phoneVailRisks') or {}
    belong_risks = detail.get('belongRisks') or {}
    if not (online_risk_list or phone_vail_risks or belong_risks):
        return None
    section = {'has_data': True}
    section.update(_flag_entry('online_risk_flag', detail.get('onlineRiskFlag', 0)))
    section['online_risk_list'] = online_risk_list
    section.update(_flag_entry('phone_vail_risk_flag', detail.get('phoneVailRiskFlag', 0)))
    section['phone_vail_risks'] = phone_vail_risks
    section.update(_flag_entry('belong_risk_flag', detail.get('belongRiskFlag', 0)))
    section['belong_risks'] = belong_risks
    return section


def _build_key_person_verification(detail: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build the key-person verification section, or None without any details."""
    key_person_check_list = detail.get('keyPersonCheckList') or {}
    anti_fraud_info = detail.get('antiFraudInfo') or {}
    if not (key_person_check_list or anti_fraud_info):
        return None
    section = {'has_data': True}
    section.update(_flag_entry('high_risk_flag', detail.get('highRiskFlag', 0)))
    section['key_person_check_list'] = key_person_check_list
    section['anti_fraud_info'] = anti_fraud_info
    return section


def process_report_data(data: Dict[str, Any],
                        judicial_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Convert raw report data into the variables passed to the HTML template.

    Args:
        data: The report payload (``baseInfo``, ``fraudScore``, ...). Missing
            or ``None`` sections are treated as empty; a ``None`` score is
            treated as ``-1`` (not rated).
        judicial_data: Optional judicial/litigation payload, see
            :func:`process_judicial_data`.

    Returns:
        A dict of template variables. Sections without data are ``None``.
        ``risk_warning`` and ``leasing_risk`` are included as well; the
        original computed both sections but never returned them.
    """
    processor = ReportDataProcessor

    check_suggest = data.get('checkSuggest', '')
    verify_rule = data.get('verifyRule', '')
    fraud_rule = data.get('fraudRule', '')

    fraud_score = data.get('fraudScore', -1)
    if fraud_score is None:
        fraud_score = -1
    credit_score = data.get('creditScore', -1)
    if credit_score is None:
        credit_score = -1

    element_detail = _dict_section(data, 'elementVerificationDetail')

    # Process the judicial payload once and derive both views from it.
    judicial_processed = None
    court_exposure = None
    if judicial_data:
        judicial_processed = process_judicial_data(judicial_data)
        if judicial_processed.get('has_data'):
            court_exposure = judicial_processed.get('court_exposure', {})

    fraud_risk_level = processor.get_fraud_risk_level(fraud_score)
    credit_level = processor.get_credit_level(credit_score)

    return {
        'base_info': _build_base_info(_dict_section(data, 'baseInfo')),
        'check_suggest': check_suggest,
        'check_suggest_class': processor.get_check_suggest_class(check_suggest),
        'fraud_score': fraud_score,
        'fraud_score_display': '未命中' if fraud_score == -1 else str(fraud_score),
        'fraud_risk_level': fraud_risk_level,
        'fraud_score_bg_class': processor.get_fraud_score_bg_class(fraud_score),
        'fraud_risk_tag_class': processor.get_risk_tag_class(fraud_risk_level),
        'credit_score': credit_score,
        'credit_score_display': '未命中' if credit_score == -1 else str(credit_score),
        'credit_level': credit_level,
        'credit_score_bg_class': processor.get_credit_score_bg_class(credit_score),
        'credit_risk_tag_class': processor.get_risk_tag_class(credit_level),
        'verify_rule': verify_rule,
        'verify_rule_class': processor.get_risk_level_class(verify_rule),
        'verify_rule_tag_class': processor.get_risk_tag_class(verify_rule),
        'fraud_rule': fraud_rule,
        'fraud_rule_class': processor.get_risk_level_class(fraud_rule),
        'fraud_rule_tag_class': processor.get_risk_tag_class(fraud_rule),
        'element_verification': _build_element_verification(element_detail),
        'operator_verification': _build_operator_verification(element_detail),
        'key_person_verification': _build_key_person_verification(element_detail),
        'risk_warning': _build_risk_warning(_dict_section(data, 'riskWarning')),
        'overdue_risk': _build_overdue_risk(_dict_section(data, 'overdueRiskProduct')),
        'court_exposure': court_exposure,
        'loan_evaluation': _build_loan_evaluation(
            _dict_section(data, 'loanEvaluationVerificationDetail')
        ),
        'leasing_risk': _build_leasing_risk(
            _dict_section(data, 'leasingRiskAssessment')
        ),
        'judicial_data': judicial_processed,
    }


# apiID values identifying the multi-dimensional report / judicial payloads.
_REPORT_API_IDS = ('DWBG8B4D', 'CDWBG8B4D')
_JUDICIAL_API_IDS = ('FLXG7E8F', 'FLXG0V4B', 'CFLXG0V4B')


def _extract_report_sections(json_data: Any) -> Tuple[Any, Any]:
    """Return ``(report_data, judicial_data)`` from a loaded JSON document.

    A list is scanned for items whose ``data.apiID`` matches a known ID (the
    last match wins); any other value is taken to be the report itself.
    """
    if not isinstance(json_data, list):
        return json_data, None

    report_data = None
    judicial_data = None
    for item in json_data:
        if not isinstance(item, dict):
            continue
        envelope = item.get('data') or {}
        api_id = envelope.get('apiID', '')
        payload = envelope.get('data') or {}
        if api_id in _REPORT_API_IDS:
            report_data = payload
        elif api_id in _JUDICIAL_API_IDS:
            # Judicial data is either nested under 'judicial_data' or is the
            # payload itself (including the 'entout' layout).
            if 'judicial_data' in payload:
                judicial_data = payload.get('judicial_data', {})
            else:
                judicial_data = payload
    return report_data, judicial_data


def generate_pdf(data_file: str, output_file: str, template_dir: str = 'templates'):
    """Render the report in *data_file* to a PDF at *output_file*.

    Args:
        data_file: Path to a ``.json`` file holding either the report dict or
            a list of API responses.
        output_file: Destination path of the PDF.
        template_dir: Directory containing ``report_template.html``.

    Raises:
        RuntimeError: WeasyPrint or Jinja2 is not available.
        ValueError: *data_file* is not a JSON file, or holds no report data.
    """
    if not WEASYPRINT_AVAILABLE:
        print("=" * 60)
        print("错误:WeasyPrint 不可用")
        print("=" * 60)
        print(WEASYPRINT_ERROR)
        print("=" * 60)
        raise RuntimeError("WeasyPrint 未正确安装,请参考错误信息进行安装")
    if not JINJA2_AVAILABLE:
        raise RuntimeError(JINJA2_ERROR)

    # Validate the extension before touching the file system.
    if not str(data_file).lower().endswith('.json'):
        raise ValueError("不支持的文件格式,请使用 JSON 文件")
    with open(data_file, 'r', encoding='utf-8') as f:
        json_data = json.load(f)

    report_data, judicial_data = _extract_report_sections(json_data)
    if not report_data or not isinstance(report_data, dict):
        raise ValueError("未找到 DWBG8B4D 数据")

    template_vars = process_report_data(report_data, judicial_data)

    env = Environment(
        loader=FileSystemLoader(template_dir),
        autoescape=select_autoescape(['html', 'xml'])
    )
    template = env.get_template('report_template.html')
    html_content = template.render(**template_vars)

    font_config = FontConfiguration()
    html_doc = HTML(string=html_content)
    html_doc.write_pdf(output_file, font_config=font_config)

    print(f"PDF 已生成: {output_file}")


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point; returns the process exit code."""
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        print("用法: python generate_pdf.py <数据文件> [输出文件]")
        print("示例: python generate_pdf.py public/example.json output.pdf")
        return 1

    data_file = argv[1]
    output_file = argv[2] if len(argv) > 2 else 'report.pdf'

    # Make sure the template directory exists (rendering still requires
    # report_template.html to be present inside it).
    template_dir = Path('templates')
    if not template_dir.exists():
        template_dir.mkdir()
        print(f"已创建模板目录: {template_dir}")

    try:
        generate_pdf(data_file, output_file)
    except Exception as e:  # top-level boundary: report and exit non-zero
        print(f"生成 PDF 失败: {e}")
        traceback.print_exc()
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(main())