如果您遇到了以下场景:
我的框架有了,可是我如何将我自己的数据源接入这个框架呢?
我发现我的框架可能需要多个数据源,一部分是我自己买的,可是还有一部分需要从另一个供应商获取,我该怎么办?
Datasource模块就是用来解决您上述的问题的。他的主要作用是获取外部第三方数据的模块,可以同时支持多个数据源。模块主要使用的是配置驱动+基类复写的方式完成扩展的。
位置:core/modules/data_source
文档:
在看到更多用例之前,我们来明确一些基本概念和术语,这样能更好地帮助您使用这个模块:
Provider:数据供应商,比如新浪财经,东方财富,Akshare,Tushare系统默认自带了一些例子,其他的供应商或API需要用户自己定义
Schema:数据格式规范。我们从数据源(可以是多个)获得数据后,需要把多个API的数据整合成我们最后需要存储或使用的一个标准格式,这个格式就是由Schema定义的。这里我们说的schema都定义在我们数据库表的定义里,比如core/tables/macro/cpi/schema.py
Handler: 处理与第三方数据源对接,获取数据,重组数据,最后入库的一个处理器。我们作为用户主要定义的就是这个handler以及其对应的配置config
Mapping:位置:userspace/data_source/mapping.py 这里是定义我们的每个数据源是通过哪个handler从哪个provider里获取数据的关系表
注意:
一个data source只能对应一个schema(数据表),多个获取数据的API,其中这些API可以来自不同的Provider。
一个data source最终只能由一个handler来完成,但这个data source可以有多个handler。(可以把handler看成获取数据的手段,手段可以是多个,但最终落地执行的方案只能是一个)
接下来我们来看一个常用的例子:
获取中国过去N年的CPI数据:
第一步:整理需求:
数据供应商:Tushare
Tushare API:
ts.pro_api().get_cpi(start_date, end_date)限流:每分钟10次就够了
获取数据后存入本地数据表:sys_cpi
更新数据:因为CPI发布时间不固定而且发布后数日/周/月内还可能会进行修正,因此滚动获取12个月的数据每次覆盖数据库了现有的数据好想会合理一些。
第二步:定义provider
那么这时候我只需要在userspace的data_source里新建一个provider叫tushare,它的主要作用是验证token,定义前置API变量比如 self.api = ts.pro_api() (这样您就可以直接使用get_cpi 而不是每次都写 ts.pro_api().get_cpi)
这一步框架里有,您可以参考代码:userspace/data_source/providers/tushare/provider.py
第三步:声明config
首先,创建config文件。
进入userspace/data_source/ 拷贝 config.example.py 到 handlers/cpi/ 下并重命名为config.py
完整路径:userspace/data_source/handlers/cpi/config.py
接下来,我们需要声明这个数据源获取数据后存入哪张数据表,这里我们要存入 sys_cpi 表 (此表的schema可在core/tables/macro/cpi/schema.py里查看)
CONFIG = {
"table": "sys_cpi"
}
再次,我需要申明这个数据获取使用的是Tushare里的get_cpi这个API,变成设置就是:
CONFIG = {
"table": "sys_cpi",
"apis": {
"cpi_data": {
"provider_name": "tushare",
"method": "get_cpi",
},
},
} 注:
- 这里 cpi_data 就是一个API请求的名字,可以随便取,只是不要在apis里重复就行
- provider_name 是指当前使用tushare的API,从这里可以看出,我们一个数据源的获取是可以使用多个provider的多个API的
再次,当我获取完第一次数据后之后要从当月起的前12个月再次获取数据并且覆盖数据库的数据以达到修正的目的,那么:
CONFIG = {
"table": "sys_cpi",
"apis": {
"cpi_data": {
"provider_name": "tushare",
"method": "get_cpi",
},
},
"renew": {
"type": "rolling",
"rolling": {
"unit": DateUtils.PERIOD_MONTH,
"length": 12,
},
},
}到这里为止,其实我们的基本配置已经完成了,最后还需要加一些细节:
CONFIG = {
"table": "sys_cpi",
"apis": {
"cpi_data": {
"provider_name": "tushare",
"method": "get_cpi",
"max_per_minute": 10,
"result_mapping": {
"date": "month",
"cpi": "nt_val",
"cpi_yoy": "nt_yoy",
"cpi_mom": "nt_mom",
},
},
},
"renew": {
"type": "rolling",
"rolling": {
"unit": DateUtils.PERIOD_MONTH,
"length": 12,
},
"last_update_info": {
"date_field": "date",
"date_format": DateUtils.PERIOD_MONTH,
},
},
}- max_per_minute: 请求每分钟最多发生10次
- result_mapping:在TushareAPI返回的month值写入我们db table里的date字段,其他以此类推
- last_update_info:我们从db的什么地方获取上次更新的信息
- date_field:我们从这个属性指向的字段中获得时间信息
- date_format:我们的日期该转化成什么格式
至此,我们就完成了这个data source的所有配置。
第四步:定义handler
在当前的例子里,我们的CPI非常简单,就是call一个Tushare的API拿到数据再入库,我们的框架默认会根据我们的配置自动执行流程,不需要额外操作。
因此,我们只需要创建一个空的handler就可以了
注意:
- handler一定要继承 BaseHandler
- 这个类名后续要在mapping中注册使用
from core.modules.data_source.base_class.base_handler import BaseHandler
class CpiHandler(BaseHandler):
pass最后一步:注册进入mapping
最后一步就是告诉我们的系统我们新加了一个数据源的获取方式,需要告诉系统去哪儿找它。
找到 userspace/data_source/mapping.py
在里边新加一个data source用以下格式:
[data source name]: {
handler: [your handler class name],
is_enabled: True / False
depends_on: [any dependencies on other data source?]
}按照这个规则,我们需要新加一个:
"cpi": {
"handler": "cpi.CpiHandler",
"is_enabled": True,
"depends_on": ["latest_trading_date"],
},这里的cpi.CpiHandler就是指handler下的cpi文件夹下的handler里的CpiHandler这个类(我们上一步定义的)
- is_enabled:是否开启这个provider
- depends_on:我们这个handler需要依赖最后完成的交易日这个数据(好倒推12个月)
到这里,我们就完整地完成了整个CPI获取的流程和代码
测试数据获取
在项目更目录运行:
python start-cli.py -renew或者是:
python start-cli.py -r就可以看到新的data source运行了。
记得调试的时候在mapping中把其他data source的is_enabled都变成False会更直观