DataHandler

class base.DataHandler.DataHandler(sim_flag: bool, cn_flag: bool)

Bases: object

数据读取, 保存, 转换的处理中心.

其主要负责流入数据的处理和储存(包括暂存在内存中+储存进数据库中). 同时也可通过其调取数据库中的数据.

Attribute history_data

便于保存和调用历史数据的地方. 其暂存了所有通过接口保存的实时数据+调取的历史数据.

Attribute data_processor

一个:class:base.DataHandler.DataProcessor`类, 用于处理数据格式的转换功能(also known as `dirty work)

Attribute DBHandler

数据库接口, 参见:class:base.DBHandler.DBHandler.

Parameters
  • sim_flag – 是否为模拟状态(暂时没用)

  • cn_flag – 是否在国内

Methods Summary

load_data_into_array(data, channel)

将官网数据格式转化为统一的array格式

load_history_data(inst_dict)

读取数据库数据以初始化(or flush)history_data

merge_data(data1, data2)

将`data2`合并到`data1`中去.

process_realtime_data(msg, callback)

解析处理实时传入的数据

Methods Documentation

load_data_into_array(data: Dict[str, Any], channel: str)numpy.ndarray

将官网数据格式转化为统一的array格式

Parameters
  • data – 官网数据格式的Dict

  • channel – 数据所属的频道

Returns

转为了array格式的数据

load_history_data(inst_dict: Dict[str, Dict[str, Tuple[Optional[int]]]])

读取数据库数据以初始化(or flush)history_data

Parameters

inst_dict – 所需要读取的数据内容, 具体格式可见example.

Examples:
>>>
inst_dict:
    {inst_name: {"candle": (ts1, ts2),
                        "trades": (ts1, ts2), ...},
            ... : {...}}

ts1为起始时刻(ms), ts2为结束时刻(ms), 均可为None

merge_data(data1: Dict[str, Dict[str, numpy.ndarray]], data2: Dict[str, Dict[str, numpy.ndarray]])

将`data2`合并到`data1`中去.

两个数据都应是已经经过预处理转化后的格式.

process_realtime_data(msg: Dict[str, Any], callback: Callable[Dict[str, Dict[str, numpy.ndarray]], None])

解析处理实时传入的数据

解析新传入的数据, 若其是预先设定需要保存的数据类型(books, trades, candle) 则将其存入`history_data`中, 供之后调用. 完成之后调用`callback`进行处理. 同时使用另一个进程将数据存入数据库中.

Parameters
  • msg – 一个由JSON转化而来的字典, 包含实时的数据内容

  • callback – 在处理完实时数据(保存到array中)后调用, 包含新传入的数据 (但不包含历史数据!) 历史数据需要去 :py:attribute:`history_data` 中找