Examples
HiveQ Flow Examples
Comprehensive guide with working examples for all use cases.
Table of Contents
- Basic Strategy Examples
- Strategy Patterns
- Data Integration
- Order Management
- Asset Types
- Live Trading
Basic Strategy Examples
Simple Moving Average Crossover
Complete SMA crossover strategy with position management.
Source: examples/sma_strategy.py
from typing import Dict
import numpy as np
import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType
from hiveq.flow import StrategyConfig
from collections import deque
from hiveq.flow.logger import logger
logger = logger()
class SymbolInfo:
"""Track per-symbol data including prices and position info."""
def __init__(self, symbol: str, fast_window: int, slow_window: int):
self.symbol = symbol
self.fast_window = fast_window
self.slow_window = slow_window
# Price buffers for SMA calculation
self.fast_prices = deque(maxlen=fast_window)
self.slow_prices = deque(maxlen=slow_window)
# SMA values
self.fast_sma = None
self.slow_sma = None
def update_prices(self, price: float):
"""Update price buffers with new price."""
self.fast_prices.append(price)
self.slow_prices.append(price)
# Calculate SMAs if we have enough data
if len(self.slow_prices) >= self.slow_window:
self.fast_sma = np.mean(self.fast_prices)
self.slow_sma = np.mean(self.slow_prices)
return True
return False
def ready_for_trading(self) -> bool:
"""Check if we have enough data to calculate SMAs."""
return len(self.slow_prices) >= self.slow_window
class SMACrossoverStrategy:
def __init__(self):
self.fast_window = 10
self.slow_window = 30
self.trade_size = 100
# Dictionary to track info per symbol
self.symbols: Dict[str, SymbolInfo] = {}
def on_hiveq_event(self, ctx: hf.Context, event):
"""
SMA Crossover Strategy with Multi-Symbol Support.
Buys when fast SMA crosses above slow SMA (golden cross).
Sells when fast SMA crosses below slow SMA (death cross).
"""
if event.type == EventType.START:
# Subscribe to bars for all symbols
symbols = ctx.strategy_config.symbols
ctx.subscribe_bars(symbols=symbols, asset_type=AssetType.EQUITY, interval='1m')
# Initialize SymbolInfo for each symbol
for symbol in symbols:
self.symbols[symbol] = SymbolInfo(
symbol, self.fast_window, self.slow_window
)
logger.info(f"Initialized tracking for {symbol}")
elif event.type == EventType.BAR:
bar = event.data()
symbol = bar.symbol
close_price = bar.close
# Get or create symbol info
if symbol not in self.symbols:
self.symbols[symbol] = SymbolInfo(
symbol, self.fast_window, self.slow_window
)
symbol_info = self.symbols[symbol]
# Update prices and calculate SMAs
smas_ready = symbol_info.update_prices(close_price)
# Check if we have enough data
if not smas_ready:
return
fast_sma = symbol_info.fast_sma
slow_sma = symbol_info.slow_sma
# Get current position
net_pos = ctx.net_position(symbol)
# Golden Cross: Fast SMA > Slow SMA (bullish)
if fast_sma > slow_sma:
if net_pos == 0.0:
# Open long position
logger.info(
f"{symbol}: GOLDEN CROSS - Buying {self.trade_size} "
f"shares at ~{close_price:.2f}"
)
ctx.buy_order(symbol=symbol, quantity=self.trade_size)
elif net_pos < 0:
# Close short position
logger.info(
f"{symbol}: GOLDEN CROSS - Closing short position "
f"of {abs(net_pos)} shares"
)
ctx.buy_order(symbol=symbol, quantity=abs(net_pos))
# Death Cross: Fast SMA < Slow SMA (bearish)
elif fast_sma < slow_sma:
if net_pos == 0.0:
# Open short position
logger.info(
f"{symbol}: DEATH CROSS - Selling {self.trade_size} "
f"shares at ~{close_price:.2f}"
)
ctx.sell_order(symbol=symbol, quantity=self.trade_size)
elif net_pos > 0:
# Close long position
logger.info(
f"{symbol}: DEATH CROSS - Closing long position "
f"of {net_pos} shares"
)
ctx.sell_order(symbol=symbol, quantity=int(net_pos))
if __name__ == '__main__':
# Configure strategy
strategy_configs = [
StrategyConfig(
name='SMACross',
type='SMACrossoverStrategy',
params={'fast_window': 10, 'slow_window': 30}
)
]
# Configure backtest
from hiveq.flow.config import BacktestConfig
backtest_config = BacktestConfig(
initial_capital=1000000.0,
start_date='2025-09-19',
end_date='2025-09-25'
)
# Run backtest
report = hf.run_backtest(
strategy_configs=strategy_configs,
symbols=['AAPL'],
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m'],
}],
backtest_config=backtest_config
)
# Print results
event_df = hf.event_logs()
print(event_df.to_string())Key Concepts:
- Multi-symbol support with per-symbol state tracking
- SMA calculation using rolling windows
- Position state management
- Golden cross (bullish) and death cross (bearish) signals
- Proper initialization and cleanup
Strategy Patterns
Pattern 1: Single Event Handler
Use a single on_hiveq_event() method to handle all events.
Source: examples/global_function_strategy.py
import hiveq.flow as hf
from hiveq.flow.config import EventType
def on_hiveq_event(ctx: hf.Context, event: hf.Event):
"""
Global function strategy.
Simple buy and hold strategy.
"""
if event.type == EventType.START:
ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
print("Strategy started!")
elif event.type == EventType.BAR:
bar = event.data()
symbol = bar.symbol
# Buy on first bar if flat
position = ctx.portfolio().net_position(symbol)
if position == 0:
ctx.buy_order(symbol=symbol, quantity=100)
print(f"Bought 100 shares of {symbol} at {bar.close}")
elif event.type == EventType.ORDER_FILLED:
print("Order filled!")
if __name__ == '__main__':
report = hf.run_backtest(
strategy_configs=[], # Empty - uses global function
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-02',
data_configs=[{
'type': 'csv',
'data_type': 'bars_1m',
'id': '1_MIN_BAR',
'path': 'bars/AAPL_bars.csv'
}]
)Advantages:
- Simple, flat structure
- Easy to understand event flow
- Good for small strategies
Pattern 2: Individual Callback Methods
Use separate callback methods for each event type.
Source: examples/sma_strategy_event_callbacks.py
from collections import deque
import numpy as np
import pandas as pd
import hiveq.flow as hf
from hiveq.flow.events import StartEvent, BarEvent, TimerEvent
from hiveq.flow import Context, StrategyConfig
from hiveq.flow.logger import logger
logger = logger()
class SMAEventCallbackStrategy:
"""SMA Crossover Strategy with per-event callbacks."""
def __init__(self):
"""Initialize strategy state and parameters."""
self.fast_window: int = 10
self.slow_window: int = 30
self.trade_size: int = 100
# Price buffers
self.fast_prices: deque = deque(maxlen=self.fast_window)
self.slow_prices: deque = deque(maxlen=self.slow_window)
logger.info(f"Initialized SMA Strategy - Fast: {self.fast_window}, "
f"Slow: {self.slow_window}")
def on_start(self, ctx: Context, event: StartEvent) -> None:
"""Called when the strategy starts."""
logger.info("=== Strategy Starting ===")
# Subscribe to bar data
symbols = ctx.strategy_config.symbols
logger.info(f"Subscribing to 1-minute bars for: {symbols}")
ctx.subscribe_bars(symbols=symbols, asset_type=AssetType.EQUITY, interval='1m')
# Set up periodic timer
ctx.set_timer('status_report', pd.Timedelta(minutes=5))
# Override parameters from config
if ctx.strategy_config.params:
self.fast_window = ctx.strategy_config.params.get(
'fast_window', self.fast_window
)
self.slow_window = ctx.strategy_config.params.get(
'slow_window', self.slow_window
)
self.trade_size = ctx.strategy_config.params.get(
'trade_size', self.trade_size
)
# Reinitialize deques
self.fast_prices = deque(maxlen=self.fast_window)
self.slow_prices = deque(maxlen=self.slow_window)
def on_bar(self, ctx: Context, event: BarEvent) -> None:
"""Called for each bar."""
bar = event.data()
symbol = bar.symbol
close_price = bar.close
# Update price buffers
self.fast_prices.append(close_price)
self.slow_prices.append(close_price)
# Check if we have enough data
if len(self.slow_prices) < self.slow_window:
return
# Calculate SMAs
fast_sma = np.mean(self.fast_prices)
slow_sma = np.mean(self.slow_prices)
# Generate trading signals
self._check_trading_signals(ctx, symbol, fast_sma, slow_sma, close_price)
def on_timer(self, ctx: Context, event: TimerEvent) -> None:
"""Called when timer fires."""
timer = event.data()
logger.info(f'Timer fired: {timer.timer_id}')
# Access last bar
inst = ctx.instrument('AAPL')
if inst.last_bar:
print(f"Last bar open: {inst.last_bar.open}")
def _check_trading_signals(self, ctx: Context, symbol: str,
fast_sma: float, slow_sma: float,
current_price: float) -> None:
"""Check and execute trading signals."""
net_pos = ctx.net_position(symbol)
# Golden Cross: Fast > Slow (bullish)
if fast_sma > slow_sma:
if net_pos <= 0:
logger.info(f"GOLDEN CROSS - Buying {self.trade_size} shares")
ctx.buy_order(symbol=symbol, quantity=self.trade_size)
# Death Cross: Fast < Slow (bearish)
elif fast_sma < slow_sma:
if net_pos > 0:
logger.info(f"DEATH CROSS - Selling {net_pos} shares")
ctx.sell_order(symbol=symbol, quantity=int(net_pos))
if __name__ == '__main__':
# Configure strategy
strategy_configs = [
StrategyConfig(
name='SMAEventCallbacks',
type='SMAEventCallbackStrategy',
params={
'fast_window': 10,
'slow_window': 30,
'trade_size': 100
}
)
]
# Run backtest
report = hf.run_backtest(
strategy_configs=strategy_configs,
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-02',
data_configs=[{
'type': 'csv',
'data_type': 'bars_1m',
'id': '1_MIN_BAR',
'path': 'bars/AAPL_bars.csv'
}]
)Advantages:
- Type hints for each event type
- Better IDE autocomplete
- Cleaner separation of concerns
- Easier to test individual methods
Pattern 3: Jupyter/Notebook Strategy
Define strategy class inline (useful for Jupyter notebooks).
Source: examples/notebook_simulation_class.py
import numpy as np
import hiveq.flow as hf
from hiveq.flow.config import EventType
from hiveq.flow import StrategyConfig
from collections import deque
class NotebookStrategy:
def __init__(self):
self.fast_window = 5
self.slow_window = 20
self.fast_prices = deque(maxlen=self.fast_window)
self.slow_prices = deque(maxlen=self.slow_window)
def on_hiveq_event(self, ctx: hf.Context, event):
"""Jupyter-defined strategy class."""
if event.type == EventType.START:
ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
elif event.type == EventType.BAR:
bar = event.data()
symbol = bar.symbol
self.fast_prices.append(bar.close)
self.slow_prices.append(bar.close)
if len(self.slow_prices) < self.slow_window:
return
fast_ma = np.mean(self.fast_prices)
slow_ma = np.mean(self.slow_prices)
# Crossover logic
position = ctx.portfolio().net_position(symbol)
if fast_ma > slow_ma and position <= 0:
ctx.buy_order(symbol=symbol, quantity=50)
print(f"Golden cross! Bought 50 shares at {bar.close}")
elif fast_ma < slow_ma and position >= 0:
ctx.sell_order(symbol=symbol, quantity=50)
print(f"Death cross! Sold 50 shares at {bar.close}")
elif event.type == EventType.ORDER_FILLED:
print("Order filled!")
# Jupyter cell execution
if __name__ == '__main__':
strategy_configs = [
StrategyConfig(
name='NotebookStrategy',
type='NotebookStrategy',
params={'fast_window': 5, 'slow_window': 20}
)
]
report = hf.run_backtest(
strategy_configs=strategy_configs,
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-02',
data_configs=[{
'type': 'csv',
'data_type': 'bars_1m',
'id': '1_MIN_BAR',
'path': 'bars/AAPL_bars.csv'
}]
)Advantages:
- Quick prototyping in Jupyter
- No separate file needed
- Immediate feedback
- Easy parameter tuning
Data Integration
Custom Data Integration
Integrate custom signals or data with market data.
Source: examples/custom_data.py
import hiveq.flow as hf
from hiveq.flow.config import EventType
class CustomDataStrategy:
def on_hiveq_event(self, ctx: hf.Context, event):
"""Strategy that uses custom data signals."""
if event.type == EventType.START:
# Subscribe to both bars and custom data
ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
ctx.subscribe_data(data_id=ctx.strategy_config.user_data_id)
elif event.type == EventType.CUSTOM_DATA:
# Process custom data event
data = event.data()
hf.logger().info(f"Received custom data: {data.row}")
# Custom data can contain signals, indicators, etc.
# Access fields from your CSV: data.signal, data.indicator, etc.
from hiveq.flow import StrategyConfig
# Configure strategy
strategy_configs = [
StrategyConfig(
name='CustomDataTest',
type='CustomDataStrategy',
params={'user_data_id': 'UserDataTest'}
)
]
# Run with both market data and custom data
report = hf.run_backtest(
strategy_configs=strategy_configs,
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-02',
data_configs=[
{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m'],
},
{
'type': 'csv',
'data_type': 'custom',
'path': 'userdata/user_data.csv',
'id': 'UserDataTest' # Must match subscribe_data() ID
}
]
)Custom Data CSV Format:
timestamp,symbol,signal,value
2025-08-01 09:30:00,AAPL,BUY,1.5
2025-08-01 09:31:00,AAPL,HOLD,0.8
2025-08-01 09:32:00,AAPL,SELL,-1.2Key Points:
- Custom data synchronized with bar data by timestamp
- Access via
EventType.CUSTOM_DATAevents - Subscribe using
ctx.subscribe_data(data_id='your_id') - Data ID must match the 'id' field in data_config
Pre-Built Strategy with Custom Data
Using built-in strategies with custom signals.
Source: examples/pre_built_strategy.py
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
strategy_configs = [
StrategyConfig(
name='TestUserSignal',
type='UserSignalStrategy', # Pre-built strategy
params={
'user_data_list': ['UserDataTest'],
'universe_file': 'universe/univ.csv',
'mm': 1.0,
'enable_holding': False,
'mm_reset': 1.0,
'entry_begin_time': "00:00:01",
'entry_end_time': "15:49:00",
'exit_begin_time': "15:49:30",
'exit_end_time': "16:00:00",
'stop_loss_bps': 0
}
)
]
report = hf.run_backtest(
strategy_configs=strategy_configs,
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-30',
data_configs=[
{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m'],
},
{
'type': 'csv',
'data_type': 'custom',
'path': 'userdata/user_data.csv',
'id': 'UserDataTest'
}
]
)
# Access results
print(report.return_stats.to_string())
print(report.run_info.to_string())
print(report.positions.to_string())
print(report.fills.to_string())Order Management
MOO/MOC Orders (Market On Open/Close)
Special order types for market open and close execution.
Source: examples/moo_moc_strategy.py
import hiveq.flow as hf
from hiveq.flow.config import EventType
from datetime import time
class StrategyState:
def __init__(self):
self.moo_order_placed = False
self.moo_order_filled = False
self.moc_order_placed = False
self.moc_order_filled = False
self.moo_order_id = None
self.moc_order_id = None
state = StrategyState()
def get_eastern_time(bar):
"""Convert bar timestamp to Eastern Time."""
import pandas as pd
ts_utc = pd.Timestamp(bar.ts_event, unit='ns', tz='UTC')
ts_eastern = ts_utc.tz_convert('America/New_York')
return ts_eastern.time(), ts_eastern
def on_hiveq_event(ctx: hf.Context, event: hf.Event):
"""Strategy testing MOO and MOC orders."""
global state
if event.type == EventType.START:
ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
print("MOO/MOC Strategy started!")
elif event.type == EventType.BAR:
bar = event.data()
symbol = bar.symbol
eastern_time, ts_eastern = get_eastern_time(bar)
# MOO Order: Place at 8:00 AM EST (before 9:30 AM open)
if not state.moo_order_placed:
if eastern_time >= time(8, 0, 0) and eastern_time < time(8, 1, 0):
print(f"[{ts_eastern}] Placing MOO order")
# Use MOO order type
order = ctx.buy_order(symbol, quantity=100, order_type=OrderType.MOO)
state.moo_order_id = order.order_id
state.moo_order_placed = True
ctx.add_event_log(
message=f"MOO order placed at {eastern_time}",
symbol=symbol,
state_variable={"order_id": state.moo_order_id}
)
# MOC Order: Place at 3:55 PM EST (before 4:00 PM close)
if state.moo_order_filled and not state.moc_order_placed:
if eastern_time >= time(15, 55, 0) and eastern_time < time(15, 56, 0):
position = ctx.net_position(symbol)
if position > 0:
print(f"[{ts_eastern}] Placing MOC order")
# Use MOC order type
order = ctx.sell_order(symbol, quantity=position, order_type=OrderType.MOC)
state.moc_order_id = order.order_id
state.moc_order_placed = True
ctx.add_event_log(
message=f"MOC order placed at {eastern_time}",
symbol=symbol,
state_variable={"order_id": state.moc_order_id}
)
elif event.type == EventType.ORDER_FILLED:
order = event.data()
if order.order_id == state.moo_order_id:
state.moo_order_filled = True
print(f"MOO ORDER FILLED - Price: {order.avg_px}")
if order.order_id == state.moc_order_id:
state.moc_order_filled = True
print(f"MOC ORDER FILLED - Price: {order.avg_px}")
# MOO and MOC orders are supported via the order_type parameter:
# ctx.buy_order(symbol, quantity, order_type=OrderType.MOO)
# ctx.sell_order(symbol, quantity, order_type=OrderType.MOC)
if __name__ == '__main__':
report = hf.run_backtest(
strategy_configs=[], # Uses global function
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-02',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m'],
}]
)
print(hf.event_logs().to_string())Key Points:
- MOO orders placed before market open (9:30 AM EST)
- MOC orders placed before cutoff (typically 3:55 PM EST)
- Use
TimeInForce.AT_THE_OPENfor MOO - Use
TimeInForce.AT_THE_CLOSEfor MOC - Orders fill at opening/closing auction prices
Asset Types
Futures Trading
Subscribe and trade futures contracts.
Source: examples/subscribe_futures.py
import hiveq.flow as hf
from hiveq.flow import Context, StrategyConfig
from hiveq.flow.config import AssetType
class TestFuturesStrategy:
def __init__(self):
self.bar_count = 0
def on_start(self, ctx: Context, event):
"""Initialize strategy and subscribe to futures."""
print("="*80)
print("STRATEGY STARTED - Subscribing to ES.1C (Front Month)")
print("="*80)
# Subscribe to futures contract
ctx.subscribe_bars(symbols=["ES.H25"], asset_type=AssetType.FUTURES, interval="1m")
ctx.add_event_log(
message="Subscribed to ES.1C continuous futures",
symbol="ES.1C"
)
def on_bar(self, ctx: Context, event):
"""Handle bar events."""
bar = event.bar
print(f"[BAR] {bar.symbol} | Time: {bar.ts_event} | "
f"O: {bar.open:.2f} H: {bar.high:.2f} "
f"L: {bar.low:.2f} C: {bar.close:.2f} | V: {bar.volume}")
# Place order on first bar
symbol = bar.symbol
if ctx.is_flat(symbol) and self.bar_count == 0:
print(f"Placing BUY order for {symbol}")
order = ctx.buy_order(symbol=symbol, quantity=1)
print(f"Order ID: {order.order_id}")
self.bar_count += 1
def on_stop(self, ctx: Context, event):
"""Strategy cleanup."""
print("="*80)
print("STRATEGY STOPPED")
print("="*80)
if __name__ == "__main__":
# Configure strategy
strategy_config = StrategyConfig(
name='test_futures_strategy',
type='TestFuturesStrategy'
)
# Run backtest
report = hf.run_backtest(
strategy_configs=[strategy_config],
symbols=['ES.1C'], # Continuous contract
data_configs=[{
'type': 'csv',
'data_type': 'bars_1m',
'id': 'test_bars',
'path': 'bars/test_rollover_bars.csv'
}],
start_date='2025-03-13',
end_date='2025-03-15'
)
# Display results
print("="*80)
print("BACKTEST RESULTS")
print("="*80)
if results.run_info is not None:
print(results.run_info.to_string())
print("Event Logs:")
print(hf.event_logs().to_string())
print(results.fills.to_string())Options Trading
Subscribe and trade options contracts.
Source: examples/subscribe_options.py
import hiveq.flow as hf
from hiveq.flow.config import AssetType, EventType
class TestOptionsStrategy:
"""Simple test strategy for options data."""
def on_hiveq_event(self, ctx: hf.Context, event):
"""Handle strategy events."""
if event.type == EventType.START:
print("="*60)
print("Starting Test Options Strategy")
print("="*60)
# Subscribe to option symbols
ctx.subscribe_bars(
symbols=ctx.strategy_config.symbols,
asset_type=AssetType.OPTIONS,
interval='1m'
)
self.bar_count = 0
self.symbols_seen = set()
elif event.type == EventType.BAR:
bar = event.data()
symbol = bar.symbol
# Place order on first bar
if ctx.is_flat(symbol) and self.bar_count == 0:
print(f"Placing BUY order for {symbol}")
print(f" Price: {bar.close}")
order = ctx.buy_order(symbol=symbol, quantity=1)
print(f" Order ID: {order.order_id}")
self.bar_count += 1
self.symbols_seen.add(symbol)
elif event.type == EventType.ORDER_FILLED:
order = event.data()
print(f"** ORDER FILLED **")
print(f" Symbol: {order.symbol}")
print(f" Side: {order.side}")
print(f" Quantity: {order.filled_qty}")
print(f" Fill Price: {order.avg_px}")
elif event.type == EventType.STOP:
print("="*60)
print("Test Options Strategy Complete")
print(f"Total bars: {self.bar_count}")
print(f"Unique symbols: {len(self.symbols_seen)}")
print("="*60)
def main():
"""Run the test options backtest."""
# Configure strategy
strategy_config = hf.StrategyConfig(
name='TestOptionsStrategy',
type='TestOptionsStrategy',
params={}
)
# Run backtest
report = hf.run_backtest(
strategy_configs=[strategy_config],
symbols=[
"SPXW251014C00450000" # SPX Weekly Call
],
start_date='2025-10-14',
end_date='2025-10-15',
data_configs=[{
'type': 'csv',
'data_type': 'bars_1m',
'path': 'bars/test_options_bars.csv',
'id': 'test_options_bars'
}]
)
print(hf.event_logs().to_string())
print(report.fills.to_string())
if __name__ == "__main__":
main()Option Symbol Format:
- Format:
UNDERLYING + EXPIRY + TYPE + STRIKE - Example:
SPXW251014C00450000- SPXW: Underlying (SPX Weekly)
- 251014: Expiry (Oct 14, 2025 → YYMMDD)
- C: Call (or P for Put)
- 00450000: Strike price ($450.00 with padding)
Options Trading with HiveQ Historical Data
Trade options using HiveQ historical options dataset.
Important: Use HIVEQ_US_OPT dataset for options trading, not HIVEQ_US_EQ.
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
from hiveq.flow.config import EventType
class SimpleOptionsStrategy:
"""Trade SPY call options based on momentum."""
def __init__(self):
self.call_option = 'SPY251114C00600000' # SPY Nov 14 $600 Call
self.entry_threshold = 0.005 # 0.5% move
self.open_price = None
def on_hiveq_event(self, ctx, event):
if event.type == EventType.START:
# Subscribe to underlying and option
ctx.subscribe_bars(symbols=['SPY'], asset_type=AssetType.EQUITY, interval='1m')
ctx.subscribe_bars(
symbols=[self.call_option],
asset_type=AssetType.OPTIONS,
interval='1m'
)
elif event.type == EventType.BAR:
bar = event.data()
# Track underlying price
if bar.symbol == 'SPY':
if self.open_price is None:
self.open_price = bar.close
# Check for entry signal
price_change = (bar.close - self.open_price) / self.open_price
if price_change >= self.entry_threshold:
if ctx.is_flat(self.call_option):
ctx.buy_order(symbol=self.call_option, quantity=1)
# Run backtest with HIVEQ_US_OPT dataset
report = hf.run_backtest(
strategy_configs=[
StrategyConfig(
name='SimpleOptions',
type='SimpleOptionsStrategy',
params={}
)
],
symbols=['SPY', 'SPY251114C00600000'],
start_date='2025-09-01',
end_date='2025-09-05',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_OPT', # Use HIVEQ_US_OPT for options!
'schema': ['bars_1m']
}]
)
print(report.return_stats)Key Points:
- Dataset: Always use
HIVEQ_US_OPTfor options trading - Symbols: Include both underlying ('SPY') and options ('SPY251114C00600000')
- Subscribe: Use
subscribe_bars()withasset_type=AssetType.OPTIONSfor options symbols - Monitoring: Subscribe to underlying to generate entry/exit signals
Common Datasets:
HIVEQ_US_EQ→ For equities (stocks) onlyHIVEQ_US_OPT→ For options tradingHIVEQ_US_FUT→ For futures trading
Live Trading
Live Trading with Databento
Run strategies in live mode with real-time data.
Source: examples/live_example.py
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
# Stop with SIGINT/CTRL+C
if __name__ == "__main__":
strategy_configs = [
StrategyConfig(
name='SMALive',
type='SMACrossoverStrategy'
)
]
# Run live (blocking call)
hf.run_live(
strategy_configs=strategy_configs,
symbols=['AAPL'],
data_configs=[{
'type': 'databento',
'id': 'databento_client',
'api_key': 'db-kirXAdmRKUqxrqBH7vvrv5WL5777m',
'venue_dataset_map': {"SIM": "EQUS.MINI"}
}]
)Key Differences from Backtest:
- Uses
run_live()instead ofrun_backtest() - Requires live data provider (Databento, Interactive Brokers, etc.)
- Blocking call - runs until interrupted
- Real-time event processing
- Use Ctrl+C to stop
Live Data Config:
{
'type': 'databento', # Live data provider
'id': 'databento_client',
'api_key': 'your_api_key',
'venue_dataset_map': {
'SIM': 'EQUS.MINI', # Simulated equities
'XCME': 'GLBX.MDP3' # CME futures
}
}Performance Reporting
Accessing Results
import hiveq.flow as hf
# Run backtest
report = hf.run_backtest(...)
# Return statistics
print(results.return_stats.to_string())
# Shows: total return, annualized return, Sharpe ratio, etc.
# Run information
print(results.run_info.to_string())
# Shows: dates, symbols, capital, commission, etc.
# Positions
print(results.positions.to_string())
# Shows: all positions opened/closed with P&L
# Fills
print(results.fills.to_string())
# Shows: all order fills with execution details
# Event logs
event_df = hf.event_logs()
print(event_df.to_string())
# Shows: all logged events
# Create tearsheet (for Marimo/Jupyter)
import marimo as mo
mo.md(results.create_tearsheet())Summary
All examples demonstrate: 2. Configuration: Use StrategyConfig to configure strategies 3. Data Sources: Support for CSV, HiveQ historical, and live data 4. Event Handling: Multiple patterns (single handler, callbacks, inline) 5. Order Management: Market, limit, stop, MOO, MOC orders 6. Position Tracking: Real-time position and P&L tracking 7. Multi-Asset: Equities, futures, options support 8. Custom Data: Integration of custom signals and data 9. Performance: Comprehensive reporting and analytics
All source files available in /examples/ directory.