HiveQ Docs
HiveQ Flow

Examples

HiveQ Flow Examples

Comprehensive guide with working examples for all use cases.

Table of Contents


Basic Strategy Examples

Simple Moving Average Crossover

Complete SMA crossover strategy with position management.

Source: examples/sma_strategy.py

from typing import Dict
import numpy as np
import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType
from hiveq.flow import StrategyConfig
from collections import deque
from hiveq.flow.logger import logger

logger = logger()


class SymbolInfo:
    """Per-symbol rolling price windows and the SMAs derived from them."""

    def __init__(self, symbol: str, fast_window: int, slow_window: int):
        self.symbol = symbol
        self.fast_window = fast_window
        self.slow_window = slow_window

        # Bounded deques drop the oldest price automatically once full.
        self.fast_prices = deque(maxlen=fast_window)
        self.slow_prices = deque(maxlen=slow_window)

        # Both averages stay None until the slow window has filled.
        self.fast_sma = None
        self.slow_sma = None

    def update_prices(self, price: float) -> bool:
        """Record ``price`` and refresh both SMAs once the slow window is full.

        Returns True when the SMAs were recomputed, False while warming up.
        """
        for window in (self.fast_prices, self.slow_prices):
            window.append(price)

        if not self.ready_for_trading():
            return False

        self.fast_sma = np.mean(self.fast_prices)
        self.slow_sma = np.mean(self.slow_prices)
        return True

    def ready_for_trading(self) -> bool:
        """Return True once the slow window holds ``slow_window`` prices."""
        return not len(self.slow_prices) < self.slow_window


class SMACrossoverStrategy:
    """Multi-symbol SMA strategy handled through a single ``on_hiveq_event``.

    Each symbol gets its own ``SymbolInfo`` so price windows never mix
    across symbols.
    """

    def __init__(self):
        # Defaults; StrategyConfig.params may override them on START.
        self.fast_window = 10
        self.slow_window = 30
        self.trade_size = 100

        # Per-symbol rolling state, keyed by symbol
        self.symbols: Dict[str, SymbolInfo] = {}

    def on_hiveq_event(self, ctx: hf.Context, event):
        """
        SMA Crossover Strategy with Multi-Symbol Support.

        START: apply configured parameters, subscribe to 1-minute bars and
        create the per-symbol state.
        BAR: update the symbol's SMAs and, once the slow window is full,
        buy while the fast SMA is above the slow SMA and sell while it is
        below. The comparison runs on every bar, so a signal persists for as
        long as the SMAs stay ordered, not only on the bar where they cross.
        """

        if event.type == EventType.START:
            # Honor fast_window / slow_window / trade_size supplied through
            # StrategyConfig(params=...) instead of silently ignoring them.
            params = ctx.strategy_config.params or {}
            self.fast_window = params.get('fast_window', self.fast_window)
            self.slow_window = params.get('slow_window', self.slow_window)
            self.trade_size = params.get('trade_size', self.trade_size)

            # Subscribe to bars for all symbols
            symbols = ctx.strategy_config.symbols
            ctx.subscribe_bars(symbols=symbols, asset_type=AssetType.EQUITY, interval='1m')

            # Initialize SymbolInfo for each symbol
            for symbol in symbols:
                self.symbols[symbol] = SymbolInfo(
                    symbol, self.fast_window, self.slow_window
                )
                logger.info(f"Initialized tracking for {symbol}")

        elif event.type == EventType.BAR:
            bar = event.data()
            symbol = bar.symbol
            close_price = bar.close

            # Bars may arrive for a symbol that was not in the START list;
            # create its state lazily.
            if symbol not in self.symbols:
                self.symbols[symbol] = SymbolInfo(
                    symbol, self.fast_window, self.slow_window
                )

            symbol_info = self.symbols[symbol]

            # Update prices and calculate SMAs; skip trading while warming up
            smas_ready = symbol_info.update_prices(close_price)
            if not smas_ready:
                return

            fast_sma = symbol_info.fast_sma
            slow_sma = symbol_info.slow_sma

            # Get current position
            net_pos = ctx.net_position(symbol)

            # Golden Cross: Fast SMA > Slow SMA (bullish)
            if fast_sma > slow_sma:
                if net_pos == 0.0:
                    # Open long position
                    logger.info(
                        f"{symbol}: GOLDEN CROSS - Buying {self.trade_size} "
                        f"shares at ~{close_price:.2f}"
                    )
                    ctx.buy_order(symbol=symbol, quantity=self.trade_size)

                elif net_pos < 0:
                    # Close short position; the long entry follows on a later
                    # bar once the position reads flat.
                    logger.info(
                        f"{symbol}: GOLDEN CROSS - Closing short position "
                        f"of {abs(net_pos)} shares"
                    )
                    # Cast to int to match the close-long branch below.
                    ctx.buy_order(symbol=symbol, quantity=int(abs(net_pos)))

            # Death Cross: Fast SMA < Slow SMA (bearish)
            elif fast_sma < slow_sma:
                if net_pos == 0.0:
                    # Open short position
                    logger.info(
                        f"{symbol}: DEATH CROSS - Selling {self.trade_size} "
                        f"shares at ~{close_price:.2f}"
                    )
                    ctx.sell_order(symbol=symbol, quantity=self.trade_size)

                elif net_pos > 0:
                    # Close long position
                    logger.info(
                        f"{symbol}: DEATH CROSS - Closing long position "
                        f"of {net_pos} shares"
                    )
                    ctx.sell_order(symbol=symbol, quantity=int(net_pos))


