HiveQ Docs
HiveQ Flow

Strategy Development

Strategy Development Guide

Complete guide for developing trading strategies with HiveQ Flow.

Table of Contents


Strategy Basics

Strategy Structure

A HiveQ Flow strategy can be implemented in three ways:

1. Class-Based Strategy with Single Event Handler

import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType


class MyStrategy:
    def __init__(self):
        """Initialize strategy state."""
        self.param1 = 10
        self.param2 = 20
        self.state = {}

    def on_hiveq_event(self, ctx: hf.Context, event: hf.Event):
        """
        Main event handler for all events.

        Args:
            ctx: Context object providing API access
            event: Event object with type and data
        """
        if event.type == EventType.START:
            # Initialize strategy
            ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')

        elif event.type == EventType.BAR:
            # Process bar data
            bar = event.data()
            # Trading logic here

        elif event.type == EventType.ORDER_FILLED:
            # Handle order fills
            order = event.data()

Advantages:

  • Simple, single entry point
  • Easy to understand event flow
  • Good for smaller strategies

2. Class-Based Strategy with Individual Callbacks

import hiveq.flow as hf
from hiveq.flow.events import StartEvent, BarEvent, OrderEvent
from hiveq.flow.config import AssetType


class MyStrategy:
    def __init__(self):
        """Initialize strategy state."""
        self.params = {}

    def on_start(self, ctx: hf.Context, event: StartEvent):
        """Called when strategy starts."""
        ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')

    def on_bar(self, ctx: hf.Context, event: BarEvent):
        """Called for each bar."""
        bar = event.data()
        # Trading logic here

    def on_order(self, ctx: hf.Context, event: OrderEvent):
        """Called on order events (filled, rejected, etc.)."""
        order = event.data()

    def on_stop(self, ctx: hf.Context, event):
        """Called when strategy stops."""
        # Cleanup logic here

Advantages:

  • Type hints for each event
  • Better IDE autocomplete
  • Cleaner separation of concerns
  • Easier unit testing

3. Function-Based Strategy (Global Function)

import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType


def on_hiveq_event(ctx: hf.Context, event: hf.Event):
    """Global function strategy."""
    if event.type == EventType.START:
        ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')

    elif event.type == EventType.BAR:
        bar = event.data()
        # Trading logic here


# Use with empty strategy_configs list
hf.run_backtest(strategy_configs=[], ...)

Advantages:

  • Ultra simple for prototyping
  • Good for notebooks
  • No class boilerplate

Execution Modes

Understanding execution modes is crucial for strategy development as it affects how your strategy runs and manages state.

Strategy Type and Fetch Mode

When running backtests, you control execution behavior with two parameters:

strategy_typefetch_modeData FetchExecutionState Reset
'intraday' (default)'daily' (default)Day-by-dayDay-by-dayEach day
'intraday''prefetch'Single fetchSingle runNever
'overnight'(ignored)Single fetchSingle runNever

Mode Details

Intraday + Daily (Default)

report = hf.run_backtest(
    strategy_configs=[config],
    symbols=['AAPL'],
    start_date='2025-01-01',
    end_date='2025-06-30',
    data_configs=[...],
    strategy_type='intraday',  # default
    fetch_mode='daily'          # default
)

Characteristics:

  • Data is fetched day-by-day (memory efficient for long backtests)
  • Strategy instance is recreated each trading day
  • All strategy state automatically resets at day boundaries
  • Best for pure intraday strategies that shouldn't hold overnight

Important: Your __init__ method runs each day, so state does not persist between days.


Intraday + Prefetch

report = hf.run_backtest(
    strategy_configs=[config],
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-30',
    data_configs=[...],
    strategy_type='intraday',
    fetch_mode='prefetch'
)

Characteristics:

  • All data is fetched upfront in a single request
  • Strategy runs continuously across all days
  • State persists across day boundaries
  • Faster for shorter backtests

Use When: You need state continuity across days but still want intraday behavior.


Overnight

report = hf.run_backtest(
    strategy_configs=[config],
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-30',
    data_configs=[...],
    strategy_type='overnight'
)

Characteristics:

  • All data is fetched upfront in a single request
  • Strategy runs continuously across all days
  • State persists across day boundaries
  • Positions can be held overnight

Use When: Building swing trading or position trading strategies that hold overnight.


