更多用例 - 使用DataSource模块获取数据

如果您遇到了以下场景:

我的框架有了,可是我如何将我自己的数据源接入这个框架呢?

我发现我的框架可能需要多个数据源,一部分是我自己买的,可是还有一部分需要从另一个供应商获取,我该怎么办?

Datasource模块就是用来解决您上述的问题的。他的主要作用是获取外部第三方数据的模块,可以同时支持多个数据源。模块主要使用的是配置驱动+基类复写的方式完成扩展的。

位置:core/modules/data_source

文档:

在看到更多用例之前,我们来明确一些基本概念和术语,这样能更好地帮助您使用这个模块:

Provider:数据供应商,比如新浪财经,东方财富,Akshare,Tushare系统默认自带了一些例子,其他的供应商或API需要用户自己定义

Schema:数据格式规范。我们从数据源(可以是多个)获得数据后,需要把多个API的数据整合成我们最后需要存储或使用的一个标准格式,这个格式就是由Schema定义的。这里我们说的schema都定义在我们数据库表的定义里,比如core/tables/macro/cpi/schema.py

Handler: 处理与第三方数据源对接,获取数据,重组数据,最后入库的一个处理器。我们作为用户主要定义的就是这个handler以及其对应的配置config

Mapping:位置:userspace/data_source/mapping.py 这里是定义我们的每个数据源是通过哪个handler从哪个provider里获取数据的关系表

 

注意:

一个data source只能对应一个schema(数据表),多个获取数据的API,其中这些API可以来自不同的Provider。

一个data source最终只能由一个handler来完成,但这个data source可以有多个handler。(可以把handler看成获取数据的手段,手段可以是多个,但最终落地执行的方案只能是一个)

 

接下来我们来看一个常用的例子:

获取中国过去N年的CPI数据:

第一步:整理需求:

数据供应商:Tushare

Tushare API

ts.pro_api().get_cpi(start_date, end_date)

限流:每分钟10次就够了

获取数据后存入本地数据表:sys_cpi

更新数据:因为CPI发布时间不固定而且发布后数日/周/月内还可能会进行修正,因此滚动获取12个月的数据每次覆盖数据库了现有的数据好想会合理一些。

第二步:定义provider

那么这时候我只需要在userspace的data_source里新建一个provider叫tushare,它的主要作用是验证token,定义前置API变量比如 self.api = ts.pro_api() (这样您就可以直接使用get_cpi 而不是每次都写 ts.pro_api().get_cpi)

这一步框架里有,您可以参考代码:userspace/data_source/providers/tushare/provider.py

第三步:声明config

首先,创建config文件。

进入userspace/data_source/ 拷贝 config.example.pyhandlers/cpi/ 下并重命名为config.py

完整路径:userspace/data_source/handlers/cpi/config.py

接下来,我们需要声明这个数据源获取数据后存入哪张数据表,这里我们要存入 sys_cpi 表 (此表的schema可在core/tables/macro/cpi/schema.py里查看)

CONFIG = {
	"table": "sys_cpi"
}

 

再次,我需要申明这个数据获取使用的是Tushare里的get_cpi这个API,变成设置就是:

CONFIG = {
	"table": "sys_cpi",
	
	"apis": {
        "cpi_data": {
            "provider_name": "tushare",
            "method": "get_cpi",
        },
    },
}   

注:

  1. 这里 cpi_data 就是一个API请求的名字,可以随便取,只是不要在apis里重复就行
  2. provider_name 是指当前使用tushare的API,从这里可以看出,我们一个数据源的获取是可以使用多个provider的多个API的

 

再次,当我获取完第一次数据后之后要从当月起的前12个月再次获取数据并且覆盖数据库的数据以达到修正的目的,那么:

CONFIG = {
	"table": "sys_cpi",
	
	"apis": {
        "cpi_data": {
            "provider_name": "tushare",
            "method": "get_cpi",
        },
    },
    
    "renew": {
        "type": "rolling",
        "rolling": {
            "unit": DateUtils.PERIOD_MONTH,
            "length": 12,
        },
    },
}

到这里为止,其实我们的基本配置已经完成了,最后还需要加一些细节:

CONFIG = {
	"table": "sys_cpi",
	
	"apis": {
        "cpi_data": {
            "provider_name": "tushare",
            "method": "get_cpi",
            "max_per_minute": 10,
            "result_mapping": {
                "date": "month",
                "cpi": "nt_val",
                "cpi_yoy": "nt_yoy",
                "cpi_mom": "nt_mom",
            },
        },
    },
    
    "renew": {
        "type": "rolling",
        "rolling": {
            "unit": DateUtils.PERIOD_MONTH,
            "length": 12,
        },
        "last_update_info": {
            "date_field": "date",
            "date_format": DateUtils.PERIOD_MONTH,
        },
    },
}
  • max_per_minute: 请求每分钟最多发生10次
  • result_mapping:在TushareAPI返回的month值写入我们db table里的date字段,其他以此类推
  • last_update_info:我们从db的什么地方获取上次更新的信息
    • date_field:我们从这个属性指向的字段中获得时间信息
    • date_format:我们的日期该转化成什么格式

至此,我们就完成了这个data source的所有配置。

第四步:定义handler

在当前的例子里,我们的CPI非常简单,就是call一个Tushare的API拿到数据再入库,我们的框架默认会根据我们的配置自动执行流程,不需要额外操作。

因此,我们只需要创建一个空的handler就可以了

注意:

  • handler一定要继承 BaseHandler
  • 这个类名后续要在mapping中注册使用
from core.modules.data_source.base_class.base_handler import BaseHandler

class CpiHandler(BaseHandler):
    pass

最后一步:注册进入mapping

最后一步就是告诉我们的系统我们新加了一个数据源的获取方式,需要告诉系统去哪儿找它。

找到 userspace/data_source/mapping.py

在里边新加一个data source用以下格式:

[data source name]: {
    handler: [your handler class name],
    is_enabled: True / False
    depends_on: [any dependencies on other data source?]
}

按照这个规则,我们需要新加一个:

    "cpi": {
        "handler": "cpi.CpiHandler",
        "is_enabled": True,
        "depends_on": ["latest_trading_date"],
    },

这里的cpi.CpiHandler就是指handler下的cpi文件夹下的handler里的CpiHandler这个类(我们上一步定义的)

  • is_enabled:是否开启这个provider
  • depends_on:我们这个handler需要依赖最后完成的交易日这个数据(好倒推12个月)

到这里,我们就完整地完成了整个CPI获取的流程和代码

测试数据获取

在项目更目录运行:

python start-cli.py -renew

或者是:

python start-cli.py -r

就可以看到新的data source运行了。

记得调试的时候在mapping中把其他data source的is_enabled都变成False会更直观