if __name__ == '__main__':
    # One strategy instance, resolved by class name
    sma_cross = StrategyConfig(
        name='SMACross',
        type='SMACrossoverStrategy',
        params={'fast_window': 10, 'slow_window': 30},
    )
    strategy_configs = [sma_cross]

    # Capital and date range for the run
    from hiveq.flow.config import BacktestConfig

    backtest_config = BacktestConfig(
        initial_capital=1000000.0,
        start_date='2025-09-19',
        end_date='2025-09-25',
    )

    # One-minute bars from the HiveQ historical equities dataset
    historical_bars = {
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_US_EQ',
        'schema': ['bars_1m'],
    }

    report = hf.run_backtest(
        strategy_configs=strategy_configs,
        symbols=['AAPL'],
        data_configs=[historical_bars],
        backtest_config=backtest_config,
    )

    # Dump everything the strategy logged during the run
    event_df = hf.event_logs()
    print(event_df.to_string())

Key Concepts:

  • Multi-symbol support with per-symbol state tracking
  • SMA calculation using rolling windows
  • Position state management
  • Golden cross (bullish) and death cross (bearish) signals
  • Proper initialization and cleanup

Strategy Patterns

Pattern 1: Single Event Handler

Use a single on_hiveq_event() method to handle all events.

Source: examples/global_function_strategy.py

import hiveq.flow as hf
# AssetType is required by the subscribe_bars() call in on_hiveq_event.
from hiveq.flow.config import AssetType, EventType


def on_hiveq_event(ctx: hf.Context, event: hf.Event):
    """
    Module-level event handler implementing a simple buy-and-hold strategy.

    Subscribes to 1-minute bars on START, buys 100 shares of a symbol on any
    bar where its net position is zero, and reports fills.
    """
    kind = event.type

    if kind == EventType.START:
        ctx.subscribe_bars(
            symbols=ctx.strategy_config.symbols,
            asset_type=AssetType.EQUITY,
            interval='1m',
        )
        print("Strategy started!")
        return

    if kind == EventType.BAR:
        bar = event.data()
        symbol = bar.symbol

        # Only enter while flat, so the position is opened once and held.
        if ctx.portfolio().net_position(symbol) == 0:
            ctx.buy_order(symbol=symbol, quantity=100)
            print(f"Bought 100 shares of {symbol} at {bar.close}")
        return

    if kind == EventType.ORDER_FILLED:
        print("Order filled!")


if __name__ == '__main__':
    # One-minute bars loaded from a local CSV file
    csv_bars = {
        'type': 'csv',
        'data_type': 'bars_1m',
        'id': '1_MIN_BAR',
        'path': 'bars/AAPL_bars.csv',
    }

    # Empty strategy list - uses the global on_hiveq_event function
    report = hf.run_backtest(
        strategy_configs=[],
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[csv_bars],
    )

Advantages:

  • Simple, flat structure
  • Easy to understand event flow
  • Good for small strategies

Pattern 2: Individual Callback Methods

Use separate callback methods for each event type.

Source: examples/sma_strategy_event_callbacks.py

from collections import deque
import numpy as np
import pandas as pd

import hiveq.flow as hf
# AssetType is required by the subscribe_bars() call in on_start.
from hiveq.flow.config import AssetType
from hiveq.flow.events import StartEvent, BarEvent, TimerEvent
from hiveq.flow import Context, StrategyConfig
from hiveq.flow.logger import logger

logger = logger()