Designing for Different Modes

Mode-Agnostic Strategy

If you want your strategy to work identically across all modes, manage state resets manually:

class ModeAgnosticStrategy:
    def __init__(self):
        self.current_date = None
        self.price_history = {}

    def on_bar(self, ctx, event):
        bar = event.data()
        bar_date = pd.Timestamp(bar.ts_event, unit='ns').date()

        # Detect day change and reset state
        if self.current_date is not None and bar_date != self.current_date:
            # Reset state at day boundary
            self.price_history = {}
        self.current_date = bar_date

        # Rest of your logic...

Overnight Strategy

For strategies that intentionally hold positions overnight:

class SwingStrategy:
    def __init__(self):
        # State that should persist across days
        self.entry_prices = {}
        self.days_in_position = {}

    def on_bar(self, ctx, event):
        bar = event.data()

        # Track days in position
        if ctx.is_net_long(bar.symbol):
            self.days_in_position[bar.symbol] = self.days_in_position.get(bar.symbol, 0) + 1

        # Exit after 5 days
        if self.days_in_position.get(bar.symbol, 0) >= 5:
            ctx.sell_order(bar.symbol, quantity=ctx.net_position(bar.symbol))

Event Handling

Available Event Types

Lifecycle Events

EventType.START   # Strategy initialization
EventType.STOP    # Strategy cleanup

START Event:

  • Called once when strategy begins
  • Use for subscriptions and initialization
  • Context available, but no market data yet
def on_start(self, ctx: hf.Context, event: StartEvent):
    # Subscribe to data
    ctx.subscribe_bars(symbols=['AAPL', 'GOOGL'], asset_type=AssetType.EQUITY, interval='1m')

    # Set timers
    ctx.set_timer('status_report', pd.Timedelta(minutes=5))

    # Initialize state from config
    self.param = ctx.strategy_config.params.get('param', default_value)

STOP Event:

  • Called once when strategy ends
  • Use for cleanup and final reporting
  • Last chance to log information
def on_stop(self, ctx: hf.Context, event):
    logger.info(f"Strategy completed. Total trades: {self.trade_count}")
    logger.info(f"Final P&L: {ctx.portfolio().total_pnl()}")

Market Data Events

EventType.BAR          # Bar (OHLCV) data
EventType.TICK         # Tick data
EventType.QUOTE        # Quote data
EventType.TRADE        # Trade data
EventType.CUSTOM_DATA  # Custom user data

BAR Event:

  • Most common event for strategies
  • Triggered for each subscribed bar interval
  • Contains OHLCV data
def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()

    # Bar attributes
    symbol = bar.symbol
    timestamp = bar.ts_event
    open_price = bar.open
    high_price = bar.high
    low_price = bar.low
    close_price = bar.close
    volume = bar.volume

    # Trading logic
    if close_price > open_price:
        ctx.buy_order(symbol, quantity=100)

CUSTOM_DATA Event:

  • For custom signals and indicators
  • Synchronized with market data by timestamp
def on_hiveq_event(self, ctx: hf.Context, event: hf.Event):
    if event.type == EventType.CUSTOM_DATA:
        data = event.data()

        # Access custom fields from your CSV
        signal = data.signal  # Assuming 'signal' column in CSV
        value = data.value    # Assuming 'value' column in CSV

        if signal == 'BUY':
            ctx.buy_order(data.symbol, quantity=100)

Order Events

EventType.ORDER_SUBMITTED  # Order sent to exchange
EventType.ORDER_ACCEPTED   # Order accepted by exchange
EventType.ORDER_REJECTED   # Order rejected
EventType.ORDER_FILLED     # Order completely filled
EventType.ORDER_CANCELED   # Order canceled

Order Lifecycle:

SUBMITTED → ACCEPTED → FILLED

                REJECTED
                CANCELED

Use the on_order() callback to handle all order events. Check event.type to distinguish:

def on_order(self, ctx: hf.Context, event: OrderEvent):
    order = event.data()

    if event.type == EventType.ORDER_SUBMITTED:
        logger.info(f"Order {order.order_id} submitted")

    elif event.type == EventType.ORDER_ACCEPTED:
        logger.info(f"Order {order.order_id} accepted")

    elif event.type == EventType.ORDER_FILLED:
        logger.info(f"Order {order.order_id} filled at {order.avg_px}")
        # Update strategy state
        self.last_fill_price = order.avg_px

    elif event.type == EventType.ORDER_REJECTED:
        logger.error(f"Order {order.order_id} rejected!")
        # Handle rejection (e.g., retry with different price)

