rqalpha.portfolio.account 源代码

# -*- coding: utf-8 -*-
# 版权所有 2019 深圳米筐科技有限公司(下称“米筐科技”)
#
# 除非遵守当前许可,否则不得使用本软件。
#
#     * 非商业用途(非商业用途指个人出于非商业目的使用本软件,或者高校、研究所等非营利机构出于教育、科研等目的使用本软件):
#         遵守 Apache License 2.0(下称“Apache 2.0 许可”),
#         您可以在以下位置获得 Apache 2.0 许可的副本:http://www.apache.org/licenses/LICENSE-2.0。
#         除非法律有要求或以书面形式达成协议,否则本软件分发时需保持当前许可“原样”不变,且不得附加任何条件。
#
#     * 商业用途(商业用途指个人出于任何商业目的使用本软件,或者法人或其他组织出于任何目的使用本软件):
#         未经米筐科技授权,任何个人不得出于任何商业目的使用本软件(包括但不限于向第三方提供、销售、出租、出借、转让本软件、
#         本软件的衍生产品、引用或借鉴了本软件功能或源代码的产品或服务),任何法人或其他组织不得出于任何目的使用本软件,
#         否则米筐科技有权追究相应的知识产权侵权责任。
#         在此前提下,对本软件的使用同样需要遵守 Apache 2.0 许可,Apache 2.0 许可与本许可冲突之处,以本许可为准。
#         详细的授权流程,请联系 public@ricequant.com 获取。

from itertools import chain
from datetime import date
from typing import Callable, Dict, Iterable, List, Optional, Union, Tuple

import six
from rqalpha.const import POSITION_DIRECTION, POSITION_EFFECT, DEFAULT_ACCOUNT_TYPE, DAYS_CNT
from rqalpha.environment import Environment
from rqalpha.core.events import EVENT
from rqalpha.model.order import Order, OrderStyle
from rqalpha.model.trade import Trade
from rqalpha.utils.class_helper import deprecated_property
from rqalpha.utils.functools import lru_cache
from rqalpha.utils.i18n import gettext as _
from rqalpha.utils.logger import user_system_log
from rqalpha.portfolio.position import Position, PositionProxyDict

# Type alias for a callable that takes (str, int or float, OrderStyle, bool)
# and returns a list of Order objects.
# NOTE(review): presumably the signature of the order-placing API functions
# (order_book_id, amount, style, ...) — confirm parameter meanings at the call sites.
OrderApiType = Callable[[str, Union[int, float], OrderStyle, bool], List[Order]]


class AccountMeta(type):
    """Metaclass that short-circuits ``margin`` on every class it creates.

    The class's own ``margin`` attribute is stashed under ``_margin`` and the
    public ``margin`` is replaced by a property that always yields ``0``.
    Code elsewhere can put the stashed attribute back when it is needed.
    """

    def __new__(mcs, *args, **kwargs):
        new_cls = type.__new__(mcs, *args, **kwargs)

        def _zero_margin(_instance):
            # black magic: improve performance for pure stock strategy
            return 0

        # Keep a handle on the genuine implementation before shadowing it.
        new_cls._margin = new_cls.margin
        new_cls.margin = property(_zero_margin)
        return new_cls


