DataHub Unified Data Layer — DataHub 统一数据层
v1.0DataHub统一数据获取层 — 发布-订阅架构整合14个数据源,支持行情/财报/公告/宏观/研报/IR数据获取,智能缓存+优先级路由+模式匹配,提供quick_quote/quick_financial/quick_macro等便捷方法。触发词:获取数据、查行情、查财报、查宏观数据、DataHub、数据订阅、发布订阅。 (无需翻译,原文已包含中文描述)
运行时依赖
安装命令
点击复制技能文档
DataHub 统一数据获取层 版本:v1.0 状态:生产就绪 代码位置:(请配置您的DataHub安装路径)
何时激活 用户需要获取金融数据(行情、财报、公告、宏观、研报、IR资讯) 用户需要多数据源对比验证 用户提到发布-订阅、数据中枢、DataHub 用户需要批量获取数据(多只股票、多个指标) 用户需要自然语言查询金融数据 用户需要构建数据Pipeline 触发词:获取数据、查行情、查财报、宏观经济、DataHub、数据订阅、发布订阅、数据源
快速开始 方式一:便捷函数(推荐)
import asyncio import sys sys.path.insert(0, "") from datahub.production_startup import quick_quote, quick_financial, quick_macro, quick_queryasync def main(): # 1. 获取A股行情 quote = await quick_quote("600519", market='cn') print(f"贵州茅台: {quote}") # 2. 获取港股行情 quote_hk = await quick_quote("00656", market='hk') print(f"港股: {quote_hk}") # 3. 获取美股行情 quote_us = await quick_quote("AAPL", market='us') print(f"Apple: {quote_us}") # 4. 获取财报数据 financial = await quick_financial("600519", report_type='balance') print(f"资产负债表: {financial}") # 5. 获取宏观数据 gdp = await quick_macro("GDP", source='fred') print(f"GDP数据: {gdp}") # 6. 自然语言查询 result = await quick_query("贵州茅台股价") print(f"查询结果: {result}")
asyncio.run(main())
方式二:完整初始化
import asyncio import sys sys.path.insert(0, "") from datahub import get_datahub, register_all_producersasync def main(): # 初始化DataHub datahub = get_datahub() # 注册所有Producer(14个数据源) producers = register_all_producers(datahub) print(f"已注册 {len(producers)} 个Producer") # 订阅数据 received = asyncio.Event() result = {} async def callback(data): result['data'] = data result['status'] = 'success' received.set() # 订阅主题 datahub.subscribe("westock:cn:quote:600519", callback) # 请求刷新 await datahub.request_refresh("westock:cn:quote:600519") # 等待数据 await asyncio.wait_for(received.wait(), timeout=5.0) print(result)
asyncio.run(main())
方式三:生产环境管理器
import asyncio import sys sys.path.insert(0, "") from datahub.production_startup import get_productionasync def main(): # 获取生产环境实例(自动初始化) prod = await get_production() # 使用便捷方法 quote = await prod.get_quote("600519") financial = await prod.get_financial("600519", report_type='balance') # 获取统计信息 stats = prod.get_stats() print(f"统计: {stats}")
asyncio.run(main())
主题命名规范 格式:{producer}:{data_type}:{market}:{code} {producer}:{data_type}:{identifier} 示例: A股数据: westock:cn:quote:600519 — 腾讯自选股A股行情 a-stock-financial:balance:600519 — A股财报资产负债表 a-stock-financial:income:600519 — A股财报利润表 a-stock-financial:cashflow:600519 — A股财报现金流量表 cninfo:announcement:600519 — 巨潮资讯公告 akshare:quote:600519 — AkShare行情 neodata:query:贵州茅台股价 — 自然语言查询 港股数据: westock:hk:quote:00656 — 港股行情 hkex:annual:00656 — 港交所年报 irasia:news:00656 — IR Asia新闻 美股数据: yfinance:quote:AAPL — Yahoo Finance行情 sec:10k:AAPL — SEC 10-K文件 seekingalpha:rating:AAPL — Seeking Alpha评级 宏观数据: fred:series:GDP — FRED GDP数据 worldbank:indicator:NY.GDP.MKTP.CD — 世界银行GDP imf:dataflow:CPI — IMF CPI数据 dbnomics:series: — DBnomics数据 本地数据: wiki:entity:贵州茅台_600519.SH — 本地Wiki实体 wiki:concept:DCF估值 — 本地Wiki概念 wiki:raw:某报告.pdf — 本地Wiki原始文件
数据源优先级 优先级 Producer 数据源 类型 API需求 1 LocalWikiProducer 本地Wiki 本地 无 8 CninfoProducer 巨潮资讯 官方 无 9 SECEdgarProducer SEC EDGAR 官方 无 9 HKEXProducer 港交所披露易 官方 无 10 EastMoneyProducer 东方财富 在线 MX_APIKEY 11 THSProducer 同花顺问财 在线 IWENCAI_API_KEY 12 NeoDataProducer NeoData 在线 无 13 WeStockProducer 腾讯自选股 在线 无 15 AkShareProducer AkShare 开源 无 15 AStockFinancialProducer A股财报 开源 无 16 YFinanceProducer YFinance 开源 无 20 SeekingAlphaProducer Seeking Alpha 第三方 无 30 FREDProducer FRED 官方 FRED_API_KEY 31 DBnomicsProducer DBnomics 官方 无 32 WorldBankProducer 世界银行 官方 无 33 IMFProducer IMF 官方 IMF_COOKIES 40 AlphaVantageProducer Alpha Vantage 第三方 ALPHAVANTAGE_API_KEY 50 IRWebsiteProducer IR网站 IR 无 50 IRAsiaProducer IR Asia IR 无 优先级规则:数字越小优先级越高。本地数据 > 官方数据 > 在线数据 > 开源数据 > 第三方数据
核心功能
- 发布-订阅机制
from datahub import get_datahub, register_all_producers datahub = get_datahub() register_all_producers(datahub) # 精确订阅 def callback(data): print(f"收到数据: {data}") datahub.subscribe("westock:cn:quote:600519", callback) # 模式订阅(通配符) datahub.subscribe_pattern("westock:cn:quote:", callback) # 请求刷新 await datahub.request_refresh("westock:cn:quote:600519")
- 缓存系统
from datahub import get_preconfigured_cache
# 预配置缓存类型
cache = get_preconfigured_cache('quote')
# 5分钟TTL
# 缓存操作
await cache.set("600519", {"price": 1800})
data = await cache.get("600519")
stats = cache.stats()
# {'size': 1, 'max_size': 500, 'ttl': 300}
预配置缓存: 类型 TTL 最大条目 适用场景 quote 5分钟 500 实时行情 financial 1小时 200 财务报表 announcement 30分钟 300 公司公告 macro 1天 100 宏观数据 research 2小时 200 研报评级 ir 1小时 200 IR资讯
- 重试机制
from datahub import with_retry
@with_retry(max_retries=3, base_delay=1.0, max_delay=60.0)
async def fetch_data():
# 自动重试(指数退避:1秒、2秒、4秒)
return await api_call()
- 性能监控
stats = datahub.get_stats()
print(f"总发布数: {stats['total_publishes']}")
print(f"总错误数: {stats['total_errors']}")
print(f"主题数: {stats['topics_count']}")
print(f"Producer数: {stats['producers_count']}")