Position Events

EventType.POSITION_OPENED   # New position opened
EventType.POSITION_CHANGED  # Position size changed
EventType.POSITION_CLOSED   # Position closed
def on_position_opened(self, ctx: hf.Context, event):
    position = event.data()
    logger.info(f"Opened position in {position.symbol}: {position.quantity}")

def on_position_changed(self, ctx: hf.Context, event):
    position = event.data()
    logger.info(f"Position changed: {position.symbol}{position.quantity}")

def on_position_closed(self, ctx: hf.Context, event):
    position = event.data()
    logger.info(f"Closed position in {position.symbol}")

Timer Events

EventType.TIMER  # Periodic timer callback
def on_start(self, ctx: hf.Context, event: StartEvent):
    # Set 5-minute status report timer
    ctx.set_timer('status_report', pd.Timedelta(minutes=5))

def on_timer(self, ctx: hf.Context, event: TimerEvent):
    timer = event.data()

    if timer.timer_id == 'status_report':
        # Run status report
        portfolio = ctx.portfolio()
        logger.info(f"Current P&L: {portfolio.total_pnl()}")

        # Cancel timer if needed
        ctx.cancel_timer('status_report')

State Management

Instance Variables

Store strategy state in instance variables:

class MyStrategy:
    def __init__(self):
        """Initialize strategy state."""
        # Parameters
        self.fast_window = 10
        self.slow_window = 30

        # State variables
        self.trade_count = 0
        self.last_signal = None
        self.entry_price = None

        # Data structures
        self.price_history = deque(maxlen=100)
        self.indicators = {}

    def on_bar(self, ctx: hf.Context, event: BarEvent):
        bar = event.data()

        # Update state
        self.price_history.append(bar.close)

        # Use state in logic
        if len(self.price_history) >= self.fast_window:
            fast_ma = np.mean(list(self.price_history)[-self.fast_window:])
            # ...

Per-Symbol State

Use dictionaries to track state per symbol:

class MyStrategy:
    def __init__(self):
        """Initialize per-symbol state tracking."""
        # Dictionary: symbol → state
        self.symbol_states = {}

    def get_or_create_state(self, symbol: str):
        """Get or create state for symbol."""
        if symbol not in self.symbol_states:
            self.symbol_states[symbol] = {
                'prices': deque(maxlen=100),
                'position_side': None,
                'entry_price': None,
                'trade_count': 0
            }
        return self.symbol_states[symbol]

    def on_bar(self, ctx: hf.Context, event: BarEvent):
        bar = event.data()
        symbol = bar.symbol

        # Get symbol-specific state
        state = self.get_or_create_state(symbol)

        # Update state
        state['prices'].append(bar.close)

        # Use state
        if len(state['prices']) >= 10:
            ma = np.mean(state['prices'])
            # Trading logic...

Configuration Parameters

Access parameters from StrategyConfig:

# In backtest setup
strategy_config = StrategyConfig(
    name='MyStrategy',
    type='MyStrategyClass',
    params={
        'fast_window': 10,
        'slow_window': 30,
        'trade_size': 100,
        'stop_loss_pct': 0.02
    }
)

# In strategy
class MyStrategyClass:
    def __init__(self):
        # Default parameters
        self.fast_window = 20
        self.slow_window = 50

    def on_start(self, ctx: hf.Context, event: StartEvent):
        # Override from config
        params = ctx.strategy_config.params
        self.fast_window = params.get('fast_window', self.fast_window)
        self.slow_window = params.get('slow_window', self.slow_window)
        self.trade_size = params.get('trade_size', 100)

        logger.info(f"Strategy parameters: fast={self.fast_window}, "
                   f"slow={self.slow_window}")

Data Subscriptions

Bar Data

Subscribe to OHLCV bar data:

def on_start(self, ctx: hf.Context, event: StartEvent):
    # Single symbol, 1-minute bars
    ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')

    # Multiple symbols
    ctx.subscribe_bars(symbols=['AAPL', 'GOOGL', 'MSFT'], asset_type=AssetType.EQUITY, interval='1m')

    # Different intervals
    ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='5m')  # 5-minute
    ctx.subscribe_bars(symbols=['SPY'], asset_type=AssetType.EQUITY, interval='1h')   # 1-hour
    ctx.subscribe_bars(symbols=['SPY'], asset_type=AssetType.EQUITY, interval='1d')   # Daily

Supported Intervals:

  • Seconds: 1s, 5s, 30s
  • Minutes: 1m, 5m, 15m, 30m
  • Hours: 1h, 4h
  • Days: 1d

Custom Data

Subscribe to custom signals and indicators:

def on_start(self, ctx: hf.Context, event: StartEvent):
    # Subscribe to bars
    ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')

    # Subscribe to custom data
    ctx.subscribe_data(data_id='UserSignals')

def on_hiveq_event(self, ctx: hf.Context, event: hf.Event):
    if event.type == EventType.CUSTOM_DATA:
        data = event.data()

        # Access custom fields
        signal = data.signal
        symbol = data.symbol
        value = data.value

        if signal == 'BUY':
            ctx.buy_order(symbol, quantity=100)

Custom Data CSV:

timestamp,symbol,signal,value
2025-08-01 09:30:00,AAPL,BUY,1.5
2025-08-01 09:31:00,AAPL,HOLD,0.8

Futures Data

Subscribe to futures contracts:

def on_start(self, ctx: hf.Context, event: StartEvent):
    # Subscribe to futures contracts
    ctx.subscribe_bars(symbols=["ES.H25"], asset_type=AssetType.FUTURES, interval="1m")

Options Data

Subscribe to options contracts:

def on_start(self, ctx: hf.Context, event: StartEvent):
    ctx.subscribe_bars(
        symbols=[
            "SPY251014C00450000",  # Call
            "SPY251014P00450000",  # Put
        ],
        asset_type=AssetType.OPTIONS,
        interval="1m"
    )

Order Placement

Market Orders

Simplest order type - executes at current market price:

def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()

    # Market buy order
    order = ctx.buy_order(symbol='AAPL', quantity=100)
    logger.info(f"Placed market buy order: {order.order_id}")

    # Market sell order
    order = ctx.sell_order(symbol='AAPL', quantity=100)
    logger.info(f"Placed market sell order: {order.order_id}")

Limit Orders

Execute at specified price or better:

def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()

    # Buy limit order (buy at $150.50 or lower)
    order = ctx.buy_order(
        symbol='AAPL',
        quantity=100,
        order_type=OrderType.LIMIT,
        limit_price=150.50
    )

    # Sell limit order (sell at $152.00 or higher)
    order = ctx.sell_order(
        symbol='AAPL',
        quantity=100,
        order_type=OrderType.LIMIT,
        limit_price=152.00
    )

Stop Orders

Trigger when price reaches stop level:

def on_bar(self, ctx: hf.Context, event: BarEvent):
    # Buy stop order (buy when price hits $155)
    order = ctx.buy_order(
        symbol='AAPL',
        quantity=100,
        order_type=OrderType.STOP,
        stop_price=155.00
    )

    # Sell stop order (stop loss at $148)
    order = ctx.sell_order(
        symbol='AAPL',
        quantity=100,
        order_type=OrderType.STOP,
        stop_price=148.00
    )

Order Management

Track and cancel orders:

def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()

    # Place limit order
    order = ctx.buy_order(
        symbol='AAPL',
        quantity=100,
        order_type=OrderType.LIMIT,
        limit_price=150.00
    )

    # Store order ID
    self.pending_order_id = order.order_id

    # Later, check if still open
    if ctx.has_open_order('AAPL'):
        # Cancel if price moved away
        if bar.close > 152.00:
            success = ctx.cancel_order(self.pending_order_id)
            if success:
                logger.info(f"Canceled order {self.pending_order_id}")

Position-Aware Order Placement

Check position before placing orders:

def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()
    symbol = bar.symbol

    # Check current position
    position = ctx.net_position(symbol)

    if ctx.is_flat(symbol):
        # No position - can enter new trade
        ctx.buy_order(symbol, quantity=100)

    elif ctx.is_net_long(symbol):
        # Long position - can exit or add
        if should_exit():
            # Close entire position
            ctx.sell_order(symbol, quantity=position)

    elif ctx.is_net_short(symbol):
        # Short position - can cover
        if should_cover():
            # Cover entire short
            ctx.buy_order(symbol, quantity=abs(position))

    # Check for pending orders
    pending_qty = ctx.open_order_qty(symbol)
    total_exposure = position + pending_qty

    if abs(total_exposure) < 1000:
        # Safe to place more orders
        ctx.buy_order(symbol, quantity=100)

Position Management

Querying Positions

def on_bar(self, ctx: hf.Context, event: BarEvent):
    symbol = 'AAPL'

    # Get net position
    position = ctx.net_position(symbol)
    # Returns: positive=long, negative=short, 0=flat

    # Check position state
    is_flat = ctx.is_flat(symbol)
    is_long = ctx.is_net_long(symbol)
    is_short = ctx.is_net_short(symbol)

    # Get pending order quantity
    pending = ctx.open_order_qty(symbol)

    # Calculate total exposure
    total_exposure = position + pending

    logger.info(f"Position: {position}, Pending: {pending}, "
               f"Total: {total_exposure}")

Portfolio Information

def on_bar(self, ctx: hf.Context, event: BarEvent):
    # Strategy-filtered portfolio
    portfolio = ctx.portfolio()

    # Get positions
    aapl_position = portfolio.net_position('AAPL')

    # Check position state
    if portfolio.is_flat('AAPL'):
        logger.info("No AAPL position")

    if portfolio.is_net_long('AAPL'):
        logger.info("Long AAPL")

    # Get P&L (strategy-filtered)
    total_pnl = portfolio.total_pnl()
    unrealized_pnl = portfolio.unrealized_pnl()
    realized_pnl = portfolio.realized_pnl()

    # Available cash
    balance = portfolio.cash()

    logger.info(f"Balance: {balance}, PnL: {total_pnl}")

Global Portfolio (Multi-Strategy)

def on_bar(self, ctx: hf.Context, event: BarEvent):
    # Account-level portfolio (all strategies)
    global_portfolio = ctx.global_portfolio()

    # Get aggregate position across all strategies
    total_aapl = global_portfolio.net_position('AAPL')

    # Get aggregate P&L
    account_pnl = global_portfolio.total_pnl()

    # Compare strategy vs account
    strategy_pnl = ctx.portfolio().total_pnl()

    logger.info(f"Strategy P&L: {strategy_pnl}")
    logger.info(f"Account P&L: {account_pnl}")

    # Risk management across all strategies
    total_exposure = global_portfolio.net_exposure()
    if total_exposure > 1000000:
        logger.warning("Total exposure exceeds limit!")

Best Practices

1. Initialize in START Event

Always subscribe to data and initialize in START:

def on_start(self, ctx: hf.Context, event: StartEvent):
    # Subscribe to all required data
    ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')

    # Set up timers
    ctx.set_timer('status', pd.Timedelta(minutes=5))

    # Initialize state
    self.initialized = True

2. Handle Missing Data Gracefully

Check for sufficient data before trading:

def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()

    # Build price history
    self.prices.append(bar.close)

    # Wait for enough data
    if len(self.prices) < self.window_size:
        logger.debug(f"Building history: {len(self.prices)}/{self.window_size}")
        return

    # Now safe to calculate indicators
    ma = np.mean(self.prices)
    # Trading logic...

3. Use Per-Symbol State for Multi-Symbol Strategies

Don't mix state between symbols:

class MyStrategy:
    def __init__(self):
        # Per-symbol state
        self.symbol_data = {}

    def get_state(self, symbol):
        if symbol not in self.symbol_data:
            self.symbol_data[symbol] = {
                'prices': deque(maxlen=100),
                'indicators': {},
                'position_side': None
            }
        return self.symbol_data[symbol]

    def on_bar(self, ctx: hf.Context, event: BarEvent):
        bar = event.data()

        # Get symbol-specific state
        state = self.get_state(bar.symbol)

        # Update and use state
        state['prices'].append(bar.close)

4. Log Important Events

Use event logging for debugging and analysis:

def on_bar(self, ctx: hf.Context, event: BarEvent):
    bar = event.data()

    # Log signal generation
    if signal_detected:
        ctx.add_event_log(
            message="Bullish signal detected",
            symbol=bar.symbol,
            state_variable={
                'price': bar.close,
                'indicator_value': indicator_value
            }
        )

    # Log trade entry
    if entering_position:
        ctx.add_event_log(
            message="Entering long position",
            symbol=bar.symbol,
            state_variable={'quantity': quantity}
        )
        order = ctx.buy_order(bar.symbol, quantity=quantity)

5. Handle Order Rejections

Always handle potential order failures:

def on_order(self, ctx: hf.Context, event: OrderEvent):
    order = event.data()
    if event.type != EventType.ORDER_REJECTED:
        return

    logger.error(f"Order rejected: {order.order_id}")
    logger.error(f"Symbol: {order.symbol}, Side: {order.side}")

    # Log for analysis
    ctx.add_event_log(
        message="Order rejected",
        symbol=order.symbol,
        state_variable={'order_id': order.order_id}
    )

    # Retry with different parameters or skip
    # ...

6. Avoid Excessive Order Placement

Check for pending orders before placing new ones:

def on_bar(self, ctx: hf.Context, event: BarEvent):
    symbol = 'AAPL'

    # Don't place orders if we already have pending ones
    if ctx.has_open_order(symbol):
        logger.debug(f"Skipping - already have pending orders for {symbol}")
        return

    # Check total exposure
    position = ctx.net_position(symbol)
    pending = ctx.open_order_qty(symbol)
    total = position + pending

    if abs(total) >= self.max_position:
        logger.debug(f"Skipping - at max position: {total}")
        return

    # Safe to place order
    ctx.buy_order(symbol, quantity=100)

7. Use Parameters for Flexibility

Make strategy configurable:

# Strategy definition
class MyStrategy:
    def __init__(self):
        # Defaults
        self.window = 20
        self.threshold = 0.02
        self.trade_size = 100

    def on_start(self, ctx: hf.Context, event: StartEvent):
        # Override from config
        params = ctx.strategy_config.params
        self.window = params.get('window', self.window)
        self.threshold = params.get('threshold', self.threshold)
        self.trade_size = params.get('trade_size', self.trade_size)

# Run with different parameters
strategy_config = StrategyConfig(
    name='MyStrategy',
    type='MyStrategy',
    params={
        'window': 30,
        'threshold': 0.03,
        'trade_size': 200
    }
)

8. Clean Up in STOP Event

Always clean up resources:

def on_stop(self, ctx: hf.Context, event):
    # Cancel any pending timers
    ctx.cancel_timer('status_report')

    # Log final statistics
    logger.info("="*60)
    logger.info("Strategy Completed")
    logger.info(f"Total trades: {self.trade_count}")
    logger.info(f"Final P&L: {ctx.portfolio().total_pnl()}")
    logger.info("="*60)

    # Close any remaining positions if needed
    for symbol in ctx.strategy_config.symbols:
        if not ctx.is_flat(symbol):
            position = ctx.net_position(symbol)
            logger.warning(f"Closing remaining position: {symbol} = {position}")
            if position > 0:
                ctx.sell_order(symbol, quantity=position)
            else:
                ctx.buy_order(symbol, quantity=abs(position))

Complete Example

Putting it all together:

import numpy as np
import hiveq.flow as hf
from hiveq.flow import Context, StrategyConfig
from hiveq.flow.config import EventType
from hiveq.flow.logger import logger
from collections import deque
from typing import Dict

logger = logger()