class Account(metaclass=AccountMeta):
    """
    An account: a collection of positions and cash.

    Positions in different kinds of instruments may belong to different accounts,
    e.g. stocks, convertible bonds, exchange-traded funds and ETF options belong to the
    stock account, while futures and futures options belong to the futures account.
    """

    __abandon_properties__ = [
        "holding_pnl",
        "realized_pnl",
        "dividend_receivable",
    ]

    def __init__(
            self, account_type: str, total_cash: float, init_positions: Dict[str, Tuple[int, Optional[float]]],
            financing_rate: float
    ):
        """
        :param account_type: account type identifier
        :param total_cash: initial total funds of the account
        :param init_positions: mapping of order_book_id -> (initial quantity, initial price or None);
            a quantity that is not positive creates the position on the SHORT side
        :param financing_rate: annual financing rate applied to cash liabilities
        """
        self._type = account_type
        self._total_cash = total_cash  # total funds, margin included
        self._env = Environment.get_instance()

        self._positions: Dict[str, Dict[POSITION_DIRECTION, Position]] = {}
        self._backward_trade_set = set()
        self._frozen_cash = 0
        self._pending_deposit_withdraw: List[Tuple[date, float]] = []

        self._cash_liabilities = 0  # cash liabilities

        self.register_event()

        self._management_fee_calculator_func = lambda account, rate: account.total_value * rate
        self._management_fee_rate = 0.0
        self._management_fees = 0.0

        # annual financing rate
        self._financing_rate = financing_rate

        for order_book_id, (init_quantity, init_price) in init_positions.items():
            position_direction = POSITION_DIRECTION.LONG if init_quantity > 0 else POSITION_DIRECTION.SHORT
            # The sign of init_quantity only selects the direction; the position itself holds a
            # magnitude (market_value applies the direction sign separately).
            self._get_or_create_pos(order_book_id, position_direction, abs(init_quantity), init_price)

    def __repr__(self):
        positions_repr = {}
        for order_book_id, positions in self._positions.items():
            for direction, position in positions.items():
                if position.quantity != 0:
                    positions_repr.setdefault(order_book_id, {})[direction.value] = position.quantity
        return "Account(cash={}, total_value={}, positions={})".format(
            self.cash, self.total_value, positions_repr
        )

    def register_event(self):
        """Subscribe this account's handlers to the environment's event bus."""
        event_bus = self._env.event_bus
        # Trades are only applied when the event targets this account.
        event_bus.add_listener(
            EVENT.TRADE, lambda e: self.apply_trade(e.trade, e.order) if e.account == self else None
        )
        event_bus.add_listener(EVENT.ORDER_PENDING_NEW, self._on_order_pending_new)
        event_bus.add_listener(EVENT.ORDER_CREATION_REJECT, self._on_order_unsolicited_update)
        event_bus.add_listener(EVENT.ORDER_UNSOLICITED_UPDATE, self._on_order_unsolicited_update)
        event_bus.add_listener(EVENT.ORDER_CANCELLATION_PASS, self._on_order_unsolicited_update)

        event_bus.add_listener(EVENT.PRE_BEFORE_TRADING, self._on_before_trading)
        event_bus.add_listener(EVENT.SETTLEMENT, self._on_settlement)

        event_bus.prepend_listener(EVENT.BAR, self._on_bar)
        event_bus.prepend_listener(EVENT.TICK, self._on_tick)

    def get_state(self):
        """Return a dict with the positions' states, frozen cash, total cash and applied trade ids."""
        return {
            'positions': {
                order_book_id: {
                    POSITION_DIRECTION.LONG: positions[POSITION_DIRECTION.LONG].get_state(),
                    POSITION_DIRECTION.SHORT: positions[POSITION_DIRECTION.SHORT].get_state()
                } for order_book_id, positions in self._positions.items()
            },
            'frozen_cash': self._frozen_cash,
            "total_cash": self._total_cash,
            'backward_trade_set': list(self._backward_trade_set),
        }

    def set_state(self, state):
        """Restore the account from a dict shaped like the one produced by :meth:`get_state`."""
        self._frozen_cash = state['frozen_cash']
        self._backward_trade_set = set(state['backward_trade_set'])
        self._total_cash = state["total_cash"]

        self._positions.clear()
        for order_book_id, positions_state in state['positions'].items():
            for direction in POSITION_DIRECTION:
                position = self._get_or_create_pos(order_book_id, direction)
                if direction in positions_state:
                    position.set_state(positions_state[direction])
                else:
                    # Fall back to the lower-cased direction as the key.
                    position.set_state(positions_state[direction.lower()])

    def fast_forward(self, orders=None, trades=None):
        """
        Replay trades that have not been applied yet and rebuild frozen cash from active orders.

        :param orders: orders whose still-active members determine the frozen cash
        :param trades: trades to apply; those already recorded in the applied-trade set are skipped
        """
        if trades:
            close_trades = []
            # Apply opening trades first ...
            for trade in trades:
                if trade.exec_id in self._backward_trade_set:
                    continue
                if trade.position_effect == POSITION_EFFECT.OPEN:
                    self.apply_trade(trade)
                else:
                    close_trades.append(trade)
            # ... then the closing ones.
            for trade in close_trades:
                self.apply_trade(trade)

        # Recompute frozen cash: each active order still freezes the unfilled share of the cash
        # frozen when it was created (same ratio as in _on_order_unsolicited_update).
        if orders:
            self._frozen_cash = sum(
                order.unfilled_quantity / order.quantity * order.init_frozen_cash
                for order in orders if order.is_active()
            )

    def get_positions(self) -> Iterable[Position]:
        """
        Iterate over all position objects, skipping those whose quantity and equity are both 0.
        """
        for position in self._iter_pos():
            if position.quantity == 0 and position.equity == 0:
                continue
            yield position

    def get_position(self, order_book_id: str, direction: POSITION_DIRECTION = POSITION_DIRECTION.LONG) -> Position:
        """
        Get the position object of an instrument.

        A fresh ``Position`` (not stored in the account) is returned when none is held.

        :param order_book_id: instrument id
        :param direction: position direction
        """
        try:
            return self._positions[order_book_id][direction]
        except KeyError:
            return Position(order_book_id, direction)

    def calc_close_today_amount(self, order_book_id, trade_amount, position_direction, position_effect):
        """Delegate to ``calc_close_today_amount`` of the (possibly newly created) position."""
        return self._get_or_create_pos(order_book_id, position_direction).calc_close_today_amount(
            trade_amount, position_effect
        )

    @property
    def type(self):
        """Account type passed to the constructor."""
        return self._type

    @property
    @lru_cache(None)
    def positions(self):
        """Cached ``PositionProxyDict`` wrapping the account's position mapping."""
        return PositionProxyDict(self._positions)

    @property
    def frozen_cash(self) -> float:
        """
        Frozen cash
        """
        return self._frozen_cash

    @property
    def cash(self) -> float:
        """
        Available cash: total funds minus margin and frozen cash
        """
        return self._total_cash - self.margin - self._frozen_cash

    @property
    def market_value(self) -> float:
        """
        [float] Market value; short positions contribute with a negative sign
        """
        return sum(p.market_value * (1 if p.direction == POSITION_DIRECTION.LONG else -1) for p in self._iter_pos())

    @property
    def transaction_cost(self) -> float:
        """
        Total transaction cost over all positions
        """
        return sum(p.transaction_cost for p in self._iter_pos())

    @property
    def cash_liabilities(self) -> float:
        """
        Cash liabilities
        """
        return self._cash_liabilities

    @property
    def cash_liabilities_interest(self) -> float:
        """
        Interest of the cash liabilities for the current day
        """
        return self._cash_liabilities * self._financing_rate / DAYS_CNT.DAYS_A_YEAR

    @property
    def margin(self) -> float:
        """
        Total margin; positions without a ``margin`` attribute count as 0
        """
        return sum(getattr(p, "margin", 0) for p in self._iter_pos())

    @property
    def buy_margin(self) -> float:
        """
        Margin of long positions
        """
        return sum(getattr(p, "margin", 0) for p in self._iter_pos(POSITION_DIRECTION.LONG))

    @property
    def sell_margin(self) -> float:
        """
        Margin of short positions
        """
        return sum(getattr(p, "margin", 0) for p in self._iter_pos(POSITION_DIRECTION.SHORT))

    @property
    def daily_pnl(self) -> float:
        """
        Profit and loss of the current day
        """
        return self.trading_pnl + self.position_pnl - self.transaction_cost - self.cash_liabilities_interest

    @property
    def position_equity(self) -> float:
        """
        Total equity of all positions
        """
        return sum(p.equity for p in self._iter_pos())

    @property
    def total_value(self) -> float:
        """
        Total equity of the account, pending deposits/withdrawals included
        """
        total_value = (
            self._total_cash + self.position_equity - self.cash_liabilities - self.cash_liabilities_interest
        )
        if self._pending_deposit_withdraw:
            total_value += sum(amount for _, amount in self._pending_deposit_withdraw)
        return total_value

    @property
    def total_cash(self) -> float:
        """
        Total cash of the account: total funds minus margin
        """
        return self._total_cash - self.margin

    @property
    def position_pnl(self) -> float:
        """
        Profit and loss of positions carried over from previous days
        """
        return sum(p.position_pnl for p in self._iter_pos())

    @property
    def trading_pnl(self) -> float:
        """
        Profit and loss of today's trades
        """
        return sum(p.trading_pnl for p in self._iter_pos())

    def _on_before_trading(self, _):
        """Pre-open housekeeping: drop empty positions, book due deposits/withdrawals, accrue interest."""
        # Iterate over a snapshot because entries are deleted along the way.
        for order_book_id, positions in list(self._positions.items()):
            if all(p.quantity == 0 and p.equity == 0 for p in six.itervalues(positions)):
                del self._positions[order_book_id]

        trading_date = self._env.trading_dt.date()
        # The pending list is kept sorted by receiving date, so only the head has to be checked.
        while self._pending_deposit_withdraw and self._pending_deposit_withdraw[0][0].date() <= trading_date:
            _, amount = self._pending_deposit_withdraw.pop(0)
            self._total_cash += amount

        for position in self._iter_pos():
            self._total_cash += position.before_trading(trading_date)

        # Liabilities grow by one day of interest.
        if self._cash_liabilities > 0:
            self._cash_liabilities += self.cash_liabilities_interest

    def _on_settlement(self, event):
        """Settle every position, charge the management fee and handle forced liquidation."""
        trading_date = self._env.trading_dt.date()

        for order_book_id, positions in list(self._positions.items()):
            for position in six.itervalues(positions):
                delta_cash = position.settlement(trading_date)
                self._total_cash += delta_cash

        self._backward_trade_set.clear()

        fee = self._management_fee()
        self._management_fees += fee
        self._total_cash -= fee

        # If total_value <= 0 the account is considered blown up:
        # positions are cleared and the cash is reset to 0.
        forced_liquidation = self._env.config.base.forced_liquidation
        if self.total_value <= 0 and forced_liquidation:
            if self._positions:
                user_system_log.warn(_("Trigger Forced Liquidation, current total_value is 0"))
            self._positions.clear()
            self._total_cash = 0

    def _on_order_pending_new(self, event):
        """Freeze the cash required by a newly created order of this account."""
        if event.account != self:
            return
        order = event.order
        order.set_frozen_cash(self._frozen_cash_of_order(order))
        self._frozen_cash += order.init_frozen_cash

    def _on_order_unsolicited_update(self, event):
        """Release the cash still frozen by an order of this account (rejection, cancellation, ...)."""
        if event.account != self:
            return
        order = event.order
        if order.filled_quantity != 0:
            # Partially filled: only the unfilled share is still frozen.
            self._frozen_cash -= order.unfilled_quantity / order.quantity * order.init_frozen_cash
        else:
            self._frozen_cash -= order.init_frozen_cash

    def apply_trade(self, trade: Trade, order: Optional[Order] = None) -> None:
        """
        Apply a trade to the account; a trade whose ``exec_id`` was already applied is ignored.

        :param trade: the trade to apply
        :param order: the order the trade belongs to; when given (and the trade is not a MATCH),
            the matching share of the order's frozen cash is released
        """
        if trade.exec_id in self._backward_trade_set:
            return
        order_book_id = trade.order_book_id
        if order and trade.position_effect != POSITION_EFFECT.MATCH:
            if trade.last_quantity != order.quantity:
                self._frozen_cash -= trade.last_quantity / order.quantity * order.init_frozen_cash
            else:
                self._frozen_cash -= order.init_frozen_cash
        if trade.position_effect == POSITION_EFFECT.MATCH:
            # A MATCH trade is applied to both the long and the short position.
            delta_cash = self._get_or_create_pos(
                order_book_id, POSITION_DIRECTION.LONG
            ).apply_trade(trade) + self._get_or_create_pos(
                order_book_id, POSITION_DIRECTION.SHORT
            ).apply_trade(trade)
            self._total_cash += delta_cash
        else:
            delta_cash = self._get_or_create_pos(order_book_id, trade.position_direction).apply_trade(trade)
            self._total_cash += delta_cash
        self._backward_trade_set.add(trade.exec_id)

    def _iter_pos(self, direction: Optional[POSITION_DIRECTION] = None) -> Iterable[Position]:
        """Iterate over the positions of one direction, or over all positions when none is given."""
        if direction:
            return (p[direction] for p in six.itervalues(self._positions))
        else:
            return chain(*[six.itervalues(p) for p in six.itervalues(self._positions)])

    def _get_or_create_pos(
            self,
            order_book_id: str,
            direction: Union[POSITION_DIRECTION, str],
            init_quantity: float = 0,
            init_price: Optional[float] = None
    ) -> Position:
        """
        Return the position of ``order_book_id`` for ``direction``, creating the long/short pair first
        if the instrument is not held yet.

        :param order_book_id: instrument id
        :param direction: direction of the position to return
        :param init_quantity: initial quantity given to the ``direction`` side of a newly created pair
            (the other side starts at 0)
        :param init_price: initial price of a newly created pair; when falsy, the pair is updated
            with the environment's last price instead
        """
        if order_book_id not in self._positions:
            if direction == POSITION_DIRECTION.LONG:
                long_quantity, short_quantity = init_quantity, 0
            else:
                long_quantity, short_quantity = 0, init_quantity
            positions = self._positions.setdefault(order_book_id, {
                POSITION_DIRECTION.LONG: Position(order_book_id, POSITION_DIRECTION.LONG, long_quantity, init_price),
                POSITION_DIRECTION.SHORT: Position(
                    order_book_id, POSITION_DIRECTION.SHORT, short_quantity, init_price
                )
            })
            if not init_price:
                last_price = self._env.get_last_price(order_book_id)
                for p in positions.values():
                    p.update_last_price(last_price)
            if hasattr(positions[direction], "margin") and hasattr(self.__class__, "_margin"):
                # black magic: improve performance for pure stock strategy
                # The first position that carries a margin restores the real ``margin``
                # property that the metaclass stashed under ``_margin``.
                setattr(self.__class__, "margin", self.__class__._margin)
                del self.__class__._margin
        else:
            positions = self._positions[order_book_id]
        return positions[direction]

    def _on_tick(self, event):
        """Push the tick's last price to both positions of the tick's instrument, if held."""
        tick = event.tick
        try:
            positions = self._positions[tick.order_book_id]
        except KeyError:
            return
        for position in positions.values():
            position.update_last_price(tick.last)

    def _on_bar(self, _):
        """Refresh the last price of every held position from the environment."""
        for order_book_id, positions in self._positions.items():
            price = self._env.get_last_price(order_book_id)
            # ``price == price`` is False only for NaN, which is skipped.
            if price == price:
                for position in six.itervalues(positions):
                    position.update_last_price(price)

    def _frozen_cash_of_order(self, order):
        """Cash to freeze for an order: cash occupation (opening orders only) plus transaction cost."""
        if order.position_effect == POSITION_EFFECT.OPEN:
            instrument = self._env.data_proxy.instrument(order.order_book_id)
            order_cost = instrument.calc_cash_occupation(order.frozen_price, order.quantity, order.position_direction)
        else:
            order_cost = 0
        return order_cost + self._env.get_order_transaction_cost(order)

    def _management_fee(self) -> float:
        """Compute the management fee of the account; 0 when the fee rate is 0."""
        if self._management_fee_rate == 0:
            return 0
        fee = self._management_fee_calculator_func(self, self._management_fee_rate)
        return fee

    def register_management_fee_calculator(self, calculator: Callable[["Account", float], float]) -> None:
        """
        Set the logic used to compute the management fee.

        The method expects a function taking the account and the fee rate

        .. code-block:: python

            def management_fee_calculator(account, rate):
                return len(account.positions) * rate

            def init(context):
                context.portfolio.accounts["STOCK"].register_management_fee_calculator(management_fee_calculator)

        """
        self._management_fee_calculator_func = calculator

    def set_management_fee_rate(self, rate: float) -> None:
        """Set the rate handed to the management fee calculator."""
        self._management_fee_rate = rate

    @property
    def management_fees(self) -> float:
        """Accumulated management fees charged to this account."""
        return self._management_fees

    def deposit_withdraw(self, amount: float, receiving_days: int = 0):
        """
        Deposit (positive ``amount``) or withdraw (negative ``amount``) cash.

        :param amount: amount to deposit, or to withdraw when negative
        :param receiving_days: when >= 1, the amount is booked only on the n-th next trading date;
            otherwise it is booked immediately
        :raises ValueError: if a withdrawal exceeds the available cash
        """
        if (amount < 0) and (self.cash < amount * -1):
            # Report the available cash, i.e. the figure the withdrawal was checked against.
            raise ValueError(_('insufficient cash, current {}, target withdrawal {}').format(self.cash, amount))
        if receiving_days >= 1:
            receiving_date = self._env.data_proxy.get_next_trading_date(self._env.trading_dt.date(), n=receiving_days)
            self._pending_deposit_withdraw.append((receiving_date, amount))
            # Keep the queue ordered by receiving date; _on_before_trading pops from its head.
            self._pending_deposit_withdraw.sort(key=lambda i: i[0])
        else:
            self._total_cash += amount

    def finance_repay(self, amount):
        """
        Finance (positive ``amount``) or repay (negative ``amount``); stock accounts only.

        A repayment larger than the outstanding liabilities only deducts the outstanding part from cash.
        """
        if self.type == DEFAULT_ACCOUNT_TYPE.STOCK:
            if amount > 0:
                # financing
                self._cash_liabilities += amount
                self._total_cash += amount
            elif amount < 0:
                # repayment
                amount *= -1
                if amount > self.cash:
                    # NOTE(review): only a warning is logged; the repayment below still proceeds.
                    user_system_log.warn(
                        _('insufficient cash, current {}, target withdrawal {}').format(self.cash, amount)
                    )
                # Guard against repaying more than is owed: ``excess`` is <= 0.
                excess = min(0, self._cash_liabilities - amount)
                if excess < 0:
                    user_system_log.warn("repay amount is greater than cash liabilities")
                self._cash_liabilities = max(0, self._cash_liabilities - amount)
                self._total_cash -= amount + excess
        else:
            user_system_log.warn(f"{self.type} not support finance_repay")

    # Deprecated aliases kept for backward compatibility.
    holding_pnl = deprecated_property("holding_pnl", "position_pnl")
    realized_pnl = deprecated_property("realized_pnl", "trading_pnl")
    equity = deprecated_property("equity", "position_equity")