class SMAEventCallbackStrategy:
    """SMA Crossover Strategy with per-event callbacks.

    NOTE: one pair of price buffers is shared by every bar this strategy
    receives, so the SMAs are only meaningful when a single symbol is
    subscribed.
    """

    def __init__(self):
        """Initialize strategy state and default parameters."""
        self.fast_window: int = 10
        self.slow_window: int = 30
        self.trade_size: int = 100

        # Price buffers; bounded so the oldest close is dropped automatically
        self.fast_prices: deque = deque(maxlen=self.fast_window)
        self.slow_prices: deque = deque(maxlen=self.slow_window)

        logger.info(f"Initialized SMA Strategy - Fast: {self.fast_window}, "
                   f"Slow: {self.slow_window}")

    def on_start(self, ctx: Context, event: StartEvent) -> None:
        """Subscribe to bars, arm the status timer and apply config params."""
        logger.info("=== Strategy Starting ===")

        # Subscribe to bar data
        symbols = ctx.strategy_config.symbols
        logger.info(f"Subscribing to 1-minute bars for: {symbols}")
        ctx.subscribe_bars(symbols=symbols, asset_type=AssetType.EQUITY, interval='1m')

        # Set up periodic timer
        ctx.set_timer('status_report', pd.Timedelta(minutes=5))

        # Override parameters from config
        if ctx.strategy_config.params:
            self.fast_window = ctx.strategy_config.params.get(
                'fast_window', self.fast_window
            )
            self.slow_window = ctx.strategy_config.params.get(
                'slow_window', self.slow_window
            )
            self.trade_size = ctx.strategy_config.params.get(
                'trade_size', self.trade_size
            )

            # Window sizes may have changed, so rebuild the bounded buffers
            self.fast_prices = deque(maxlen=self.fast_window)
            self.slow_prices = deque(maxlen=self.slow_window)

    def on_bar(self, ctx: Context, event: BarEvent) -> None:
        """Append the bar's close and evaluate signals once warmed up."""
        bar = event.data()
        symbol = bar.symbol
        close_price = bar.close

        # Update price buffers
        self.fast_prices.append(close_price)
        self.slow_prices.append(close_price)

        # Check if we have enough data
        if len(self.slow_prices) < self.slow_window:
            return

        # Calculate SMAs
        fast_sma = np.mean(self.fast_prices)
        slow_sma = np.mean(self.slow_prices)

        # Generate trading signals
        self._check_trading_signals(ctx, symbol, fast_sma, slow_sma, close_price)

    def on_timer(self, ctx: Context, event: TimerEvent) -> None:
        """Log the timer id and print the last bar open of each symbol."""
        timer = event.data()
        logger.info(f'Timer fired: {timer.timer_id}')

        # Report on the configured symbols rather than a hard-coded ticker,
        # so the example keeps working when run against another symbol.
        for symbol in ctx.strategy_config.symbols:
            inst = ctx.instrument(symbol)
            if inst.last_bar:
                print(f"[{symbol}] Last bar open: {inst.last_bar.open}")

    def _check_trading_signals(self, ctx: Context, symbol: str,
                               fast_sma: float, slow_sma: float,
                               current_price: float) -> None:
        """Buy while fast > slow and not long; sell out while fast < slow.

        ``current_price`` is accepted for callers' convenience but is not
        used by the decision logic.
        """
        net_pos = ctx.net_position(symbol)

        # Golden Cross: Fast > Slow (bullish)
        if fast_sma > slow_sma:
            if net_pos <= 0:
                logger.info(f"GOLDEN CROSS - Buying {self.trade_size} shares")
                ctx.buy_order(symbol=symbol, quantity=self.trade_size)

        # Death Cross: Fast < Slow (bearish)
        elif fast_sma < slow_sma:
            if net_pos > 0:
                logger.info(f"DEATH CROSS - Selling {net_pos} shares")
                ctx.sell_order(symbol=symbol, quantity=int(net_pos))


if __name__ == '__main__':
    # Parameters picked up by the strategy's on_start callback
    sma_params = {
        'fast_window': 10,
        'slow_window': 30,
        'trade_size': 100,
    }
    strategy_configs = [
        StrategyConfig(
            name='SMAEventCallbacks',
            type='SMAEventCallbackStrategy',
            params=sma_params,
        ),
    ]

    # One-minute bars loaded from a local CSV file
    csv_bars = {
        'type': 'csv',
        'data_type': 'bars_1m',
        'id': '1_MIN_BAR',
        'path': 'bars/AAPL_bars.csv',
    }

    report = hf.run_backtest(
        strategy_configs=strategy_configs,
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[csv_bars],
    )

Advantages:

  • Type hints for each event type
  • Better IDE autocomplete
  • Cleaner separation of concerns
  • Easier to test individual methods

Pattern 3: Jupyter/Notebook Strategy

Define strategy class inline (useful for Jupyter notebooks).

Source: examples/notebook_simulation_class.py

import numpy as np
import hiveq.flow as hf
# AssetType is required by the subscribe_bars() call in NotebookStrategy.
from hiveq.flow.config import AssetType, EventType
from hiveq.flow import StrategyConfig
from collections import deque


