Strategy Development Guide
Complete guide for developing trading strategies with HiveQ Flow.
Table of Contents
- Strategy Basics
- Execution Modes
- Event Handling
- State Management
- Data Subscriptions
- Order Placement
- Position Management
- Best Practices
Strategy Basics
Strategy Structure
A HiveQ Flow strategy can be implemented in three ways:
1. Class-Based Strategy with Single Event Handler
import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType

class MyStrategy:
    def __init__(self):
        """Initialize strategy state."""
        self.param1 = 10
        self.param2 = 20
        self.state = {}

    def on_hiveq_event(self, ctx: hf.Context, event: hf.Event):
        """
        Main event handler for all events.

        Args:
            ctx: Context object providing API access
            event: Event object with type and data
        """
        if event.type == EventType.START:
            # Initialize strategy
            ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
        elif event.type == EventType.BAR:
            # Process bar data
            bar = event.data()
            # Trading logic here
        elif event.type == EventType.ORDER_FILLED:
            # Handle order fills
            order = event.data()

Advantages:
- Simple, single entry point
- Easy to understand event flow
- Good for smaller strategies
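If a single-handler strategy grows, the if/elif chain can be swapped for a dispatch table while keeping one entry point. The sketch below is a self-contained illustration rather than HiveQ Flow code: `EventType` and `Event` here are local stand-ins defined only so the example runs on its own, and the `_handle_*` method names are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any


class EventType(Enum):
    """Local stand-in for hiveq.flow.config.EventType (assumption)."""
    START = auto()
    BAR = auto()
    ORDER_FILLED = auto()


@dataclass
class Event:
    """Local stand-in for hf.Event: a type plus a data() accessor."""
    type: EventType
    payload: Any = None

    def data(self) -> Any:
        return self.payload


class DispatchingStrategy:
    def __init__(self):
        self.log = []
        # Map each event type to the method that handles it.
        self._handlers = {
            EventType.START: self._handle_start,
            EventType.BAR: self._handle_bar,
            EventType.ORDER_FILLED: self._handle_fill,
        }

    def on_hiveq_event(self, ctx, event):
        handler = self._handlers.get(event.type)
        if handler is not None:  # event types without a handler are ignored
            handler(ctx, event)

    def _handle_start(self, ctx, event):
        self.log.append("start")

    def _handle_bar(self, ctx, event):
        self.log.append(f"bar:{event.data()}")

    def _handle_fill(self, ctx, event):
        self.log.append(f"fill:{event.data()}")


strategy = DispatchingStrategy()
strategy.on_hiveq_event(None, Event(EventType.START))
strategy.on_hiveq_event(None, Event(EventType.BAR, 101.5))
print(strategy.log)  # ['start', 'bar:101.5']
```

Adding a new event then means adding one dictionary entry and one method, without touching the entry point.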
2. Class-Based Strategy with Individual Callbacks
import hiveq.flow as hf
from hiveq.flow.events import StartEvent, BarEvent, OrderEvent
from hiveq.flow.config import AssetType

class MyStrategy:
    def __init__(self):
        """Initialize strategy state."""
        self.params = {}

    def on_start(self, ctx: hf.Context, event: StartEvent):
        """Called when strategy starts."""
        ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')

    def on_bar(self, ctx: hf.Context, event: BarEvent):
        """Called for each bar."""
        bar = event.data()
        # Trading logic here

    def on_order(self, ctx: hf.Context, event: OrderEvent):
        """Called on order events (filled, rejected, etc.)."""
        order = event.data()

    def on_stop(self, ctx: hf.Context, event):
        """Called when strategy stops."""
        # Cleanup logic here

Advantages:
- Type hints for each event
- Better IDE autocomplete
- Cleaner separation of concerns
- Easier unit testing
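To make the unit-testing point concrete, here is a minimal sketch of testing an `on_bar` callback without running a backtest. `RecordingContext` and `FakeBarEvent` are hypothetical test doubles written for this example, not part of HiveQ Flow; they mimic only the `buy_order` and `data()` calls used in this guide.

```python
from types import SimpleNamespace


class RecordingContext:
    """Hypothetical test double: records orders instead of sending them."""

    def __init__(self):
        self.orders = []

    def buy_order(self, symbol, quantity):
        self.orders.append(("BUY", symbol, quantity))


class FakeBarEvent:
    """Hypothetical stand-in for BarEvent: data() returns the bar."""

    def __init__(self, bar):
        self._bar = bar

    def data(self):
        return self._bar


class UpBarBuyer:
    """Toy strategy: buy 100 shares whenever a bar closes above its open."""

    def on_bar(self, ctx, event):
        bar = event.data()
        if bar.close > bar.open:
            ctx.buy_order(bar.symbol, quantity=100)


def run_bar(strategy, ctx, symbol, open_, close):
    """Feed one synthetic bar to the strategy."""
    bar = SimpleNamespace(symbol=symbol, open=open_, close=close)
    strategy.on_bar(ctx, FakeBarEvent(bar))


ctx = RecordingContext()
strategy = UpBarBuyer()
run_bar(strategy, ctx, "AAPL", open_=100.0, close=101.0)  # up bar: buys
run_bar(strategy, ctx, "AAPL", open_=101.0, close=100.5)  # down bar: no order
print(ctx.orders)  # [('BUY', 'AAPL', 100)]
```

Because each callback takes the context as an argument, the strategy never needs to know whether it is talking to the real engine or a test double.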
3. Function-Based Strategy (Global Function)
import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType

def on_hiveq_event(ctx: hf.Context, event: hf.Event):
    """Global function strategy."""
    if event.type == EventType.START:
        ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
    elif event.type == EventType.BAR:
        bar = event.data()
        # Trading logic here

# Use with empty strategy_configs list
hf.run_backtest(strategy_configs=[], ...)

Advantages:
- Ultra simple for prototyping
- Good for notebooks
- No class boilerplate
Execution Modes
The execution mode determines how market data is fetched, whether the strategy runs day-by-day or in a single pass, and when strategy state resets, so choose it before writing any stateful logic.
Strategy Type and Fetch Mode
When running backtests, you control execution behavior with two parameters:
| strategy_type | fetch_mode | Data Fetch | Execution | State Reset |
|---|---|---|---|---|
| 'intraday' (default) | 'daily' (default) | Day-by-day | Day-by-day | Each day |
| 'intraday' | 'prefetch' | Single fetch | Single run | Never |
| 'overnight' | (ignored) | Single fetch | Single run | Never |
Mode Details
Intraday + Daily (Default)
report = hf.run_backtest(
    strategy_configs=[config],
    symbols=['AAPL'],
    start_date='2025-01-01',
    end_date='2025-06-30',
    data_configs=[...],
    strategy_type='intraday',  # default
    fetch_mode='daily'         # default
)

Characteristics:
- Data is fetched day-by-day (memory efficient for long backtests)
- Strategy instance is recreated each trading day
- All strategy state automatically resets at day boundaries
- Best for pure intraday strategies that shouldn't hold overnight
Important: Your __init__ method runs each day, so state does not persist between days.
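The effect of per-day re-creation can be seen in a toy model. The sketch below does not use the HiveQ Flow engine; `run_daily` and `run_single` are hypothetical helpers that only imitate the two behaviours described here (a fresh instance per day versus one instance for the whole run).

```python
class CountingStrategy:
    def __init__(self):
        self.bars_seen = 0  # state created in __init__

    def on_bar(self, bar):
        self.bars_seen += 1


def run_daily(days):
    """Toy model of day-by-day execution: a fresh instance per trading day."""
    counts = []
    for bars in days:
        strategy = CountingStrategy()  # __init__ runs again each day
        for bar in bars:
            strategy.on_bar(bar)
        counts.append(strategy.bars_seen)
    return counts


def run_single(days):
    """Toy model of a single run: one instance for the whole backtest."""
    strategy = CountingStrategy()
    counts = []
    for bars in days:
        for bar in bars:
            strategy.on_bar(bar)
        counts.append(strategy.bars_seen)
    return counts


days = [[1, 2, 3], [4, 5]]  # two trading days with 3 and 2 bars
print(run_daily(days))   # [3, 2]
print(run_single(days))  # [3, 5]
```

Any counter, price history, or indicator stored on `self` behaves like `bars_seen` here: it starts from scratch each day in the default mode.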
Intraday + Prefetch
report = hf.run_backtest(
    strategy_configs=[config],
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-30',
    data_configs=[...],
    strategy_type='intraday',
    fetch_mode='prefetch'
)

Characteristics:
- All data is fetched upfront in a single request
- Strategy runs continuously across all days
- State persists across day boundaries
- Faster for shorter backtests
Use When: You need state continuity across days but still want intraday behavior.
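One example of state that needs this continuity is the previous day's closing price. The sketch below is a standalone illustration, assuming bar timestamps are nanoseconds since the Unix epoch (as `bar.ts_event` is treated elsewhere in this guide) and that the UTC date is an acceptable proxy for the trading date; `GapTracker` and the `ns` helper are hypothetical names.

```python
from datetime import datetime, timezone


def bar_date(ts_event_ns):
    """Convert a nanosecond epoch timestamp to a UTC calendar date."""
    seconds = ts_event_ns // 1_000_000_000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).date()


class GapTracker:
    """Remembers the previous day's last close to measure the opening gap."""

    def __init__(self):
        self.current_date = None
        self.last_close = None
        self.gaps = {}  # date -> first open of the day minus prior close

    def on_bar(self, ts_event_ns, open_price, close_price):
        date = bar_date(ts_event_ns)
        if self.current_date is not None and date != self.current_date:
            # First bar of a new day: last_close still holds the prior day's value
            self.gaps[date] = open_price - self.last_close
        self.current_date = date
        self.last_close = close_price


def ns(year, month, day, hour, minute):
    """Build a nanosecond UTC timestamp for the example."""
    dt = datetime(year, month, day, hour, minute, tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1_000_000_000


tracker = GapTracker()
tracker.on_bar(ns(2025, 8, 1, 19, 59), open_price=100.0, close_price=101.0)
tracker.on_bar(ns(2025, 8, 4, 13, 30), open_price=103.5, close_price=104.0)
print(tracker.gaps)  # {datetime.date(2025, 8, 4): 2.5}
```

If the strategy instance were recreated each day, `last_close` would be `None` on every first bar and no gap could ever be computed.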
Overnight
report = hf.run_backtest(
    strategy_configs=[config],
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-30',
    data_configs=[...],
    strategy_type='overnight'
)

Characteristics:
- All data is fetched upfront in a single request
- Strategy runs continuously across all days
- State persists across day boundaries
- Positions can be held overnight
Use When: Building swing trading or position trading strategies that hold overnight.
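The three "use when" rules can be condensed into a small decision helper. This is only a sketch: `choose_mode` is a hypothetical function, not part of HiveQ Flow, and it simply returns the keyword arguments shown in the examples above.

```python
def choose_mode(holds_overnight, needs_cross_day_state):
    """Return run_backtest keyword arguments for the described use case."""
    if holds_overnight:
        # fetch_mode is ignored for overnight strategies, so it is omitted
        return {"strategy_type": "overnight"}
    if needs_cross_day_state:
        return {"strategy_type": "intraday", "fetch_mode": "prefetch"}
    # Pure intraday strategy with state reset each day (the defaults)
    return {"strategy_type": "intraday", "fetch_mode": "daily"}


print(choose_mode(holds_overnight=False, needs_cross_day_state=True))
# {'strategy_type': 'intraday', 'fetch_mode': 'prefetch'}
```

The returned dictionary could be unpacked into the call, for example `hf.run_backtest(..., **choose_mode(False, True))`.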
Designing for Different Modes
Mode-Agnostic Strategy
If you want your strategy to work identically across all modes, manage state resets manually:
import pandas as pd

class ModeAgnosticStrategy:
    def __init__(self):
        self.current_date = None
        self.price_history = {}

    def on_bar(self, ctx, event):
        bar = event.data()
        bar_date = pd.Timestamp(bar.ts_event, unit='ns').date()
        # Detect day change and reset state
        if self.current_date is not None and bar_date != self.current_date:
            # Reset state at day boundary
            self.price_history = {}
        self.current_date = bar_date
        # Rest of your logic...

Overnight Strategy
For strategies that intentionally hold positions overnight:
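import pandas as pd

class SwingStrategy:
    def __init__(self):
        # State that should persist across days
        self.entry_prices = {}
        self.days_in_position = {}
        self.last_counted_date = {}

    def on_bar(self, ctx, event):
        bar = event.data()
        symbol = bar.symbol
        if not ctx.is_net_long(symbol):
            # Not long: clear the counters so the next entry starts from zero
            self.days_in_position.pop(symbol, None)
            self.last_counted_date.pop(symbol, None)
            return
        # Track days in position: count each date once, not every bar
        bar_date = pd.Timestamp(bar.ts_event, unit='ns').date()
        if self.last_counted_date.get(symbol) != bar_date:
            self.last_counted_date[symbol] = bar_date
            self.days_in_position[symbol] = self.days_in_position.get(symbol, 0) + 1
        # Exit after 5 days, unless an exit order is already pending
        if self.days_in_position[symbol] >= 5 and not ctx.has_open_order(symbol):
            ctx.sell_order(symbol, quantity=ctx.net_position(symbol))

Event Handling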
Available Event Types
Lifecycle Events
EventType.START  # Strategy initialization
EventType.STOP   # Strategy cleanup

START Event:
- Called once when strategy begins
- Use for subscriptions and initialization
- Context available, but no market data yet
def on_start(self, ctx: hf.Context, event: StartEvent):
    # Subscribe to data
    ctx.subscribe_bars(symbols=['AAPL', 'GOOGL'], asset_type=AssetType.EQUITY, interval='1m')
    # Set timers
    ctx.set_timer('status_report', pd.Timedelta(minutes=5))
    # Initialize state from config (default_value is a placeholder for your own fallback)
    self.param = ctx.strategy_config.params.get('param', default_value)

STOP Event:
- Called once when strategy ends
- Use for cleanup and final reporting
- Last chance to log information
def on_stop(self, ctx: hf.Context, event):
    logger.info(f"Strategy completed. Total trades: {self.trade_count}")
    logger.info(f"Final P&L: {ctx.portfolio().total_pnl()}")

Market Data Events
EventType.BAR          # Bar (OHLCV) data
EventType.TICK         # Tick data
EventType.QUOTE        # Quote data
EventType.TRADE        # Trade data
EventType.CUSTOM_DATA  # Custom user data

BAR Event:
- Most common event for strategies
- Triggered for each subscribed bar interval
- Contains OHLCV data
def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()
    # Bar attributes
    symbol = bar.symbol
    timestamp = bar.ts_event
    open_price = bar.open
    high_price = bar.high
    low_price = bar.low
    close_price = bar.close
    volume = bar.volume
    # Trading logic
    if close_price > open_price:
        ctx.buy_order(symbol, quantity=100)

CUSTOM_DATA Event:
- For custom signals and indicators
- Synchronized with market data by timestamp
def on_hiveq_event(self, ctx: hf.Context, event: hf.Event):
    if event.type == EventType.CUSTOM_DATA:
        data = event.data()
        # Access custom fields from your CSV
        signal = data.signal  # Assuming 'signal' column in CSV
        value = data.value    # Assuming 'value' column in CSV
        if signal == 'BUY':
            ctx.buy_order(data.symbol, quantity=100)

Order Events
EventType.ORDER_SUBMITTED  # Order sent to exchange
EventType.ORDER_ACCEPTED   # Order accepted by exchange
EventType.ORDER_REJECTED   # Order rejected
EventType.ORDER_FILLED     # Order completely filled
EventType.ORDER_CANCELED   # Order canceled

Order Lifecycle:
SUBMITTED → ACCEPTED → FILLED
↓
REJECTED
CANCELED

Use the on_order() callback to handle all order events. Check event.type to distinguish:
def on_order(self, ctx: hf.Context, event: OrderEvent):
    order = event.data()
    if event.type == EventType.ORDER_SUBMITTED:
        logger.info(f"Order {order.order_id} submitted")
    elif event.type == EventType.ORDER_ACCEPTED:
        logger.info(f"Order {order.order_id} accepted")
    elif event.type == EventType.ORDER_FILLED:
        logger.info(f"Order {order.order_id} filled at {order.avg_px}")
        # Update strategy state
        self.last_fill_price = order.avg_px
    elif event.type == EventType.ORDER_REJECTED:
        logger.error(f"Order {order.order_id} rejected!")
        # Handle rejection (e.g., retry with different price)

Position Events
EventType.POSITION_OPENED   # New position opened
EventType.POSITION_CHANGED  # Position size changed
EventType.POSITION_CLOSED   # Position closed

def on_position_opened(self, ctx: hf.Context, event):
    position = event.data()
    logger.info(f"Opened position in {position.symbol}: {position.quantity}")

def on_position_changed(self, ctx: hf.Context, event):
    position = event.data()
    logger.info(f"Position changed: {position.symbol} → {position.quantity}")

def on_position_closed(self, ctx: hf.Context, event):
    position = event.data()
    logger.info(f"Closed position in {position.symbol}")

Timer Events
EventType.TIMER  # Periodic timer callback

def on_start(self, ctx: hf.Context, event: StartEvent):
    # Set 5-minute status report timer
    ctx.set_timer('status_report', pd.Timedelta(minutes=5))

def on_timer(self, ctx: hf.Context, event: TimerEvent):
    timer = event.data()
    if timer.timer_id == 'status_report':
        # Run status report
        portfolio = ctx.portfolio()
        logger.info(f"Current P&L: {portfolio.total_pnl()}")
        # Cancel timer if needed
        ctx.cancel_timer('status_report')

State Management
Instance Variables
Store strategy state in instance variables:
from collections import deque

import numpy as np

class MyStrategy:
    def __init__(self):
        """Initialize strategy state."""
        # Parameters
        self.fast_window = 10
        self.slow_window = 30
        # State variables
        self.trade_count = 0
        self.last_signal = None
        self.entry_price = None
        # Data structures
        self.price_history = deque(maxlen=100)
        self.indicators = {}

    def on_bar(self, ctx: hf.Context, event: BarEvent):
        bar = event.data()
        # Update state
        self.price_history.append(bar.close)
        # Use state in logic
        if len(self.price_history) >= self.fast_window:
            fast_ma = np.mean(list(self.price_history)[-self.fast_window:])
            # ...

Per-Symbol State
Use dictionaries to track state per symbol:
class MyStrategy:
    def __init__(self):
        """Initialize per-symbol state tracking."""
        # Dictionary: symbol → state
        self.symbol_states = {}

    def get_or_create_state(self, symbol: str):
        """Get or create state for symbol."""
        if symbol not in self.symbol_states:
            self.symbol_states[symbol] = {
                'prices': deque(maxlen=100),
                'position_side': None,
                'entry_price': None,
                'trade_count': 0
            }
        return self.symbol_states[symbol]

    def on_bar(self, ctx: hf.Context, event: BarEvent):
        bar = event.data()
        symbol = bar.symbol
        # Get symbol-specific state
        state = self.get_or_create_state(symbol)
        # Update state
        state['prices'].append(bar.close)
        # Use state
        if len(state['prices']) >= 10:
            ma = np.mean(state['prices'])
            # Trading logic...

Configuration Parameters
Access parameters from StrategyConfig:
# In backtest setup
strategy_config = StrategyConfig(
    name='MyStrategy',
    type='MyStrategyClass',
    params={
        'fast_window': 10,
        'slow_window': 30,
        'trade_size': 100,
        'stop_loss_pct': 0.02
    }
)

# In strategy
class MyStrategyClass:
    def __init__(self):
        # Default parameters
        self.fast_window = 20
        self.slow_window = 50

    def on_start(self, ctx: hf.Context, event: StartEvent):
        # Override from config
        params = ctx.strategy_config.params
        self.fast_window = params.get('fast_window', self.fast_window)
        self.slow_window = params.get('slow_window', self.slow_window)
        self.trade_size = params.get('trade_size', 100)
        logger.info(f"Strategy parameters: fast={self.fast_window}, "
                    f"slow={self.slow_window}")

Data Subscriptions
Bar Data
Subscribe to OHLCV bar data:
def on_start(self, ctx: hf.Context, event: StartEvent):
    # Single symbol, 1-minute bars
    ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')
    # Multiple symbols
    ctx.subscribe_bars(symbols=['AAPL', 'GOOGL', 'MSFT'], asset_type=AssetType.EQUITY, interval='1m')
    # Different intervals
    ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='5m')  # 5-minute
    ctx.subscribe_bars(symbols=['SPY'], asset_type=AssetType.EQUITY, interval='1h')   # 1-hour
    ctx.subscribe_bars(symbols=['SPY'], asset_type=AssetType.EQUITY, interval='1d')   # Daily

Supported Intervals:
- Seconds: 1s, 5s, 30s
- Minutes: 1m, 5m, 15m, 30m
- Hours: 1h, 4h
- Days: 1d
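When intervals come from user configuration, it can help to validate them before subscribing. The following sketch checks a string against the list above and converts it to a `timedelta`; `interval_to_timedelta` is a hypothetical helper for illustration, not a HiveQ Flow function.

```python
from datetime import timedelta

# Interval strings taken from the supported list above
SUPPORTED_INTERVALS = {
    "1s", "5s", "30s",
    "1m", "5m", "15m", "30m",
    "1h", "4h",
    "1d",
}

# Map the unit suffix to the matching timedelta keyword argument
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def interval_to_timedelta(interval):
    """Validate an interval string and return its duration."""
    if interval not in SUPPORTED_INTERVALS:
        raise ValueError(f"Unsupported interval: {interval!r}")
    count, unit = int(interval[:-1]), interval[-1]
    return timedelta(**{_UNITS[unit]: count})


print(interval_to_timedelta("15m"))  # 0:15:00
```

Rejecting an unlisted value such as `'2m'` up front gives a clear error at configuration time instead of a failed subscription later.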
Custom Data
Subscribe to custom signals and indicators:
def on_start(self, ctx: hf.Context, event: StartEvent):
    # Subscribe to bars
    ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')
    # Subscribe to custom data
    ctx.subscribe_data(data_id='UserSignals')

def on_hiveq_event(self, ctx: hf.Context, event: hf.Event):
    if event.type == EventType.CUSTOM_DATA:
        data = event.data()
        # Access custom fields
        signal = data.signal
        symbol = data.symbol
        value = data.value
        if signal == 'BUY':
            ctx.buy_order(symbol, quantity=100)

Custom Data CSV:
timestamp,symbol,signal,value
2025-08-01 09:30:00,AAPL,BUY,1.5
2025-08-01 09:31:00,AAPL,HOLD,0.8

Futures Data
Subscribe to futures contracts:
def on_start(self, ctx: hf.Context, event: StartEvent):
    # Subscribe to futures contracts
    ctx.subscribe_bars(symbols=["ES.H25"], asset_type=AssetType.FUTURES, interval="1m")

Options Data
Subscribe to options contracts:
def on_start(self, ctx: hf.Context, event: StartEvent):
    ctx.subscribe_bars(
        symbols=[
            "SPY251014C00450000",  # Call
            "SPY251014P00450000",  # Put
        ],
        asset_type=AssetType.OPTIONS,
        interval="1m"
    )

Order Placement
Market Orders
Simplest order type - executes at current market price:
def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()
    # Market buy order
    order = ctx.buy_order(symbol='AAPL', quantity=100)
    logger.info(f"Placed market buy order: {order.order_id}")
    # Market sell order
    order = ctx.sell_order(symbol='AAPL', quantity=100)
    logger.info(f"Placed market sell order: {order.order_id}")

Limit Orders
Execute at specified price or better:
def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()
    # Buy limit order (buy at $150.50 or lower)
    order = ctx.buy_order(
        symbol='AAPL',
        quantity=100,
        order_type=OrderType.LIMIT,
        limit_price=150.50
    )
    # Sell limit order (sell at $152.00 or higher)
    order = ctx.sell_order(
        symbol='AAPL',
        quantity=100,
        order_type=OrderType.LIMIT,
        limit_price=152.00
    )

Stop Orders
Trigger when price reaches stop level:
def on_bar(self, ctx: hf.Context, event: BarEvent):
    # Buy stop order (buy when price hits $155)
    order = ctx.buy_order(
        symbol='AAPL',
        quantity=100,
        order_type=OrderType.STOP,
        stop_price=155.00
    )
    # Sell stop order (stop loss at $148)
    order = ctx.sell_order(
        symbol='AAPL',
        quantity=100,
        order_type=OrderType.STOP,
        stop_price=148.00
    )

Order Management
Track and cancel orders:
def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()
    if not ctx.has_open_order('AAPL'):
        # No working order yet: place limit order
        order = ctx.buy_order(
            symbol='AAPL',
            quantity=100,
            order_type=OrderType.LIMIT,
            limit_price=150.00
        )
        # Store order ID
        self.pending_order_id = order.order_id
    elif bar.close > 152.00:
        # On a later bar the order is still open and price has moved away: cancel it
        success = ctx.cancel_order(self.pending_order_id)
        if success:
            logger.info(f"Canceled order {self.pending_order_id}")

Position-Aware Order Placement
Check position before placing orders:
def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()
    symbol = bar.symbol
    # Check current position
    position = ctx.net_position(symbol)
    if ctx.is_flat(symbol):
        # No position - can enter new trade
        ctx.buy_order(symbol, quantity=100)
    elif ctx.is_net_long(symbol):
        # Long position - can exit or add
        if should_exit():
            # Close entire position
            ctx.sell_order(symbol, quantity=position)
    elif ctx.is_net_short(symbol):
        # Short position - can cover
        if should_cover():
            # Cover entire short
            ctx.buy_order(symbol, quantity=abs(position))
    # Check for pending orders
    pending_qty = ctx.open_order_qty(symbol)
    total_exposure = position + pending_qty
    if abs(total_exposure) < 1000:
        # Safe to place more orders
        ctx.buy_order(symbol, quantity=100)

Position Management
Querying Positions
def on_bar(self, ctx: hf.Context, event: BarEvent):
    symbol = 'AAPL'
    # Get net position
    position = ctx.net_position(symbol)
    # Returns: positive=long, negative=short, 0=flat
    # Check position state
    is_flat = ctx.is_flat(symbol)
    is_long = ctx.is_net_long(symbol)
    is_short = ctx.is_net_short(symbol)
    # Get pending order quantity
    pending = ctx.open_order_qty(symbol)
    # Calculate total exposure
    total_exposure = position + pending
    logger.info(f"Position: {position}, Pending: {pending}, "
                f"Total: {total_exposure}")

Portfolio Information
def on_bar(self, ctx: hf.Context, event: BarEvent):
    # Strategy-filtered portfolio
    portfolio = ctx.portfolio()
    # Get positions
    aapl_position = portfolio.net_position('AAPL')
    # Check position state
    if portfolio.is_flat('AAPL'):
        logger.info("No AAPL position")
    if portfolio.is_net_long('AAPL'):
        logger.info("Long AAPL")
    # Get P&L (strategy-filtered)
    total_pnl = portfolio.total_pnl()
    unrealized_pnl = portfolio.unrealized_pnl()
    realized_pnl = portfolio.realized_pnl()
    # Available cash
    balance = portfolio.cash()
    logger.info(f"Balance: {balance}, PnL: {total_pnl}")

Global Portfolio (Multi-Strategy)
def on_bar(self, ctx: hf.Context, event: BarEvent):
    # Account-level portfolio (all strategies)
    global_portfolio = ctx.global_portfolio()
    # Get aggregate position across all strategies
    total_aapl = global_portfolio.net_position('AAPL')
    # Get aggregate P&L
    account_pnl = global_portfolio.total_pnl()
    # Compare strategy vs account
    strategy_pnl = ctx.portfolio().total_pnl()
    logger.info(f"Strategy P&L: {strategy_pnl}")
    logger.info(f"Account P&L: {account_pnl}")
    # Risk management across all strategies
    total_exposure = global_portfolio.net_exposure()
    if total_exposure > 1000000:
        logger.warning("Total exposure exceeds limit!")

Best Practices
1. Initialize in START Event
Always subscribe to data and initialize in START:
def on_start(self, ctx: hf.Context, event: StartEvent):
    # Subscribe to all required data
    ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
    # Set up timers
    ctx.set_timer('status', pd.Timedelta(minutes=5))
    # Initialize state
    self.initialized = True

2. Handle Missing Data Gracefully
Check for sufficient data before trading:
def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()
    # Build price history
    self.prices.append(bar.close)
    # Wait for enough data
    if len(self.prices) < self.window_size:
        logger.debug(f"Building history: {len(self.prices)}/{self.window_size}")
        return
    # Now safe to calculate indicators
    ma = np.mean(self.prices)
    # Trading logic...

3. Use Per-Symbol State for Multi-Symbol Strategies
Don't mix state between symbols:
class MyStrategy:
    def __init__(self):
        # Per-symbol state
        self.symbol_data = {}

    def get_state(self, symbol):
        if symbol not in self.symbol_data:
            self.symbol_data[symbol] = {
                'prices': deque(maxlen=100),
                'indicators': {},
                'position_side': None
            }
        return self.symbol_data[symbol]

    def on_bar(self, ctx: hf.Context, event: BarEvent):
        bar = event.data()
        # Get symbol-specific state
        state = self.get_state(bar.symbol)
        # Update and use state
        state['prices'].append(bar.close)

4. Log Important Events
Use event logging for debugging and analysis:
def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()
    # Log signal generation
    if signal_detected:
        ctx.add_event_log(
            message="Bullish signal detected",
            symbol=bar.symbol,
            state_variable={
                'price': bar.close,
                'indicator_value': indicator_value
            }
        )
    # Log trade entry
    if entering_position:
        ctx.add_event_log(
            message="Entering long position",
            symbol=bar.symbol,
            state_variable={'quantity': quantity}
        )
        order = ctx.buy_order(bar.symbol, quantity=quantity)

5. Handle Order Rejections
Always handle potential order failures:
def on_order(self, ctx: hf.Context, event: OrderEvent):
    order = event.data()
    if event.type != EventType.ORDER_REJECTED:
        return
    logger.error(f"Order rejected: {order.order_id}")
    logger.error(f"Symbol: {order.symbol}, Side: {order.side}")
    # Log for analysis
    ctx.add_event_log(
        message="Order rejected",
        symbol=order.symbol,
        state_variable={'order_id': order.order_id}
    )
    # Retry with different parameters or skip
    # ...

6. Avoid Excessive Order Placement
Check for pending orders before placing new ones:
def on_bar(self, ctx: hf.Context, event: BarEvent):
    symbol = 'AAPL'
    # Don't place orders if we already have pending ones
    if ctx.has_open_order(symbol):
        logger.debug(f"Skipping - already have pending orders for {symbol}")
        return
    # Check total exposure
    position = ctx.net_position(symbol)
    pending = ctx.open_order_qty(symbol)
    total = position + pending
    if abs(total) >= self.max_position:
        logger.debug(f"Skipping - at max position: {total}")
        return
    # Safe to place order
    ctx.buy_order(symbol, quantity=100)

7. Use Parameters for Flexibility
Make strategy configurable:
# Strategy definition
class MyStrategy:
    def __init__(self):
        # Defaults
        self.window = 20
        self.threshold = 0.02
        self.trade_size = 100

    def on_start(self, ctx: hf.Context, event: StartEvent):
        # Override from config
        params = ctx.strategy_config.params
        self.window = params.get('window', self.window)
        self.threshold = params.get('threshold', self.threshold)
        self.trade_size = params.get('trade_size', self.trade_size)

# Run with different parameters
strategy_config = StrategyConfig(
    name='MyStrategy',
    type='MyStrategy',
    params={
        'window': 30,
        'threshold': 0.03,
        'trade_size': 200
    }
)

8. Clean Up in STOP Event
Always clean up resources:
def on_stop(self, ctx: hf.Context, event):
    # Cancel any pending timers
    ctx.cancel_timer('status_report')
    # Log final statistics
    logger.info("="*60)
    logger.info("Strategy Completed")
    logger.info(f"Total trades: {self.trade_count}")
    logger.info(f"Final P&L: {ctx.portfolio().total_pnl()}")
    logger.info("="*60)
    # Close any remaining positions if needed
    for symbol in ctx.strategy_config.symbols:
        if not ctx.is_flat(symbol):
            position = ctx.net_position(symbol)
            logger.warning(f"Closing remaining position: {symbol} = {position}")
            if position > 0:
                ctx.sell_order(symbol, quantity=position)
            else:
                ctx.buy_order(symbol, quantity=abs(position))

Complete Example
Putting it all together:
import numpy as np
import pandas as pd
import hiveq.flow as hf
from hiveq.flow import Context, StrategyConfig
from hiveq.flow.config import EventType, AssetType
from hiveq.flow.logger import logger
from collections import deque
from typing import Dict

logger = logger()
class ProductionReadyStrategy:
    """
    Production-ready strategy template with best practices.
    """

    def __init__(self):
        """Initialize strategy with defaults."""
        # Parameters (overridden from config)
        self.window = 20
        self.trade_size = 100
        self.max_position = 1000
        # State
        self.symbol_data: Dict[str, dict] = {}
        self.trade_count = 0
        self.initialized = False

    def get_symbol_data(self, symbol: str) -> dict:
        """Get or create per-symbol data structure."""
        if symbol not in self.symbol_data:
            self.symbol_data[symbol] = {
                'prices': deque(maxlen=self.window),
                'indicators': {},
                'last_signal': None
            }
        return self.symbol_data[symbol]

    def on_start(self, ctx: Context, event):
        """Initialize strategy."""
        logger.info("="*60)
        logger.info("Strategy Starting")
        logger.info("="*60)
        # Get symbols
        symbols = ctx.strategy_config.symbols
        logger.info(f"Trading symbols: {symbols}")
        # Subscribe to data
        ctx.subscribe_bars(symbols=symbols, asset_type=AssetType.EQUITY, interval='1m')
        # Set up timer
        ctx.set_timer('status_report', pd.Timedelta(minutes=5))
        # Load parameters from config
        params = ctx.strategy_config.params
        self.window = params.get('window', self.window)
        self.trade_size = params.get('trade_size', self.trade_size)
        self.max_position = params.get('max_position', self.max_position)
        logger.info(f"Parameters: window={self.window}, "
                    f"trade_size={self.trade_size}")
        self.initialized = True

    def on_bar(self, ctx: Context, event):
        """Process bar data and execute strategy logic."""
        if not self.initialized:
            return
        bar = event.data()
        symbol = bar.symbol
        # Get symbol-specific data
        data = self.get_symbol_data(symbol)
        # Update price history
        data['prices'].append(bar.close)
        # Wait for enough data
        if len(data['prices']) < self.window:
            return
        # Calculate indicator
        ma = np.mean(data['prices'])
        data['indicators']['ma'] = ma
        # Generate signals
        signal = self._generate_signal(bar, ma)
        # Execute trades
        self._execute_signal(ctx, symbol, signal, bar.close)

    def _generate_signal(self, bar, ma):
        """Generate trading signal."""
        if bar.close > ma * 1.01:  # 1% above MA
            return 'BUY'
        elif bar.close < ma * 0.99:  # 1% below MA
            return 'SELL'
        return 'HOLD'

    def _execute_signal(self, ctx: Context, symbol: str, signal: str, price: float):
        """Execute trading signal."""
        # Skip if pending orders
        if ctx.has_open_order(symbol):
            return
        # Check position
        position = ctx.net_position(symbol)
        pending = ctx.open_order_qty(symbol)
        total_exposure = position + pending
        # BUY signal
        if signal == 'BUY' and ctx.is_flat(symbol):
            if abs(total_exposure + self.trade_size) <= self.max_position:
                order = ctx.buy_order(symbol, quantity=self.trade_size)
                ctx.add_event_log(
                    message="BUY signal executed",
                    symbol=symbol,
                    state_variable={'price': price, 'order_id': order.order_id}
                )
        # SELL signal
        elif signal == 'SELL' and ctx.is_net_long(symbol):
            order = ctx.sell_order(symbol, quantity=position)
            ctx.add_event_log(
                message="SELL signal executed",
                symbol=symbol,
                state_variable={'price': price, 'order_id': order.order_id}
            )
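
    def on_order(self, ctx: Context, event):
        """Handle order events (filled, rejected, etc.)."""
        order = event.data()
        if event.type == EventType.ORDER_FILLED:
            # Count and log fills only; other order events also arrive here
            self.trade_count += 1
            logger.info(f"Order filled: {order.symbol} {order.side} "
                        f"{order.filled_qty} @ {order.avg_px}")
        elif event.type == EventType.ORDER_REJECTED:
            logger.error(f"Order rejected: {order.order_id}")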

    def on_timer(self, ctx: Context, event):
        """Periodic status report."""
        timer = event.data()
        if timer.timer_id == 'status_report':
            portfolio = ctx.portfolio()
            logger.info(f"Status: Trades={self.trade_count}, "
                        f"P&L={portfolio.total_pnl():.2f}")

    def on_stop(self, ctx: Context, event):
        """Clean up strategy."""
        logger.info("="*60)
        logger.info("Strategy Stopped")
        logger.info(f"Total trades: {self.trade_count}")
        logger.info(f"Final P&L: {ctx.portfolio().total_pnl()}")
        logger.info("="*60)
        # Cancel timers
        ctx.cancel_timer('status_report')

if __name__ == '__main__':
    # Configure and run
    strategy_config = StrategyConfig(
        name='ProductionStrategy',
        type='ProductionReadyStrategy',
        params={
            'window': 30,
            'trade_size': 100,
            'max_position': 1000
        }
    )
    report = hf.run_backtest(
        strategy_configs=[strategy_config],
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-30',
        data_configs=[{
            'type': 'hiveq_historical',
            'dataset': 'HIVEQ_US_EQ',
            'schema': ['bars_1m'],
        }]
    )
    # Print results
    print(report.return_stats.to_string())
    print(hf.event_logs().to_string())

This template includes all best practices and is ready for production use.
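Before running a full backtest, the signal rule from the template can be checked in isolation. The sketch below re-implements the 1% band around the moving average and the warm-up check as plain functions using only the standard library; `generate_signal` and `signals_for` are hypothetical names, and `statistics.fmean` stands in for `np.mean`.

```python
from collections import deque
from statistics import fmean


def generate_signal(close, ma, band=0.01):
    """Compare the close to a band around the moving average."""
    if close > ma * (1 + band):
        return 'BUY'
    if close < ma * (1 - band):
        return 'SELL'
    return 'HOLD'


def signals_for(closes, window):
    """Replay closes through a rolling window, emitting one signal per bar
    once the window is full (mirrors the warm-up check in on_bar)."""
    prices = deque(maxlen=window)
    out = []
    for close in closes:
        prices.append(close)
        if len(prices) < window:
            continue  # still building history
        out.append(generate_signal(close, fmean(prices)))
    return out


closes = [100.0, 100.0, 100.0, 106.0, 94.0]
print(signals_for(closes, window=3))  # ['HOLD', 'BUY', 'SELL']
```

Note that, as in the template, the moving average includes the current bar, so a large move is partly absorbed into the average it is compared against.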