class ProductionReadyStrategy:
    """
    Production-ready strategy template with best practices.
    """

    def __init__(self):
        """Initialize strategy with defaults."""
        # Parameters (overridden from config)
        self.window = 20
        self.trade_size = 100
        self.max_position = 1000

        # State
        self.symbol_data: Dict[str, dict] = {}
        self.trade_count = 0
        self.initialized = False

    def get_symbol_data(self, symbol: str) -> dict:
        """Get or create per-symbol data structure."""
        if symbol not in self.symbol_data:
            self.symbol_data[symbol] = {
                'prices': deque(maxlen=self.window),
                'indicators': {},
                'last_signal': None
            }
        return self.symbol_data[symbol]

    def on_start(self, ctx: Context, event):
        """Initialize strategy."""
        logger.info("="*60)
        logger.info("Strategy Starting")
        logger.info("="*60)

        # Get symbols
        symbols = ctx.strategy_config.symbols
        logger.info(f"Trading symbols: {symbols}")

        # Subscribe to data
        ctx.subscribe_bars(symbols=symbols, asset_type=AssetType.EQUITY, interval='1m')

        # Set up timer
        ctx.set_timer('status_report', pd.Timedelta(minutes=5))

        # Load parameters from config
        params = ctx.strategy_config.params
        self.window = params.get('window', self.window)
        self.trade_size = params.get('trade_size', self.trade_size)
        self.max_position = params.get('max_position', self.max_position)

        logger.info(f"Parameters: window={self.window}, "
                   f"trade_size={self.trade_size}")

        self.initialized = True

    def on_bar(self, ctx: Context, event):
        """Process bar data and execute strategy logic."""
        if not self.initialized:
            return

        bar = event.data()
        symbol = bar.symbol

        # Get symbol-specific data
        data = self.get_symbol_data(symbol)

        # Update price history
        data['prices'].append(bar.close)

        # Wait for enough data
        if len(data['prices']) < self.window:
            return

        # Calculate indicator
        ma = np.mean(data['prices'])
        data['indicators']['ma'] = ma

        # Generate signals
        signal = self._generate_signal(bar, ma)

        # Execute trades
        self._execute_signal(ctx, symbol, signal, bar.close)

    def _generate_signal(self, bar, ma):
        """Generate trading signal."""
        if bar.close > ma * 1.01:  # 1% above MA
            return 'BUY'
        elif bar.close < ma * 0.99:  # 1% below MA
            return 'SELL'
        return 'HOLD'

    def _execute_signal(self, ctx: Context, symbol: str, signal: str, price: float):
        """Execute trading signal."""
        # Skip if pending orders
        if ctx.has_open_order(symbol):
            return

        # Check position
        position = ctx.net_position(symbol)
        pending = ctx.open_order_qty(symbol)
        total_exposure = position + pending

        # BUY signal
        if signal == 'BUY' and ctx.is_flat(symbol):
            if abs(total_exposure + self.trade_size) <= self.max_position:
                order = ctx.buy_order(symbol, quantity=self.trade_size)

                ctx.add_event_log(
                    message=f"BUY signal executed",
                    symbol=symbol,
                    state_variable={'price': price, 'order_id': order.order_id}
                )

        # SELL signal
        elif signal == 'SELL' and ctx.is_net_long(symbol):
            order = ctx.sell_order(symbol, quantity=position)

            ctx.add_event_log(
                message=f"SELL signal executed",
                symbol=symbol,
                state_variable={'price': price, 'order_id': order.order_id}
            )

    def on_order(self, ctx: Context, event):
        """Handle order events (filled, rejected, etc.)."""
        order = event.data()

        self.trade_count += 1

        logger.info(f"Order filled: {order.symbol} {order.side} "
                   f"{order.filled_qty} @ {order.avg_px}")

    def on_timer(self, ctx: Context, event):
        """Periodic status report."""
        timer = event.data()

        if timer.timer_id == 'status_report':
            portfolio = ctx.portfolio()
            logger.info(f"Status: Trades={self.trade_count}, "
                       f"P&L={portfolio.total_pnl():.2f}")

    def on_stop(self, ctx: Context, event):
        """Clean up strategy."""
        logger.info("="*60)
        logger.info("Strategy Stopped")
        logger.info(f"Total trades: {self.trade_count}")
        logger.info(f"Final P&L: {ctx.portfolio().total_pnl()}")
        logger.info("="*60)

        # Cancel timers
        ctx.cancel_timer('status_report')


if __name__ == '__main__':
    # Configure and run
    strategy_config = StrategyConfig(
        name='ProductionStrategy',
        type='ProductionReadyStrategy',
        params={
            'window': 30,
            'trade_size': 100,
            'max_position': 1000
        }
    )

    report = hf.run_backtest(
        strategy_configs=[strategy_config],
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-30',
        data_configs=[{
            'type': 'hiveq_historical',
            'dataset': 'HIVEQ_US_EQ',
            'schema': ['bars_1m'],
        }]
    )

    # Print results
    print(results.return_stats.to_string())
    print(hf.event_logs().to_string())

This template includes all best practices and is ready for production use.

On this page