class NotebookStrategy:
    """SMA crossover strategy compact enough to define in a notebook cell.

    NOTE: one pair of price buffers is shared by every bar received, so the
    averages are only meaningful with a single subscribed symbol.
    """

    def __init__(self):
        # Defaults; StrategyConfig.params may override them on START.
        self.fast_window = 5
        self.slow_window = 20
        self.fast_prices = deque(maxlen=self.fast_window)
        self.slow_prices = deque(maxlen=self.slow_window)

    def on_hiveq_event(self, ctx: hf.Context, event):
        """Jupyter-defined strategy class.

        START applies configured windows and subscribes to 1-minute bars;
        BAR trades 50 shares in the direction of the fast/slow SMA ordering;
        ORDER_FILLED prints a confirmation.
        """
        if event.type == EventType.START:
            # Apply fast_window / slow_window from StrategyConfig(params=...)
            # so tuning the config actually changes the strategy.
            params = ctx.strategy_config.params or {}
            self.fast_window = params.get('fast_window', self.fast_window)
            self.slow_window = params.get('slow_window', self.slow_window)
            self.fast_prices = deque(maxlen=self.fast_window)
            self.slow_prices = deque(maxlen=self.slow_window)

            ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')

        elif event.type == EventType.BAR:
            bar = event.data()
            symbol = bar.symbol
            self.fast_prices.append(bar.close)
            self.slow_prices.append(bar.close)

            # Wait until the slow window is full before trading
            if len(self.slow_prices) < self.slow_window:
                return

            fast_ma = np.mean(self.fast_prices)
            slow_ma = np.mean(self.slow_prices)

            # Crossover logic
            position = ctx.portfolio().net_position(symbol)
            if fast_ma > slow_ma and position <= 0:
                ctx.buy_order(symbol=symbol, quantity=50)
                print(f"Golden cross! Bought 50 shares at {bar.close}")
            elif fast_ma < slow_ma and position >= 0:
                ctx.sell_order(symbol=symbol, quantity=50)
                print(f"Death cross! Sold 50 shares at {bar.close}")

        elif event.type == EventType.ORDER_FILLED:
            print("Order filled!")


# Jupyter cell execution
if __name__ == '__main__':
    notebook_config = StrategyConfig(
        name='NotebookStrategy',
        type='NotebookStrategy',
        params={'fast_window': 5, 'slow_window': 20},
    )
    strategy_configs = [notebook_config]

    # One-minute bars loaded from a local CSV file
    csv_bars = {
        'type': 'csv',
        'data_type': 'bars_1m',
        'id': '1_MIN_BAR',
        'path': 'bars/AAPL_bars.csv',
    }

    report = hf.run_backtest(
        strategy_configs=strategy_configs,
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[csv_bars],
    )

Advantages:

  • Quick prototyping in Jupyter
  • No separate file needed
  • Immediate feedback
  • Easy parameter tuning

Data Integration

Custom Data Integration

Integrate custom signals or data with market data.

Source: examples/custom_data.py

import hiveq.flow as hf
# AssetType is required by the subscribe_bars() call in CustomDataStrategy.
from hiveq.flow.config import AssetType, EventType


class CustomDataStrategy:
    """Strategy that listens to a custom data feed alongside 1-minute bars."""

    def on_hiveq_event(self, ctx: hf.Context, event):
        """Subscribe to both feeds on START and log each custom-data row."""
        kind = event.type

        if kind == EventType.START:
            # Market bars for the configured symbols
            ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
            # NOTE(review): user_data_id is read as an attribute of
            # strategy_config, while the run script supplies it inside
            # params - confirm the framework exposes params this way.
            ctx.subscribe_data(data_id=ctx.strategy_config.user_data_id)
            return

        if kind == EventType.CUSTOM_DATA:
            # Each event carries one record of the custom feed; the record
            # can hold signals, indicators, etc. from your CSV columns.
            data = event.data()
            log = hf.logger()
            log.info(f"Received custom data: {data.row}")


from hiveq.flow import StrategyConfig

# Strategy instance that receives the custom feed's ID through params
custom_data_config = StrategyConfig(
    name='CustomDataTest',
    type='CustomDataStrategy',
    params={'user_data_id': 'UserDataTest'},
)
strategy_configs = [custom_data_config]

# Market data: one-minute bars from the HiveQ historical equities dataset
market_bars = {
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_EQ',
    'schema': ['bars_1m'],
}

# Custom data: a user CSV whose 'id' must match the subscribe_data() ID
user_signals = {
    'type': 'csv',
    'data_type': 'custom',
    'path': 'userdata/user_data.csv',
    'id': 'UserDataTest',
}

# Run with both market data and custom data
report = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-02',
    data_configs=[market_bars, user_signals],
)

