Skip to main content
Skip to main content
数据获取 - DataSource 模块
文档类型
概览
相关模块
data source
相关版本
v0.x
发布日期
2026-02-15

Data Source 模块概览

提示:本文档提供快速上手指南。如需了解详细的设计理念、架构设计和决策记录,请参考 architecture.mddecisions.md。**API 级说明(类、方法、参数)**见 api.md

📋 模块简介

Data Source 模块是一个灵活、简单、强大的数据获取框架,用于统一管理从多个第三方数据源(如 Tushare、AKShare、EastMoney)获取数据的过程。

核心能力

  • 统一数据格式:获取的数据会被整理成统一格式(由 Schema 定义)
  • 多源数据融合:一个标准数据可以来源于多个数据源(如 K 线来自 Tushare,复权因子来自 AKShare)
  • 灵活实现方式:一个 DataSource 支持定义多种获取方法,可轻松扩展或更改实现

核心特性

  • 配置驱动:通过 mapping.json 配置 handler 的启用、依赖和参数
  • 框架定义准则,用户控制实现:框架定义 Schema,用户实现 Handler
  • 多实现共存:一个 data source 可以有多个 handler 实现,但运行时只能选择一个
  • 运行时切换:通过配置文件切换 handler,无需修改代码
  • 三层架构:Manager(协调层)→ Handler(业务层)→ Provider(基础层)

🎯 核心概念

业务名词

名词定义说明
DataSource标准数据源框架需要的一种数据类型(如 K 线、财务数据)
Schema标准数据源格式定义数据字段、类型、验证规则
Handler执行器实现数据获取逻辑,将多个 Provider 的数据合并成 Schema 格式
Provider第三方数据供应商封装第三方 API(如 Tushare、AKShare),提供 token、API 调用、限流信息
mapping.json映射配置声明标准数据源(DataSource)与执行器(Handler)的对应关系

业务链路

完整流程

DataSource(标准数据源)
    ↓ 对应唯一
Schema(标准数据格式定义)
    ↓ 通过映射配置
mapping.json(指定使用哪个 Handler)
    ↓ 运行时选择
Handler(执行器,运行时只能选择一个)
    ↓ 调用多个
Provider(第三方数据供应商 API)
    ↓ 合并数据
Schema 格式的标准输出
    ↓ 可选
Data Manager(数据库存储)

关键设计

  1. Provider 层:用户自定义,包含三个要素:
    • Token/认证信息(如需要)
    • 数据获取 API 方法
    • 限流信息声明
  2. Handler 层:通过自定义逻辑合并多个 Provider 的数据,输出符合 Schema 格式的标准数据
    • 提供丰富的生命周期钩子函数
    • 可在 after_normalize() 钩子中使用 Data Manager 进行数据库存储
  3. 映射关系
    • 一个 DataSource 对应一个 Schema(1:1)
    • 一个 DataSource 可以有多个 Handler 实现,但运行时只能选择一个(通过 mapping.json 声明)
    • 一个 Schema 的字段可以来源于多个 Provider API(Handler 负责合并)

详细的设计理念和架构说明请参考 architecture.md

📦 模块的组件

DataSourceManager (协调层)
    │
    ├── BaseHandler (业务层)
    │       │
    │       ├── ApiJob / ApiJobBundle (API 任务与批次)
    │       │
    │       └── ApiJobExecutor (API 任务执行器,含限流/并发)
    │
    └── BaseProvider (基础层)

📁 模块的文件夹结构

core/modules/data_source/
├── __init__.py
├── data_source_manager.py          # DataSourceManager 主类(协调层)
├── execution_scheduler.py          # DataSourceExecutionScheduler:调度各 Handler 执行
├── renew_manager.py                # RenewManager:日期范围与续跑编排
├── base_class/                     # 基类
│   ├── base_handler.py             # Handler 基类 BaseHandler
│   └── base_provider.py            # Provider 基类 BaseProvider
│
├── data_class/                     # 数据类定义
│   ├── api_job.py                  # ApiJob
│   ├── api_job_bundle.py           # ApiJobBundle(批次)
│   ├── config.py                   # DataSourceConfig
│   ├── handler_mapping.py          # HandlerMapping
│   ├── schema.py                   # Schema 定义
│   ├── field.py                    # Field 定义
│   └── ...                         # 其他数据类(error、renew_config 等)
│
├── service/                        # 服务层与 Helper
│   ├── api_job_executor.py         # ApiJobExecutor:执行 ApiJob/ApiJobBundle,包含限流与并发
│   ├── normalization/              # 标准化 service + helper
│   ├── date_range/                 # 日期范围 & renew 计算 service + helper
│   ├── renew/                      # incremental/rolling/refresh renew service
│   ├── executor/                   # bundle 执行与 fetched_data 重组
│   ├── persistence/                # persistence_service:写入绑定表
│   ├── utils/                      # 通用工具(如 record_utils)
│   └── ...                         # 其他 manager/helper
│
├── reserved_dependencies.py        # 预置依赖定义
└── examples/                       # 示例脚本(极端 renew 场景等)

🚀 模块的使用方法

基本使用

from core.modules.data_source.data_source_manager import DataSourceManager

# 初始化
manager = DataSourceManager(is_verbose=False)

# 执行数据更新(配置驱动,自动运行所有启用的 handler)
await manager.renew_data(
    latest_completed_trading_date="20251222",
    stock_list=None,  # 如果不提供,会自动从数据库读取
    test_mode=False,
    dry_run=False
)

测试单个 Handler

# 测试单个 handler(需要 handler 在 mapping.json 中启用)
result = await manager.fetch(
    "kline",
    context={
        "latest_completed_trading_date": "20251222",
        "stock_list": ["000001.SZ", "000002.SZ"]
    }
)

其他概念

概念作用关键点
ApiJob单个 API 调用任务包含 Provider、方法、参数、依赖
ApiJobBundle一批 API 任务表示一个批次(如某一实体的一组 ApiJobs)
ApiJobExecutorAPI 任务执行引擎依赖解析、限流、并发执行

📚 模块详细文档

  • architecture.md:架构文档,包含详细的技术设计、核心组件、运行时 Workflow
  • decisions.md:重要决策记录,包含架构设计决策和理由

阅读建议:先阅读本文档快速上手,然后阅读 architecture.md 了解详细设计,最后阅读 decisions.md 了解设计决策。


文档结束