import pandas as pd
import requests
import os
import time
import json
# ======================== User configuration ========================
EXCEL_PATH = r"C:\Users\a\Desktop\沪港通企业列表.xlsx"  # Path to the Excel file
CODE_COLUMN = '股票代码'  # Name of the column holding the stock codes
START_DATE = '2014-11-27'  # Start date (YYYY-MM-DD)
END_DATE = '2015-06-30'    # End date
REPORT_TYPES = {
    '定期报告': ('00', '00'),  # periodic reports
    '其他': ('90', '90'),      # other announcements
}
SAVE_PATH = r'D:/SSE_Reports/'  # Download directory (keep the slash direction consistent)
# =====================================================================
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 Edg/133.0.0.0',
    'Referer': 'https://www.sse.com.cn/disclosure/listedinfo/announcement/'
}
# Parameter template for the announcement query API
BASE_PARAMS = {
    'jsonCallBack': 'jsonpCallback',
    'isPagination': 'true',
    'securityType': '0101,120100,020100,020200,120200',  # security type filter
    'pageSize': 100,
    'pageNo': 1,
    '_': int(time.time() * 1000)  # cache-busting millisecond timestamp
}
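# Note (not part of the original flow): the '_' cache-buster above is evaluated
# once, at import time, so every request reuses the same timestamp. If the
# endpoint ever rejects stale values, a per-request refresh along these lines
# is one option (hypothetical helper, not called anywhere below):
def fresh_params(**overrides):
    params = dict(BASE_PARAMS, **overrides)
    params['_'] = int(time.time() * 1000)  # regenerate the timestamp per request
    return params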
def load_codes():
    """Load stock codes from the Excel file (with format validation)."""
    try:
        df = pd.read_excel(EXCEL_PATH)
        df[CODE_COLUMN] = df[CODE_COLUMN].astype(str).str.strip().str.zfill(6)
        valid_codes = df[df[CODE_COLUMN].str.match(r'^\d{6}$')][CODE_COLUMN].tolist()
        print(f'Loaded {len(valid_codes)} valid stock codes')
        return valid_codes
    except Exception as e:
        print(f'Failed to read the Excel file: {e}')
        raise SystemExit(1)
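# Why load_codes() pads with zfill(6): if Excel stores the codes as numbers,
# leading zeros are lost when they are read back, so the string form may be
# shorter than six characters. Illustration of the standard str.zfill behaviour:
#   str(1).strip().zfill(6)   -> '000001'
#   '600000'.zfill(6)         -> '600000'  (already six digits, unchanged)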
def get_reports(code):
    """Fetch the announcement list for one stock code (paginated)."""
    all_reports = []
    current_page = 1
    while True:
        params = {
            **BASE_PARAMS,
            'productId': code,
            'reportType': REPORT_TYPES['定期报告'][0],
            'reportType2': REPORT_TYPES['其他'][1],
            'beginDate': START_DATE.replace('-', ''),
            'endDate': END_DATE.replace('-', ''),
            'pageNo': current_page
        }
        try:
            response = requests.get(
                'http://query.sse.com.cn/security/stock/queryCompanyBulletin.do',
                headers=HEADERS,
                params=params,
                timeout=15
            )
            # The API returns JSONP: strip the jsonpCallback(...) wrapper before parsing.
            data_str = response.text.split('jsonpCallback(')[1].strip().rstrip(')')
            data = json.loads(data_str)
            if 'result' in data and data['result']:
                all_reports.extend(data['result'])
                if data['pageHelp']['pageNo'] >= data['pageHelp']['pageCount']:
                    break
                current_page += 1
            else:
                break
        except Exception as e:
            print(f"Failed to fetch {code}: {e}")
            break
    return all_reports
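# A hypothetical, more defensive alternative to the split()/strip() parsing in
# get_reports(): pull the JSON payload out of the JSONP wrapper with a regex.
# Shown only as a sketch; get_reports() above does not call it.
def parse_jsonp(text, callback='jsonpCallback'):
    import re  # local import so the sketch stays self-contained
    match = re.search(re.escape(callback) + r'\((.*)\)\s*$', text, re.S)
    if not match:
        raise ValueError('unexpected JSONP response')
    return json.loads(match.group(1))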
def download_file(url, save_path):
    """Download one PDF (streaming, with certificate verification)."""
    try:
        response = requests.get(
            f'http://static.sse.com.cn{url}',  # PDFs are served from the static host
            headers=HEADERS,
            stream=True,
            verify=True  # keep SSL verification enabled
        )
        response.raise_for_status()  # treat HTTP errors as failures instead of saving an error page
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(2048):
                f.write(chunk)
        return True
    except Exception as e:
        print(f"Download failed: {e}")
        return False
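# main() below only strips ':' from report titles before building file names,
# but Windows also rejects \ / * ? " < > |. A fuller cleanup could look like
# this hypothetical helper (not wired into the original flow):
def sanitize_filename(name):
    for ch in '\\/:*?"<>|':
        name = name.replace(ch, '')
    return name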
def main():
    codes = load_codes()
    os.makedirs(SAVE_PATH, exist_ok=True)
    for idx, code in enumerate(codes, 1):
        print(f"\nProcessing [{idx}/{len(codes)}] {code}")
        reports = get_reports(code)
        if not reports:
            print(f"No reports found for {code}")
            continue
        # Keep only reports whose title starts with one of the configured type codes
        valid_reports = [
            r for r in reports
            if any(r['TITLE'].startswith(rt[0]) for rt in REPORT_TYPES.values())
        ]
        print(f"Found {len(valid_reports)} matching reports")
        success = 0
        for report in valid_reports:
            file_name = f"{code}_{report['TITLE'].replace(':', '')}.pdf"
            save_path = os.path.join(SAVE_PATH, file_name)
            if not os.path.exists(save_path):
                if download_file(report['URL'], save_path):
                    success += 1
                    time.sleep(1.5)  # pause between downloads to avoid anti-scraping blocks
                else:
                    print(f"Retrying download: {file_name}")
                    time.sleep(3)
                    if download_file(report['URL'], save_path):  # single retry
                        success += 1
            else:
                print(f"Already exists: {file_name}")
        print(f"Downloaded {success} reports")

if __name__ == '__main__':
    main()
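# Usage: adjust the configuration block at the top (EXCEL_PATH, CODE_COLUMN,
# date range, SAVE_PATH), then run the script directly, e.g.  python this_script.py
# ("this_script.py" is a placeholder; the paste does not name the file.)
# Requires pandas, requests, and an Excel engine such as openpyxl for .xlsx files.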