Custom Data CSV Format:

timestamp,symbol,signal,value
2025-08-01 09:30:00,AAPL,BUY,1.5
2025-08-01 09:31:00,AAPL,HOLD,0.8
2025-08-01 09:32:00,AAPL,SELL,-1.2

Key Points:

  • Custom data synchronized with bar data by timestamp
  • Access via EventType.CUSTOM_DATA events
  • Subscribe using ctx.subscribe_data(data_id='your_id')
  • Data ID must match the 'id' field in data_config

Pre-Built Strategy with Custom Data

Using built-in strategies with custom signals.

Source: examples/pre_built_strategy.py

import hiveq.flow as hf
from hiveq.flow import StrategyConfig

# Parameters consumed by the pre-built UserSignalStrategy
user_signal_params = {
    'user_data_list': ['UserDataTest'],
    'universe_file': 'universe/univ.csv',
    'mm': 1.0,
    'enable_holding': False,
    'mm_reset': 1.0,
    'entry_begin_time': "00:00:01",
    'entry_end_time': "15:49:00",
    'exit_begin_time': "15:49:30",
    'exit_end_time': "16:00:00",
    'stop_loss_bps': 0,
}

strategy_configs = [
    StrategyConfig(
        name='TestUserSignal',
        type='UserSignalStrategy',  # Pre-built strategy
        params=user_signal_params,
    ),
]

# Market bars plus the user CSV referenced by 'user_data_list'
market_bars = {
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_EQ',
    'schema': ['bars_1m'],
}
user_signals = {
    'type': 'csv',
    'data_type': 'custom',
    'path': 'userdata/user_data.csv',
    'id': 'UserDataTest',
}

report = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-30',
    data_configs=[market_bars, user_signals],
)

# Access results: print each report table in turn
for table_name in ('return_stats', 'run_info', 'positions', 'fills'):
    print(getattr(report, table_name).to_string())

Order Management

MOO/MOC Orders (Market On Open/Close)

Special order types for market open and close execution.

Source: examples/moo_moc_strategy.py

import hiveq.flow as hf
# AssetType is required by the subscribe_bars() call in on_hiveq_event.
from hiveq.flow.config import AssetType, EventType
from datetime import time
# NOTE(review): OrderType (OrderType.MOO / OrderType.MOC) is used by the
# order calls in on_hiveq_event but is not imported anywhere in this
# example - add its import once the defining hiveq module is confirmed.


class StrategyState:
    """Progress flags and order IDs for the MOO and MOC orders."""

    def __init__(self):
        # Market-on-open order: submitted yet? filled yet?
        self.moo_order_placed = self.moo_order_filled = False
        # Market-on-close order: submitted yet? filled yet?
        self.moc_order_placed = self.moc_order_filled = False
        # Order IDs, unknown until the matching order is submitted
        self.moo_order_id = self.moc_order_id = None


# Single shared instance used by the module-level event handler
state = StrategyState()


def get_eastern_time(bar):
    """Return ``(time_of_day, timestamp)`` for the bar in America/New_York.

    ``bar.ts_event`` is interpreted as nanoseconds since the Unix epoch, UTC.
    """
    import pandas as pd

    eastern = pd.Timestamp(bar.ts_event, unit='ns', tz='UTC').tz_convert(
        'America/New_York'
    )
    return eastern.time(), eastern


def on_hiveq_event(ctx: hf.Context, event: hf.Event):
    """Submit one MOO buy before the open and one MOC sell before the close.

    Progress is tracked on the module-level ``state`` object, which is only
    mutated here, never rebound.
    """
    kind = event.type

    if kind == EventType.START:
        ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
        print("MOO/MOC Strategy started!")
        return

    if kind == EventType.ORDER_FILLED:
        filled = event.data()

        # Two independent checks: the fill is matched against each order ID.
        if filled.order_id == state.moo_order_id:
            state.moo_order_filled = True
            print(f"MOO ORDER FILLED - Price: {filled.avg_px}")

        if filled.order_id == state.moc_order_id:
            state.moc_order_filled = True
            print(f"MOC ORDER FILLED - Price: {filled.avg_px}")
        return

    if kind != EventType.BAR:
        return

    bar = event.data()
    symbol = bar.symbol
    eastern_time, ts_eastern = get_eastern_time(bar)

    # MOO leg: submit once, during the 08:00 Eastern minute (ahead of the
    # 09:30 open).
    if not state.moo_order_placed and time(8, 0, 0) <= eastern_time < time(8, 1, 0):
        print(f"[{ts_eastern}] Placing MOO order")

        order = ctx.buy_order(symbol, quantity=100, order_type=OrderType.MOO)
        state.moo_order_id = order.order_id
        state.moo_order_placed = True

        ctx.add_event_log(
            message=f"MOO order placed at {eastern_time}",
            symbol=symbol,
            state_variable={"order_id": state.moo_order_id}
        )

    # MOC leg: only after the MOO fill, during the 15:55 Eastern minute
    # (ahead of the 16:00 close), and only if a long position exists.
    moc_pending = state.moo_order_filled and not state.moc_order_placed
    if moc_pending and time(15, 55, 0) <= eastern_time < time(15, 56, 0):
        position = ctx.net_position(symbol)
        if position > 0:
            print(f"[{ts_eastern}] Placing MOC order")

            order = ctx.sell_order(symbol, quantity=position, order_type=OrderType.MOC)
            state.moc_order_id = order.order_id
            state.moc_order_placed = True

            ctx.add_event_log(
                message=f"MOC order placed at {eastern_time}",
                symbol=symbol,
                state_variable={"order_id": state.moc_order_id}
            )


