扩展事件源

了解事件,首先要从 RQAlpha 的事件驱动说起。

RQAlpha 大部分的组件是以 add_listener 的方式进行事件的注册。举例来说:

  • 当Bar数据生成,则会触发 EVENT.BAR 事件,那么用户的 handle_bar 相关的代码注册了该事件则会立即执行。
  • 当订单成交,则会触发 EVENT.TRADE 事件,那么系统的账户模块因为注册了该事件,就可以立即计算成交以后的收益和资金变化。
  • 当订单下单,则会触发 EVENT.ORDER_PENDING_NEW 事件,前端风控模块注册了该事件,则可以立即对该订单进行审核,如果不满足风控要求,则直接指定执行 order._cancel(some_reason) 来保证有问题的订单不会进入实际下单环节。

程序化交易中很多需求,都可以通过注册事件的方式无缝插入到 RQAlpha 中进行扩展。

事件源分类

  • SystemEvent: 系统事件源
    • POST_SYSTEM_INIT: 系统初始化后触发
    • POST_USER_INIT: 策略的 init 函数执行后触发
    • POST_SYSTEM_RESTORED: 在实盘时,你可能需要在此事件后根据其他信息源对系统状态进行调整
  • MarketEvent: 市场及数据事件源
    • POST_UNIVERSE_CHANGED: 策略证券池发生变化后触发
    • PRE_BEFORE_TRADING: 执行 before_trading 函数前触发
    • BEFORE_TRADING: 该事件会触发策略的 before_trading 函数
    • POST_BEFORE_TRADING: 执行 before_trading 函数后触发
    • PRE_BAR: 执行 handle_bar 函数前触发
    • BAR: 该事件会触发策略的 handle_bar 函数
    • POST_BAR: 执行 handle_bar 函数后触发
    • PRE_TICK: 执行 handle_tick 前触发
    • TICK: 该事件会触发策略的 handle_tick 函数
    • POST_TICK: 执行 handle_tick 后触发
    • PRE_SCHEDULED: 在 scheduler 执行前触发
    • POST_SCHEDULED: 在 scheduler 执行后触发
    • PRE_AFTER_TRADING: 执行 after_trading 函数前触发
    • AFTER_TRADING: 该事件会触发策略的 after_trading 函数
    • POST_AFTER_TRADING: 执行 after_trading 函数后触发
    • PRE_SETTLEMENT: 结算前触发该事件
    • SETTLEMENT: 触发结算事件
    • POST_SETTLEMENT: 结算后触发该事件
  • OrderEvent: 交易事件源
    • ORDER_PENDING_NEW: 创建订单
    • ORDER_CREATION_PASS: 创建订单成功
    • ORDER_CREATION_REJECT: 创建订单失败
    • ORDER_PENDING_CANCEL: 创建撤单
    • ORDER_CANCELLATION_PASS: 撤销订单成功
    • ORDER_CANCELLATION_REJECT: 撤销订单失败
    • ORDER_UNSOLICITED_UPDATE: 订单状态更新
    • TRADE: 成交

事件源的订阅及使用

我们可以订阅需要的事件源,从而在该事件发生时实现指定需求。

下面以最简单的 Mod - ProgressMod 为例,介绍事件源的订阅和使用。

ProgressMod 需要实现的需求非常的简单:在命令行输出目前回测的进度条。

https://raw.githubusercontent.com/ricequant/rq-resource/master/rqalpha/progress_bar.png

首先定义一个 ProgressMod 类,继承与接口类 AbstractMod

from rqalpha.interface import AbstractMod

class ProgressMod(AbstractMod):

    def __init__(self):
        pass

    def start_up(self, env, mod_config):
        """
        RQAlpha 在系统启动时会调用此接口;在此接口中,可以通过调用 ``env`` 的相应方法来覆盖系统默认组件。

        :param env: 系统环境
        :type env: :class:`~Environment`
        :param mod_config: 模块配置参数
        """
        pass

    def tear_down(self, success, exception=None):
        """
        RQAlpha 在系统退出前会调用此接口。

        :param code: 退出代码
        :type code: rqalpha.const.EXIT_CODE
        :param exception: 如果在策略执行过程中出现错误,此对象为相应的异常对象
        """
        pass

我们将需求进行分拆:

  • 在回测开始时初始化进度条
  • 在回测每日交易结束后更新进度条
  • 在回测结束后,终止进度条

为了实现以上需求,我们需要注册两个事件:

  • EVENT.POST_SYSTEM_INIT 系统初始化后
  • EVENT.POST_AFTER_TRADING 交易结束后

进度条相关 我们使用 click 库来实现,具体 API 这里不详细展开。

接下来,我们在 start_up 函数中进行事件注册,并定义 _init_tick 函数来响应事件。

from rqalpha.interface import AbstractMod

class ProgressMod(AbstractMod):

    def __init__(self):
        self._env = None

    def start_up(self, env, mod_config):
        self._env = env
        env.event_bus.add_listener(EVENT.POST_AFTER_TRADING, self._tick)
        env.event_bus.add_listener(EVENT.POST_SYSTEM_INIT, self._init)

    def tear_down(self, success, exception=None):
        pass

    def _init(self, event):
        pass

    def _tick(self, event):
        pass

