Strategy Development
Strategy Development Guide
Learn how to develop trading strategies using the HiveQ Flow event-driven framework.
Overview
Flow strategies are Python classes that respond to market data and trading events through callback methods. All strategies interact with the platform through the Context API, which provides access to market data subscriptions, order management, position queries, and portfolio state.
Required Imports
Always use the correct import paths to avoid ImportError:
# Core framework
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
# Event types and callbacks
from hiveq.flow import events # For per-event handlers
from hiveq.flow.config import EventType # For event.type comparisons
from hiveq.flow.config import AssetType # EQUITY, OPTIONS, FUTURES, CRYPTO
# Order types and states (CRITICAL: from trading_types, NOT config)
from hiveq.flow.trading_types import OrderType # MARKET, LIMIT, STOP, STOP_LIMIT
from hiveq.flow.trading_types import OrderSide # BUY, SELL
from hiveq.flow.trading_types import OrderStatus # FILLED, CANCELED, REJECTED
# Event logging
from hiveq.flow.config import EventLogType # USER_LOG, ENTRY_TRADE, EXIT_TRADE
# Configuration (optional - only for advanced configs)
from hiveq.flow.config import BacktestConfig, ExecutionModeCritical Import Rules:
EventType,AssetType,ExecutionMode,EventLogType→hiveq.flow.configOrderType,OrderSide,OrderStatus→hiveq.flow.trading_types- NEVER import
OrderType,OrderSide,OrderStatusfromhiveq.flow.config
Strategy Implementation
Two Callback Approaches
CRITICAL: Choose ONE approach per strategy class. NEVER mix both approaches!
Approach 1: Single Global Handler
import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType
class MyStrategy:
def on_hiveq_event(self, ctx: hf.Context, event: hf.Event) -> None:
"""Single handler for all events."""
if event.type == EventType.START:
# Initialize strategy
ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')
elif event.type == EventType.BAR:
bar = event.data() # Get Bar object from event
# Trading logic
if ctx.is_flat(bar.symbol):
ctx.buy_order(symbol=bar.symbol, quantity=100)
elif event.type == EventType.ORDER_FILLED:
order = event.data() # Get Order object from event
ctx.add_event_log(
message=f"Order filled: {order.filled_qty} @ ${order.avg_px:.2f}",
symbol=order.symbol
)Approach 2: Per-Event Handlers (Recommended for Type Safety)
import hiveq.flow as hf
from hiveq.flow import events
from hiveq.flow.config import EventType, AssetType
class MyStrategy:
def on_start(self, ctx: hf.Context, event: events.StartEvent) -> None:
"""Called once when strategy starts."""
ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')
def on_bar(self, ctx: hf.Context, event: events.BarEvent) -> None:
"""Called for each new bar."""
bar = event.data() # Get Bar object from event
if ctx.is_flat(bar.symbol):
ctx.buy_order(symbol=bar.symbol, quantity=100)
def on_order(self, ctx: hf.Context, event: events.OrderEvent) -> None:
"""Called for ALL order events - check event.type to distinguish."""
order = event.data() # Get Order object from event
if event.type == EventType.ORDER_FILLED:
ctx.add_event_log(
message=f"Order filled: {order.filled_qty} @ ${order.avg_px:.2f}",
symbol=order.symbol
)
def on_stop(self, ctx: hf.Context, event: events.StopEvent) -> None:
"""Called once when strategy stops."""
portfolio = ctx.portfolio()
ctx.add_event_log(
message=f"Strategy stopped. Total P&L: ${portfolio.total_pnl():,.2f}"
)Event System
Event Types
from hiveq.flow.config import EventType
# Lifecycle events
EventType.START # Strategy initialization
EventType.STOP # Strategy teardown
# Market data events
EventType.BAR # Generic bar event
EventType.BAR_1_MIN # 1-minute bar
EventType.BAR_5_MIN # 5-minute bar
EventType.BAR_15_MIN # 15-minute bar
EventType.BAR_30_MIN # 30-minute bar
EventType.BAR_1_HOUR # 1-hour bar
EventType.BAR_1_DAY # 1-day bar
# Tick data events
EventType.TICK # Generic tick
EventType.TRADE # Trade tick
EventType.QUOTE # Quote (bid/ask) tick
EventType.SNAP # Options snapshot
# Order events
EventType.ORDER_SUBMITTED # Order submitted to exchange
EventType.ORDER_ACCEPTED # Order accepted by exchange
EventType.ORDER_FILLED # Order filled
EventType.ORDER_REJECTED # Order rejected
EventType.ORDER_CANCELED # Order canceled
EventType.ORDER_UPDATED # Order updated
# Position events
EventType.POSITION_OPENED # New position opened
EventType.POSITION_CHANGED # Position quantity changed
EventType.POSITION_CLOSED # Position closed
# Custom events
EventType.CUSTOM_DATA # Custom data received
EventType.TIMER # Timer eventAvailable Event Callbacks
from hiveq.flow import events
class MyStrategy:
# Lifecycle callbacks
def on_start(self, ctx: hf.Context, event: events.StartEvent) -> None:
"""Initialize strategy - subscribe to data here."""
pass
def on_stop(self, ctx: hf.Context, event: events.StopEvent) -> None:
"""Cleanup - log final statistics, cancel timers."""
pass
# Market data callbacks
def on_bar(self, ctx: hf.Context, event: events.BarEvent) -> None:
"""Process bar (OHLCV) data."""
bar = event.data() # Must call event.data() to get Bar object
pass
def on_trade(self, ctx: hf.Context, event: events.TradeEvent) -> None:
"""Process trade ticks."""
trade = event.data()
pass
def on_quote(self, ctx: hf.Context, event: events.QuoteEvent) -> None:
"""Process quote (bid/ask) ticks."""
quote = event.data()
pass
def on_snap(self, ctx: hf.Context, event: events.SnapEvent) -> None:
"""Process options snapshot data."""
snap = event.data()
pass
def on_custom_data(self, ctx: hf.Context, event: events.CustomDataEvent) -> None:
"""Process custom data from CSV or signals."""
data = event.data()
pass
# Trading callbacks (use event.type to distinguish states)
def on_order(self, ctx: hf.Context, event: events.OrderEvent) -> None:
"""Handle ALL order events - check event.type for state."""
order = event.data()
if event.type == EventType.ORDER_FILLED:
# Handle fill
pass
elif event.type == EventType.ORDER_REJECTED:
# Handle rejection
pass
def on_position(self, ctx: hf.Context, event: events.PositionEvent) -> None:
"""Handle ALL position events - check event.type for state."""
pos = event.data()
if event.type == EventType.POSITION_OPENED:
# Handle new position
pass
elif event.type == EventType.POSITION_CLOSED:
# Handle position close
pass
# Timer callback
def on_timer(self, ctx: hf.Context, event: events.TimerEvent) -> None:
"""Handle timer events."""
timer = event.data()
passCRITICAL NOTES:
- All callbacks receive TWO parameters:
ctx: hf.Contextandevent: Event - Always call
event.data()to access the underlying data object - For
on_orderandon_position, useevent.typeto distinguish states - DO NOT use
on_order_filled(),on_order_rejected(), etc. - they don't exist!
Context API
The Context object provides all strategy operations:
Market Data Subscriptions
# Bar data (OHLCV)
ctx.subscribe_bars(
symbols: list[str],
asset_type: AssetType = AssetType.EQUITY,
interval: str = '1m' # '1s', '5s', '30s', '1m', '5m', '15m', '30m', '1h', '4h', '1d'
) -> None
# Futures bars with continuous contracts
ctx.subscribe_futures_bars(
symbols: list[str], # e.g., ['ES.c.0', 'NQ.v.0']
interval: str
) -> None
# Top-of-book quotes + trades (TBBO)
ctx.subscribe_tbbo(
symbols: list[str],
asset_type: AssetType = AssetType.EQUITY
) -> None
ctx.subscribe_futures_tbbo(symbols: list[str]) -> None
# Options snapshots with filtering
ctx.subscribe_option_snaps(
symbol: str, # Underlying symbol (e.g., 'SPX')
option_type: str = None, # 'C'/'Call', 'P'/'Put', or None (both)
strike: float = None, # Specific strike or None (all)
expiration_type: str = None, # '0dte', 'YYYYMMDD', 'YYYY-MM-DD', or None (all)
interval: str = '1s'
) -> None
# Tick-level data
ctx.subscribe_trade_ticks(
symbols: list[str],
asset_type: AssetType = AssetType.EQUITY
) -> None
ctx.subscribe_quotes(
symbols: list[str],
asset_type: AssetType = AssetType.EQUITY
) -> None
# Custom data (signals, CSV)
ctx.subscribe_data(data_id: str) -> NoneOrder Management
from hiveq.flow.trading_types import OrderType, OrderSide
# Market orders
ctx.buy_order(
symbol: str,
quantity: float,
order_type: OrderType = OrderType.MARKET,
limit_price: float = None,
stop_price: float = None
) -> Order
ctx.sell_order(
symbol: str,
quantity: float,
order_type: OrderType = OrderType.MARKET,
limit_price: float = None,
stop_price: float = None
) -> Order
ctx.short_order(
symbol: str,
quantity: float,
order_type: OrderType = OrderType.MARKET,
limit_price: float = None,
stop_price: float = None
) -> Order
# Limit orders
ctx.limit_order(
symbol: str,
quantity: float,
limit_price: float,
side: OrderSide
) -> Order
# Market-on-Open and Market-on-Close
ctx.moo_order(symbol: str, quantity: float, side: OrderSide) -> Order
ctx.moc_order(symbol: str, quantity: float, side: OrderSide) -> Order
# Order management
ctx.cancel_order(order_id: str) -> bool
ctx.replace_order(
order_id: str,
quantity: float = None,
limit_price: float = None,
stop_price: float = None
) -> str # Returns new order_id
# Order queries
ctx.get_order_state(order_id: str) -> OrderState
ctx.has_open_order(symbol: str = None) -> bool
ctx.open_order_qty(symbol: str = None) -> float # Net pending order quantityPosition Queries
# Position state
ctx.net_position(symbol: str = None) -> float # Positive=long, negative=short
ctx.is_flat(symbol: str = None) -> bool
ctx.is_net_long(symbol: str) -> bool
ctx.is_net_short(symbol: str) -> boolPortfolio (Strategy-Level)
portfolio = ctx.portfolio() # Returns strategy's portfolio
# P&L methods
portfolio.total_pnl(symbol: str = None) -> float
portfolio.realized_pnl(symbol: str = None) -> float
portfolio.unrealized_pnl(symbol: str = None) -> float
# Position methods
portfolio.net_position(symbol: str = None) -> float
portfolio.net_exposure(symbol: str = None, mark_price: float = None) -> float
portfolio.net_exposures() -> dict[str, float] # All exposures
portfolio.get_last_price(symbol: str) -> float | None # Last quote/trade/bar price
# Position state
portfolio.is_flat(symbol: str = None) -> bool
portfolio.is_net_long(symbol: str) -> bool
portfolio.is_net_short(symbol: str) -> boolGlobal Portfolio (Account-Level)
global_portfolio = ctx.global_portfolio() # All strategies combined
# Same methods as portfolio, aggregated across all strategies
global_portfolio.total_pnl(symbol: str = None) -> float
global_portfolio.net_position(symbol: str = None) -> float
global_portfolio.net_exposure(symbol: str = None) -> floatTimers
import pandas as pd
ctx.set_timer(timer_id: str, timer_interval: pd.Timedelta) -> None
ctx.cancel_timer(timer_id: str) -> NoneInstruments
inst = ctx.instrument(symbol: str)
# Access: inst.symbol, inst.last_bar, inst.multiplier, inst.min_tick, inst.asset_typeEvent Logging
ctx.add_event_log(
message: str,
sub_event_type: str = None,
symbol: str = None
) -> NoneProperties
ctx.strategy_config # Access StrategyConfig objectData Types
Bar
Bar objects provide zero-copy access to OHLCV data:
bar = event.data() # In on_bar callback
bar.symbol: str # Trading symbol
bar.open: float # Opening price (returns float directly)
bar.high: float # High price
bar.low: float # Low price
bar.close: float # Closing price
bar.volume: float # Trading volume
bar.ts_event: int # Event timestamp in nanoseconds
bar.ts_init: int # Initialization timestamp in nanosecondsImportant: Bar attributes return float directly - NO need for .as_double() or type conversion.
Order
order = event.data() # In on_order callback
# Identification
order.order_id: str # Unique order identifier
order.client_order_id: str # Alias for order_id
# Symbol and Side
order.symbol: str # Trading symbol
order.side: OrderSide # OrderSide.BUY or OrderSide.SELL
# Quantities
order.quantity: float # Total order quantity
order.filled_qty: float # Quantity filled
order.leaves_qty: float # Remaining unfilled quantity
# Order Type and Prices
order.order_type: OrderType # OrderType.MARKET, LIMIT, STOP, etc.
order.limit_price: float | None # Limit price (None if not limit order)
order.stop_price: float | None # Stop price (None if not stop order)
# Status and Fills
order.status: OrderStatus # OrderStatus.SUBMITTED, ACCEPTED, FILLED, etc.
order.avg_px: float | None # Average fill price (None if not filled)
order.last_qty: float | None # Quantity filled in last execution
order.last_px: float | None # Price of last execution
# Timestamps
order.ts_event: int # Event timestamp in nanoseconds
order.ts_init: int # Initialization timestamp in nanoseconds
# Time in Force
order.time_in_force: str # 'GTC', 'IOC', 'FOK', 'GTD', 'DAY', etc.Important: Use order.limit_price, not order.price. Use order.avg_px, not order.avg_price.
Trade (Trade Tick)
trade = event.data() # In on_trade callback
trade.symbol: str
trade.price: float
trade.size: int
trade.aggressor_side: str # 'BUYER', 'SELLER', 'NO_AGGRESSOR'
trade.trade_id: str
trade.ts_event: datetimeQuote (Bid/Ask)
quote = event.data() # In on_quote callback
quote.symbol: str
quote.bid_price: float
quote.ask_price: float
quote.bid_size: float
quote.ask_size: float
quote.ts_event: datetimeSnapData (Options Snapshot)
snap = event.data() # In on_snap callback
snap.symbol: str # Root symbol (e.g., 'SPXW')
snap.chain: str # OCC symbol (e.g., 'SPXW 251028C06875000')
snap.strike: float # Strike price
snap.option_type: str # 'C' for Call or 'P' for Put
snap.bid_px: float | None # Best bid price (can be None)
snap.ask_px: float | None # Best ask price (can be None)
snap.bid_sz: int # Best bid size
snap.ask_sz: int # Best ask size
snap.price: float | None # Last trade price (can be None)
snap.size: int # Last trade size
snap.ts_event: int # Event timestamp in nanoseconds
snap.ts_init: int # Initialization timestamp in nanosecondsCRITICAL: bid_px, ask_px, and price can be None. Always check before formatting:
# Correct
last_price = f"${snap.price:.2f}" if snap.price is not None else "N/A"
# Wrong - will raise TypeError if price is None
last_price = f"${snap.price:.2f}"CustomData
data = event.data() # In on_custom_data callback
data.symbol: str
data.ts_event: datetime
data.column_data(column_name: str, default=None) -> Any # Access CSV columns
data.header: str # CSV header
data.row: str # CSV row dataComplete Examples
Example 1: Simple Moving Average Crossover
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
from hiveq.flow.config import EventType, AssetType
from collections import deque
import numpy as np
hf.init(trader_id='T123', api_key='KEY')
class SMACrossover:
def __init__(self):
self.fast_window = 10
self.slow_window = 30
self.prices = {}
def on_hiveq_event(self, ctx: hf.Context, event: hf.Event) -> None:
if event.type == EventType.START:
ctx.subscribe_bars(
symbols=ctx.strategy_config.symbols,
asset_type=AssetType.EQUITY,
interval='1m'
)
for symbol in ctx.strategy_config.symbols:
self.prices[symbol] = {
'fast': deque(maxlen=self.fast_window),
'slow': deque(maxlen=self.slow_window)
}
elif event.type == EventType.BAR:
bar = event.data()
self.prices[bar.symbol]['fast'].append(bar.close)
self.prices[bar.symbol]['slow'].append(bar.close)
if len(self.prices[bar.symbol]['slow']) < self.slow_window:
return
fast_sma = np.mean(self.prices[bar.symbol]['fast'])
slow_sma = np.mean(self.prices[bar.symbol]['slow'])
position = ctx.net_position(bar.symbol)
if fast_sma > slow_sma and position == 0:
ctx.buy_order(symbol=bar.symbol, quantity=100)
elif fast_sma < slow_sma and position > 0:
ctx.sell_order(symbol=bar.symbol, quantity=position)
report = hf.run_backtest(
strategy_configs=[StrategyConfig(name='SMA', type='SMACrossover', params={})],
symbols=['AAPL'],
start_date='2025-09-01',
end_date='2025-09-30',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m']
}]
)Example 2: Per-Event Handlers with Order Tracking
import hiveq.flow as hf
from hiveq.flow import StrategyConfig, events
from hiveq.flow.config import EventType, AssetType
from hiveq.flow.trading_types import OrderStatus
hf.init(trader_id='T123', api_key='KEY')
class TypedStrategy:
def __init__(self):
self.entry_prices = {}
self.profit_target = 1.05 # 5% profit target
def on_start(self, ctx: hf.Context, event: events.StartEvent) -> None:
ctx.subscribe_bars(
symbols=ctx.strategy_config.symbols,
asset_type=AssetType.EQUITY,
interval='1m'
)
ctx.add_event_log(
message=f"Strategy started with profit target {self.profit_target}",
sub_event_type="INIT"
)
def on_bar(self, ctx: hf.Context, event: events.BarEvent) -> None:
bar = event.data()
if ctx.is_flat(bar.symbol):
ctx.buy_order(symbol=bar.symbol, quantity=100)
self.entry_prices[bar.symbol] = bar.close
ctx.add_event_log(
message=f"Entered long at {bar.close:.2f}",
symbol=bar.symbol
)
elif ctx.is_net_long(bar.symbol):
entry = self.entry_prices.get(bar.symbol)
if entry and bar.close >= entry * self.profit_target:
ctx.add_event_log(
message=f"Profit target hit: {bar.close:.2f} vs entry {entry:.2f}",
symbol=bar.symbol
)
ctx.sell_order(symbol=bar.symbol, quantity=ctx.net_position(bar.symbol))
def on_order(self, ctx: hf.Context, event: events.OrderEvent) -> None:
order = event.data()
if event.type == EventType.ORDER_FILLED:
ctx.add_event_log(
message=f"Order filled: {order.filled_qty} @ ${order.avg_px:.2f}",
symbol=order.symbol
)
portfolio = ctx.portfolio()
ctx.add_event_log(
message=f"Current P&L: ${portfolio.total_pnl():.2f}",
symbol=order.symbol
)
def on_stop(self, ctx: hf.Context, event: events.StopEvent) -> None:
portfolio = ctx.portfolio()
ctx.add_event_log(
message=f"Strategy completed - Total P&L: ${portfolio.total_pnl():,.2f}",
sub_event_type="TEARDOWN"
)
report = hf.run_backtest(
strategy_configs=[StrategyConfig(name='Typed', type='TypedStrategy', params={})],
symbols=['AAPL'],
start_date='2025-09-01',
end_date='2025-09-05',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m']
}]
)Example 3: Futures Trading with Continuous Contracts
import hiveq.flow as hf
from hiveq.flow import StrategyConfig, events
from hiveq.flow.config import AssetType
hf.init(trader_id='T123', api_key='KEY')
class FuturesTrendStrategy:
def __init__(self):
self.contracts_per_trade = 1
def on_start(self, ctx: hf.Context, event: events.StartEvent) -> None:
# Subscribe to continuous futures (auto-rollover)
ctx.subscribe_futures_bars(['ES.c.0', 'NQ.c.0'], interval='5m')
ctx.subscribe_futures_tbbo(['ES.c.0'])
def on_bar(self, ctx: hf.Context, event: events.BarEvent) -> None:
bar = event.data()
if bar.close > bar.open and ctx.is_flat(bar.symbol):
# Bullish bar, enter long
ctx.buy_order(bar.symbol, self.contracts_per_trade)
elif bar.close < bar.open and ctx.is_net_long(bar.symbol):
# Bearish bar, exit
ctx.sell_order(bar.symbol, ctx.net_position(bar.symbol))
def on_trade(self, ctx: hf.Context, event: events.TradeEvent) -> None:
trade = event.data()
# Process tick-level trade data if needed
pass
report = hf.run_backtest(
strategy_configs=[StrategyConfig(name='Futures', type='FuturesTrendStrategy')],
symbols=['ES.c.0', 'NQ.c.0'],
start_date='2025-09-01',
end_date='2025-09-30',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_FUT',
'schema': ['bars_5m']
}]
)Example 4: Options Trading with Chain Filtering
import hiveq.flow as hf
from hiveq.flow import StrategyConfig, events
hf.init(trader_id='T123', api_key='KEY')
class OptionsSpreadStrategy:
def on_start(self, ctx: hf.Context, event: events.StartEvent) -> None:
# Subscribe to 0DTE options only
ctx.subscribe_option_snaps(
'SPX',
strike=5000,
option_type='C',
expiration_type='0dte',
interval='1s'
)
def on_snap(self, ctx: hf.Context, event: events.SnapEvent) -> None:
snap = event.data()
# Check bid-ask spread for tight markets
if snap.bid_px is not None and snap.ask_px is not None:
spread = snap.ask_px - snap.bid_px
mid_price = (snap.bid_px + snap.ask_px) / 2
if spread < 0.10: # Tight spread
ctx.add_event_log(
message=f"Tight spread: {snap.chain} spread=${spread:.2f} mid=${mid_price:.2f}",
symbol=snap.symbol
)
report = hf.run_backtest(
strategy_configs=[StrategyConfig(name='Options', type='OptionsSpreadStrategy')],
symbols=['SPX'],
start_date='2025-09-01',
end_date='2025-09-05',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_OPT',
'schema': ['snaps_1s']
}]
)Best Practices
- Initialize First: Always call
hf.init(trader_id, api_key)before running backtests - Choose ONE Callback Approach: Never mix global
on_hiveq_eventwith per-event callbacks - Subscribe in START: Always subscribe to data in
on_startorEventType.START - Call event.data(): Always call
event.data()to access the underlying data object - Check Positions: Use
ctx.is_flat()andctx.has_open_order()to avoid duplicate orders - Use Event Types: In
on_orderandon_position, checkevent.typeto distinguish states - Handle None Values: For options data, check if
bid_px,ask_px,priceare None before using - Correct Imports: Import order types from
hiveq.flow.trading_types, nothiveq.flow.config - Date Requirements:
end_datemust be AFTERstart_date(at least T+1) - Debug with Logs: Use
hf.event_logs()after backtest to view all events
Common Patterns
Position Check Before Entry:
if ctx.is_flat(symbol) and not ctx.has_open_order(symbol):
ctx.buy_order(symbol=symbol, quantity=100)Profit Target Exit:
if ctx.is_net_long(symbol):
entry_price = self.entry_prices[symbol]
if bar.close > entry_price * 1.05: # 5% profit
ctx.sell_order(symbol=symbol, quantity=ctx.net_position(symbol))Order State Handling:
def on_order(self, ctx: hf.Context, event: events.OrderEvent) -> None:
order = event.data()
if event.type == EventType.ORDER_FILLED:
# Handle fill
pass
elif event.type == EventType.ORDER_REJECTED:
# Handle rejection
passNext Steps
- Configuration: Configure backtest parameters and strategy settings
Document Version: 1.0 Last Updated: 2025-11-20