# MOO and MOC orders are supported via the order_type parameter:
#   ctx.buy_order(symbol, quantity, order_type=OrderType.MOO)
#   ctx.sell_order(symbol, quantity, order_type=OrderType.MOC)


if __name__ == '__main__':
    # One-minute bars from the HiveQ historical equities dataset
    historical_bars = {
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_US_EQ',
        'schema': ['bars_1m'],
    }

    # Empty strategy list - uses the global on_hiveq_event function
    report = hf.run_backtest(
        strategy_configs=[],
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[historical_bars],
    )

    # Show the entries written through ctx.add_event_log()
    event_df = hf.event_logs()
    print(event_df.to_string())

Key Points:

  • MOO orders placed before market open (9:30 AM EST)
  • MOC orders placed before cutoff (typically 3:55 PM EST)
  • Pass order_type=OrderType.MOO to ctx.buy_order() / ctx.sell_order() for MOO orders
  • Pass order_type=OrderType.MOC to ctx.buy_order() / ctx.sell_order() for MOC orders
  • Orders fill at opening/closing auction prices

Asset Types

Futures Trading

Subscribe and trade futures contracts.

Source: examples/subscribe_futures.py

import hiveq.flow as hf
from hiveq.flow import Context, StrategyConfig
from hiveq.flow.config import AssetType


class TestFuturesStrategy:
    """Callback-style strategy that buys one futures contract on the first bar."""

    def __init__(self):
        # Number of bars processed so far
        self.bar_count = 0

    def on_start(self, ctx: Context, event):
        """Print a start banner, subscribe to 1-minute futures bars and log it."""
        rule = "=" * 80
        print(rule)
        print("STRATEGY STARTED - Subscribing to ES.1C (Front Month)")
        print(rule)

        # NOTE(review): the subscription names the ES.H25 contract, while the
        # banner, the event log and the backtest symbol list all say ES.1C -
        # confirm which symbol is intended.
        ctx.subscribe_bars(symbols=["ES.H25"], asset_type=AssetType.FUTURES, interval="1m")

        ctx.add_event_log(
            message="Subscribed to ES.1C continuous futures",
            symbol="ES.1C"
        )

    def on_bar(self, ctx: Context, event):
        """Print the bar's OHLCV and place a single buy on the very first bar."""
        # NOTE(review): the other examples read the bar via event.data();
        # confirm that bar events also expose a .bar attribute.
        bar = event.bar
        symbol = bar.symbol

        print(f"[BAR] {bar.symbol} | Time: {bar.ts_event} | "
              f"O: {bar.open:.2f} H: {bar.high:.2f} "
              f"L: {bar.low:.2f} C: {bar.close:.2f} | V: {bar.volume}")

        # Only the first bar seen may trigger the order, and only when flat.
        if ctx.is_flat(symbol) and self.bar_count == 0:
            print(f"Placing BUY order for {symbol}")
            order = ctx.buy_order(symbol=symbol, quantity=1)
            print(f"Order ID: {order.order_id}")

        self.bar_count += 1

    def on_stop(self, ctx: Context, event):
        """Print a stop banner."""
        rule = "=" * 80
        print(rule)
        print("STRATEGY STOPPED")
        print(rule)


if __name__ == "__main__":
    # Configure strategy
    strategy_config = StrategyConfig(
        name='test_futures_strategy',
        type='TestFuturesStrategy'
    )

    # Run backtest
    report = hf.run_backtest(
        strategy_configs=[strategy_config],
        symbols=['ES.1C'],  # Continuous contract
        data_configs=[{
            'type': 'csv',
            'data_type': 'bars_1m',
            'id': 'test_bars',
            'path': 'bars/test_rollover_bars.csv'
        }],
        start_date='2025-03-13',
        end_date='2025-03-15'
    )

    # Display results. The backtest result is bound to `report`; reading an
    # undefined `results` name here would raise NameError.
    print("="*80)
    print("BACKTEST RESULTS")
    print("="*80)
    if report.run_info is not None:
        print(report.run_info.to_string())

    print("Event Logs:")
    print(hf.event_logs().to_string())
    print(report.fills.to_string())