_init 函数中,初始化 progressBar,进度条的长度为回测的总时长

def _init(self):
    trading_length = len(self._env.config.base.trading_calendar)
    self.progress_bar = click.progressbar(length=trading_length, show_eta=False)

_tick 函数中,更新进度条

def _tick(self, event):
    self.progress_bar.update(1)

tear_down 函数中,终止进度条

def tear_down(self, success, exception=None):
    self.progress_bar.render_finish()

至此,我们就完成了整个 ProgressMod 的编写

import click

from rqalpha.interface import AbstractMod
from rqalpha.events import EVENT


class ProgressMod(AbstractMod):
    def __init__(self):
        self._env = None
        self.progress_bar = None

    def start_up(self, env, mod_config):
        self._env = env
        env.event_bus.add_listener(EVENT.POST_AFTER_TRADING, self._tick)
        env.event_bus.add_listener(EVENT.POST_SYSTEM_INIT, self._init)

    def _init(self, event):
        trading_length = len(self._env.config.base.trading_calendar)
        self.progress_bar = click.progressbar(length=trading_length, show_eta=False)

    def _tick(self, event):
        self.progress_bar.update(1)

    def tear_down(self, success, exception=None):
        self.progress_bar.render_finish()

最后,我们添加默认的载入函数 load_mod,一个完整的进度条的Mod就完成了

import click

from rqalpha.interface import AbstractMod
from rqalpha.events import EVENT


class ProgressMod(AbstractMod):
    def __init__(self):
        self._env = None
        self.progress_bar = None

    def start_up(self, env, mod_config):
        self._env = env
        env.event_bus.add_listener(EVENT.POST_AFTER_TRADING, self._tick)
        env.event_bus.add_listener(EVENT.POST_SYSTEM_INIT, self._init)

    def _init(self, event):
        trading_length = len(self._env.config.base.trading_calendar)
        self.progress_bar = click.progressbar(length=trading_length, show_eta=False)

    def _tick(self, event):
        self.progress_bar.update(1)

    def tear_down(self, success, exception=None):
        self.progress_bar.render_finish()


def load_mod():
    return ProgressMod()

事件源的扩展

上一节讲的是如何订阅事件源,那么如何发布事件呢?其实也很简单,只需要通过 publish_event 就可以进行事件的发布。

RQAlpha 整个回测模块是通过 rqalpha_mod_sys_simulation 实现的,其中定义了基于Bar回测的 event_sourcesimulation_broker, 其中包含了 MarketEvent 和 OrderEvent 大部分事件源的定义和发布。

我们简单来分析一下日线回测 simulation_event_source 中 MaketEvent 相关事件的触发流程。

class SimulationEventSource(AbstractEventSource):

    ...

    def events(self, start_date, end_date, frequency):
        # 根据起始日期和结束日期,获取所有的交易日,然后再循环获取每一个交易日
        for day in self._env.data_proxy.get_trading_dates(start_date, end_date):
            date = day.to_pydatetime()
            dt_before_trading = date.replace(hour=0, minute=0)
            dt_bar = date.replace(hour=15, minute=0)
            dt_after_trading = date.replace(hour=15, minute=30)
            dt_settlement = date.replace(hour=17, minute=0)

            yield Event(EVENT.BEFORE_TRADING, calendar_dt=dt_before_trading, trading_dt=dt_before_trading)
            yield Event(EVENT.BAR, calendar_dt=dt_bar, trading_dt=dt_bar)

            yield Event(EVENT.AFTER_TRADING, calendar_dt=dt_after_trading, trading_dt=dt_after_trading)
            yield Event(EVENT.SETTLEMENT, calendar_dt=dt_settlement, trading_dt=dt_settlement)

event 函数是一个generator, 在 rqalpha_mod_sys_simulation 中主要返回 BEFORE_TRADING, BAR, AFTER_TRADINGSETTLEMENT 事件。RQAlpha 在接受到对应的事件后,会自动的进行相应的 publish_event 操作,并且会自动 publish 相关的 PRE_POST_ 事件。

而在 simulation_broker 中可以看到,当被调用 cancel_order 时,会模拟撤单的执行流程,分别触发 ORDER_PENDING_CANCEL && ORDER_CANCELLATION_PASS 事件,并将 accountorder 传递给回调函数,使其可以获取其可能需要到的数据。

class SimulationBroker(AbstractBroker, Persistable):

    def cancel_order(self, order):
        account = self._get_account_for(order.order_book_id)

        self._env.event_bus.publish_event(Event(EVENT.ORDER_PENDING_CANCEL, account=account, order=order))

        order._mark_cancelled(_("{order_id} order has been cancelled by user.").format(order_id=order.order_id))

        self._env.event_bus.publish_event(Event(EVENT.ORDER_CANCELLATION_PASS, account=account, order=order))

        # account.on_order_cancellation_pass(order)
        try:
            self._open_orders.remove((account, order))
        except ValueError:
            try:
                self._delayed_orders.remove((account, order))
            except ValueError:
                pass

如果想查看详细的事件源相关的内容,建议直接阅读 rqalpha_mod_sys_simulation 源码,您会发现,扩展事件源比想象中要简单。

您也可以基于 rqalpha_mod_sys_simulation 扩展一个自定义的回测引擎,实现您特定的回测需求。