Options Trading

Subscribe and trade options contracts.

Source: examples/subscribe_options.py

import hiveq.flow as hf
from hiveq.flow.config import AssetType, EventType


class TestOptionsStrategy:
    """Minimal options strategy: buy one contract on the first bar and report."""

    def on_hiveq_event(self, ctx: hf.Context, event):
        """Route START, BAR, ORDER_FILLED and STOP events."""
        kind = event.type
        rule = "=" * 60

        if kind == EventType.START:
            print(rule)
            print("Starting Test Options Strategy")
            print(rule)

            # Subscribe to option symbols
            ctx.subscribe_bars(
                symbols=ctx.strategy_config.symbols,
                asset_type=AssetType.OPTIONS,
                interval='1m'
            )

            # Counters are created here, so they exist only after START.
            self.bar_count = 0
            self.symbols_seen = set()
            return

        if kind == EventType.BAR:
            bar = event.data()
            symbol = bar.symbol

            # Only the first bar seen may trigger the order, and only when flat.
            if ctx.is_flat(symbol) and self.bar_count == 0:
                print(f"Placing BUY order for {symbol}")
                print(f"  Price: {bar.close}")
                order = ctx.buy_order(symbol=symbol, quantity=1)
                print(f"  Order ID: {order.order_id}")

            self.bar_count += 1
            self.symbols_seen.add(symbol)
            return

        if kind == EventType.ORDER_FILLED:
            order = event.data()
            print("** ORDER FILLED **")
            print(f"  Symbol: {order.symbol}")
            print(f"  Side: {order.side}")
            print(f"  Quantity: {order.filled_qty}")
            print(f"  Fill Price: {order.avg_px}")
            return

        if kind == EventType.STOP:
            print(rule)
            print("Test Options Strategy Complete")
            print(f"Total bars: {self.bar_count}")
            print(f"Unique symbols: {len(self.symbols_seen)}")
            print(rule)


def main():
    """Backtest TestOptionsStrategy on CSV option bars and print the outcome."""

    strategy_config = hf.StrategyConfig(
        name='TestOptionsStrategy',
        type='TestOptionsStrategy',
        params={}
    )

    # A single option contract
    option_symbols = [
        "SPXW251014C00450000"  # SPX Weekly Call
    ]

    # One-minute option bars loaded from a local CSV file
    csv_bars = {
        'type': 'csv',
        'data_type': 'bars_1m',
        'path': 'bars/test_options_bars.csv',
        'id': 'test_options_bars'
    }

    report = hf.run_backtest(
        strategy_configs=[strategy_config],
        symbols=option_symbols,
        start_date='2025-10-14',
        end_date='2025-10-15',
        data_configs=[csv_bars]
    )

    # Event log first, then the fills table
    event_df = hf.event_logs()
    print(event_df.to_string())
    print(report.fills.to_string())


if __name__ == "__main__":
    main()

Option Symbol Format:

  • Format: UNDERLYING + EXPIRY + TYPE + STRIKE
  • Example: SPXW251014C00450000
    • SPXW: Underlying (SPX Weekly)
    • 251014: Expiry (Oct 14, 2025 → YYMMDD)
    • C: Call (or P for Put)
    • 00450000: Strike price × 1000, zero-padded to 8 digits (here $450.00)

Options Trading with HiveQ Historical Data

Trade options using HiveQ historical options dataset.

Important: Use HIVEQ_US_OPT dataset for options trading, not HIVEQ_US_EQ.

import hiveq.flow as hf
from hiveq.flow import StrategyConfig
# AssetType is required by the subscribe_bars() calls in SimpleOptionsStrategy.
from hiveq.flow.config import AssetType, EventType


class SimpleOptionsStrategy:
    """Buy one SPY call once SPY has risen by a threshold from its first bar."""

    def __init__(self):
        self.call_option = 'SPY251114C00600000'  # SPY Nov 14 $600 Call
        self.entry_threshold = 0.005  # 0.5% move
        # Reference price: close of the first SPY bar seen
        self.open_price = None

    def on_hiveq_event(self, ctx, event):
        """Subscribe on START; on SPY bars, check the move and enter if flat."""
        kind = event.type

        if kind == EventType.START:
            # The underlying drives the signal, the option is what gets traded
            ctx.subscribe_bars(symbols=['SPY'], asset_type=AssetType.EQUITY, interval='1m')
            ctx.subscribe_bars(
                symbols=[self.call_option],
                asset_type=AssetType.OPTIONS,
                interval='1m'
            )
            return

        if kind != EventType.BAR:
            return

        bar = event.data()
        if not bar.symbol == 'SPY':
            # Option bars carry no signal here
            return

        if self.open_price is None:
            self.open_price = bar.close

        # Fractional move of SPY relative to the reference price
        price_change = (bar.close - self.open_price) / self.open_price
        if price_change >= self.entry_threshold and ctx.is_flat(self.call_option):
            ctx.buy_order(symbol=self.call_option, quantity=1)


# Run backtest with HIVEQ_US_OPT dataset
simple_options = StrategyConfig(
    name='SimpleOptions',
    type='SimpleOptionsStrategy',
    params={}
)

options_bars = {
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_OPT',  # Use HIVEQ_US_OPT for options!
    'schema': ['bars_1m']
}

report = hf.run_backtest(
    strategy_configs=[simple_options],
    symbols=['SPY', 'SPY251114C00600000'],
    start_date='2025-09-01',
    end_date='2025-09-05',
    data_configs=[options_bars]
)

print(report.return_stats)

Key Points:

  • Dataset: Always use HIVEQ_US_OPT for options trading
  • Symbols: Include both underlying ('SPY') and options ('SPY251114C00600000')
  • Subscribe: Use subscribe_bars() with asset_type=AssetType.OPTIONS for options symbols
  • Monitoring: Subscribe to underlying to generate entry/exit signals

Common Datasets:

  • HIVEQ_US_EQ → For equities (stocks) only
  • HIVEQ_US_OPT → For options trading
  • HIVEQ_US_FUT → For futures trading

Live Trading

Live Trading with Databento

Run strategies in live mode with real-time data.

Source: examples/live_example.py

import hiveq.flow as hf
from hiveq.flow import StrategyConfig


# Stop with SIGINT/CTRL+C
if __name__ == "__main__":
    import os

    # Credentials must never be committed to source or documentation; read
    # the Databento key from the environment and fail fast when it is absent.
    api_key = os.environ.get('DATABENTO_API_KEY')
    if not api_key:
        raise SystemExit(
            "Set the DATABENTO_API_KEY environment variable before running live."
        )

    # NOTE(review): SMACrossoverStrategy is referenced by name only; this
    # example does not define or import it - confirm how the engine resolves it.
    strategy_configs = [
        StrategyConfig(
            name='SMALive',
            type='SMACrossoverStrategy'
        )
    ]

    # Run live (blocking call)
    hf.run_live(
        strategy_configs=strategy_configs,
        symbols=['AAPL'],
        data_configs=[{
            'type': 'databento',
            'id': 'databento_client',
            'api_key': api_key,
            'venue_dataset_map': {"SIM": "EQUS.MINI"}
        }]
    )

Key Differences from Backtest:

  • Uses run_live() instead of run_backtest()
  • Requires live data provider (Databento, Interactive Brokers, etc.)
  • Blocking call - runs until interrupted
  • Real-time event processing
  • Use Ctrl+C to stop

Live Data Config:

{
    'type': 'databento',  # Live data provider
    'id': 'databento_client',
    'api_key': 'your_api_key',
    'venue_dataset_map': {
        'SIM': 'EQUS.MINI',     # Simulated equities
        'XCME': 'GLBX.MDP3'     # CME futures
    }
}

Performance Reporting

Accessing Results

import hiveq.flow as hf

# Run backtest (arguments elided). The result is bound to `report`, and every
# access below uses that same name.
report = hf.run_backtest(...)

# Return statistics
print(report.return_stats.to_string())
# Shows: total return, annualized return, Sharpe ratio, etc.

# Run information
print(report.run_info.to_string())
# Shows: dates, symbols, capital, commission, etc.

# Positions
print(report.positions.to_string())
# Shows: all positions opened/closed with P&L

# Fills
print(report.fills.to_string())
# Shows: all order fills with execution details

# Event logs
event_df = hf.event_logs()
print(event_df.to_string())
# Shows: all logged events

# Create tearsheet (for Marimo/Jupyter)
import marimo as mo
mo.md(report.create_tearsheet())

Summary

All examples demonstrate:

  • Configuration: Use StrategyConfig to configure strategies
  • Data Sources: Support for CSV, HiveQ historical, and live data
  • Event Handling: Multiple patterns (single handler, callbacks, inline)
  • Order Management: Market, limit, stop, MOO, MOC orders
  • Position Tracking: Real-time position and P&L tracking
  • Multi-Asset: Equities, futures, options support
  • Custom Data: Integration of custom signals and data
  • Performance: Comprehensive reporting and analytics

All source files available in /examples/ directory.

On this page