
HiveQ Flow v0.2.1

HiveQ Flow is a Python framework that provides an intuitive interface for developing, testing, and deploying quantitative trading strategies. Whether you're backtesting ideas or running live strategies, HiveQ Flow simplifies the process with a clean, powerful API.

Quickstart

Set up HiveQ Flow

Install the HiveQ Flow wheel package from the distribution / release directory:

pip install hiveq-flow

Run your first example

Use one of our prebuilt strategies, passing in config values:

import hiveq.flow as hf
from hiveq.flow import StrategyConfig
# Name is the instance name, and type is the class name of the strategy
strategy_configs = [StrategyConfig(name='TestUserSignal', type='UserSignalStrategy')]
# run the backtest
performance_report = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    data_configs=[{
        'type': 'csv',
        'data_type': 'bars_1m',
        'id': '1_MIN_BAR',
        'path': '/bars/AAPL_bars.csv'
    }]
)

Examples and tutorials

Data Configuration

The data_configs parameter defines the input datasets used to run a HiveQ backtest flow. It is a list of dictionaries, where each dictionary specifies one dataset to be loaded.

Example

data_configs = [
    {
        "type": "csv",
        "data_type": "bars_1m",
        "id": "1_MIN_BAR",
        "path": "/bars/AAPL_bars_20250905.csv"
    },
    {
        "type": "csv",
        "data_type": "custom",
        "id": "UserDataTest",
        "path": "/userdata/user_data.csv"
    }
]
Configuration Fields

CSV Support

Each entry in data_configs supports the following keys:

Key | Description
--- | ---
type | Input file type. Currently supported: csv.
data_type | Type of dataset: bars_1m (market bar data) or custom (custom/user dataset).
id | Unique identifier for the dataset (used inside the backtest).
path | Relative file path to the data. The base path is defined by the server.
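
Before starting a backtest, it can help to check that every CSV entry carries the keys listed above. The sketch below is only an illustration: validate_data_configs and CSV_REQUIRED_KEYS are hypothetical names, not part of hiveq.flow, and the assumption that all four keys are mandatory for CSV entries is taken from the examples on this page rather than from a formal specification.

```python
# Hypothetical helper for illustration; it is not part of hiveq.flow.
# Assumption: every CSV entry needs all four keys shown in the examples.
CSV_REQUIRED_KEYS = {"type", "data_type", "id", "path"}


def validate_data_configs(data_configs):
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    seen_ids = set()
    for index, entry in enumerate(data_configs):
        if entry.get("type") != "csv":
            continue  # only CSV entries are covered by the key list above
        missing = CSV_REQUIRED_KEYS - set(entry)
        if missing:
            problems.append(f"entry {index}: missing keys {sorted(missing)}")
        dataset_id = entry.get("id")
        if dataset_id is not None:
            if dataset_id in seen_ids:
                problems.append(f"entry {index}: duplicate id {dataset_id!r}")
            seen_ids.add(dataset_id)
    return problems


data_configs = [
    {"type": "csv", "data_type": "bars_1m", "id": "1_MIN_BAR",
     "path": "/bars/AAPL_bars_20250905.csv"},
    # "path" is left out on purpose to show a reported problem
    {"type": "csv", "data_type": "custom", "id": "UserDataTest"},
]

problems = validate_data_configs(data_configs)
for problem in problems:
    print(problem)
```

Returning a list of messages instead of raising on the first error makes it easier to fix several entries in one pass.
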
Bar Data (data_type = "bars_1m")

Bar data represents time-series price and volume information for a symbol. HiveQ Flow requires a minimum set of standardized columns:

['timestamp', 'open', 'high', 'low', 'close', 'volume', 'symbol']

Your input CSV may use provider-specific headers. Flow automatically maps provider columns to these required fields.

Example CSV Header (Provider Format)
date,sym,time,open,high,low,close,volume,total_vol,total_bar_vol,ticks,wavgp,filtered,kx_recv_time,kx_recv_date,interval_type,src_id,list_ex
Example Row
2025-08-01,AAPL,9:30,210.95,212.09,210.2647,210.835,2195476,4496491,4494942,29129,211.0691951,0,04:38.6,2025-08-02,minutes,1,XNAS
Column Mapping
Required Column | Example CSV Column | Notes
--- | --- | ---
timestamp | date + time | Combined into a single timestamp.
symbol | sym | Ticker symbol.
open | open | Open price.
high | high | High price.
low | low | Low price.
close | close | Close price.
volume | volume | Trade volume.
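
The mapping in the table can be pictured with a short standalone sketch. This is not Flow's own mapping code, which is not shown on this page; to_standard_bar is a hypothetical helper, the sample is trimmed to the columns that matter, and the time column is assumed to use an H:MM layout as in the example row.

```python
import csv
import io
from datetime import datetime

# Trimmed provider-format sample: only the columns used by the mapping.
PROVIDER_CSV = """date,sym,time,open,high,low,close,volume
2025-08-01,AAPL,9:30,210.95,212.09,210.2647,210.835,2195476
"""


def to_standard_bar(row):
    """Map one provider row to the standardized bar fields (hypothetical helper)."""
    return {
        # date + time are combined into a single timestamp
        "timestamp": datetime.strptime(f"{row['date']} {row['time']}", "%Y-%m-%d %H:%M"),
        "symbol": row["sym"],
        "open": float(row["open"]),
        "high": float(row["high"]),
        "low": float(row["low"]),
        "close": float(row["close"]),
        "volume": int(row["volume"]),
    }


bars = [to_standard_bar(row) for row in csv.DictReader(io.StringIO(PROVIDER_CSV))]
print(bars[0])
```
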
Custom Data (data_type = "custom")
  • This input type allows you to provide custom CSV datasets that can be consumed by strategies.
  • Unlike bar data, there are no fixed required columns—the schema depends entirely on how your strategy is implemented.

Example custom data

date,time,sym,signal1,weight1
2025-08-01,14:00:00.012,AAPL,1,1
2025-08-01,15:01:00.012,AAPL,0,0

The strategy consuming this dataset is responsible for interpreting the columns.
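
As a rough illustration of what interpreting the columns can look like, the sketch below parses the example rows with the standard library only. load_signals is a hypothetical helper, and reading signal1 as an integer flag and weight1 as a float is an assumption made for this example; a real strategy defines its own meaning for each column.

```python
import csv
import io
from datetime import datetime

CUSTOM_CSV = """date,time,sym,signal1,weight1
2025-08-01,14:00:00.012,AAPL,1,1
2025-08-01,15:01:00.012,AAPL,0,0
"""


def load_signals(text):
    """Parse custom rows into dictionaries (hypothetical helper, not a Flow API)."""
    signals = []
    for row in csv.DictReader(io.StringIO(text)):
        signals.append({
            "timestamp": datetime.strptime(
                f"{row['date']} {row['time']}", "%Y-%m-%d %H:%M:%S.%f"),
            "symbol": row["sym"],
            "signal": int(row["signal1"]),    # assumed: integer on/off flag
            "weight": float(row["weight1"]),  # assumed: numeric weight
        })
    return signals


signals = load_signals(CUSTOM_CSV)
for signal in signals:
    print(signal)
```
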

HiveQ Data API

A unified data access API for all kinds of data, including curated and vendor-provided market data.

  • Currently supports historical data (bars).
  • See the Data API docs for more information about parameters.

data_configs = [{
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_EQ',
    'schema': ['bars_1m'],
}]

Writing Strategies

All HiveQ Flow needs is an on_hiveq_event(ctx, event) function, defined either as a global function or as a method on your strategy class.

Writing your strategy - Global function (on_hiveq_event)

import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType


# Global strategy function
def on_hiveq_event(ctx: hf.Context, event):
    """
    Global function strategy for testing.
    Simple buy and hold strategy.
    """
    if event.type == EventType.START:
        ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
        print("Global strategy started!")

    elif event.type == EventType.BAR:
        bar = event.data()
        symbol = bar.symbol

        # Simple buy and hold - buy on first bar
        position = ctx.portfolio().net_position(symbol)
        if position == 0:
            ctx.buy_order(symbol=symbol, quantity=100)
            print(f"Global strategy: Bought 100 shares of {symbol} at {bar.close}")

    elif event.type == EventType.ORDER_FILLED:
        print("Global strategy: Order filled!")

if __name__ == '__main__':  # Have main or just a jupyter cell invoke
    results = hf.run_backtest(
        strategy_configs=[],  # Empty - should trigger global function usage
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[{
            'type': 'csv',
            'data_type': 'bars_1m',
            'id': '1_MIN_BAR',
            'path': '/bars/AAPL_bars.csv'
        }]
    )

Writing your strategy as a class - for writing modular and reusable strategies

import numpy as np

import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType
from hiveq.flow import StrategyConfig
from collections import deque


class SMACrossoverStrategy:
    def __init__(self):
        self.fast_window = 10
        self.slow_window = 30
        self.trade_size = 100

        self.fast_prices = deque(maxlen=self.fast_window)
        self.slow_prices = deque(maxlen=self.slow_window)

        self.current_position = 0

    def on_hiveq_event(self, ctx: hf.Context, event):
        """
        SMA Crossover Strategy.
        Buys when fast SMA crosses above slow SMA (golden cross).
        Sells when fast SMA crosses below slow SMA (death cross).
        """

        if event.type == EventType.START:
            ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')

        elif event.type == EventType.BAR:
            bar = event.data()
            symbol = bar.symbol
            self.fast_prices.append(bar.close)
            self.slow_prices.append(bar.close)

            if len(self.slow_prices) < self.slow_window:
                # Not enough data yet
                return

            fast_ma = np.mean(self.fast_prices)
            slow_ma = np.mean(self.slow_prices)

            # Trading logic: Crossover
            # current_position tracks the net quantity of submitted orders
            if fast_ma > slow_ma and self.current_position <= 0:
                ctx.buy_order(symbol=symbol, quantity=self.trade_size)
                self.current_position += self.trade_size
            elif fast_ma < slow_ma and self.current_position >= 0:
                ctx.sell_order(symbol=symbol, quantity=self.trade_size)
                self.current_position -= self.trade_size

        elif event.type == EventType.ORDER_FILLED:
            order = event.data()
            print(f"Order filled at {order.avg_px:.2f} for {order.filled_qty} shares")
        elif event.type == EventType.POSITION_CLOSED:
            position = event.data()
            print(f"Position closed: P&L = ${position.realized_pnl:.2f}")


if __name__ == '__main__':
    # Configure strategy
    strategy_configs = [
        StrategyConfig(
            name='SMACross',
            type='SMACrossoverStrategy',
            params={'fast_window': 10, 'slow_window': 30}
        )
    ]

    results = hf.run_backtest(
        strategy_configs=strategy_configs,
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[{
            'type': 'csv',
            'data_type': 'bars_1m',
            'id': '1_MIN_BAR',
            'path': '/bars/AAPL_bars.csv'
        }]
    )
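
The crossover rule itself can be checked without running a backtest. The sketch below is a standalone approximation of the idea, not the strategy class above: sma_signals is a hypothetical function, it uses small windows and made-up closing prices so the result is easy to follow by hand, and it emits a signal only when the side changes.

```python
from collections import deque


def sma_signals(closes, fast_window=3, slow_window=5):
    """Return (index, 'buy' | 'sell') pairs where the fast SMA changes side."""
    fast = deque(maxlen=fast_window)
    slow = deque(maxlen=slow_window)
    side = None  # last signal emitted
    signals = []
    for index, close in enumerate(closes):
        fast.append(close)
        slow.append(close)
        if len(slow) < slow_window:
            continue  # not enough data for the slow SMA yet
        fast_ma = sum(fast) / len(fast)
        slow_ma = sum(slow) / len(slow)
        if fast_ma > slow_ma and side != "buy":
            side = "buy"
            signals.append((index, side))
        elif fast_ma < slow_ma and side != "sell":
            side = "sell"
            signals.append((index, side))
    return signals


# Made-up closing prices: flat, then a rise, then a fall.
closes = [10, 10, 10, 10, 10, 11, 12, 13, 12, 10, 8, 7]
signals = sma_signals(closes)
print(signals)  # [(5, 'buy'), (10, 'sell')]
```
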

Support for individual event callbacks - one callback per event type

"""
SMA Crossover Strategy using Per-Event Callbacks.

This example demonstrates how to implement a trading strategy using individual
event callback methods (on_start, on_stop, on_bar, etc.) instead of the
single on_hiveq_event method. Each callback has proper type annotations
to show the expected data types.
"""

from typing import Optional, Dict, Any
from collections import deque
import numpy as np
import pandas as pd

import hiveq.flow as hf
from hiveq.flow.events import StartEvent, BarEvent, TimerEvent
from hiveq.flow import Context, StrategyConfig
from hiveq.flow.config import AssetType
from hiveq.flow.trading_types import Order, Position, Portfolio
from hiveq.flow.logger import logger

logger = logger()


class SMAEventCallbackStrategy:
    """
    SMA Crossover Strategy with per-event callbacks.

    This strategy demonstrates:
    - Individual callback methods for different event types
    - Proper type annotations for each callback
    - State management across different events
    - Position tracking and order management
    """

    def __init__(self):
        """Initialize strategy state and parameters."""
        # Strategy parameters
        self.fast_window: int = 10
        self.slow_window: int = 30
        self.trade_size: int = 100

        # Price buffers for SMA calculation
        self.fast_prices: deque = deque(maxlen=self.fast_window)
        self.slow_prices: deque = deque(maxlen=self.slow_window)

        # Position tracking (simplified - Context now handles order tracking)
        self.position_side: Optional[str] = None  # 'long', 'short', or None

        # Performance tracking
        self.trade_count: int = 0
        self.winning_trades: int = 0
        self.losing_trades: int = 0

        logger.info(f"Initialized SMA Strategy - Fast: {self.fast_window}, Slow: {self.slow_window}")

    def on_start(self, ctx: Context, event: StartEvent) -> None:
        """
        Called when the strategy starts.

        Args:
            ctx: The strategy context providing access to trading operations
            event: The start event that triggered this callback
        """
        logger.info("=== Strategy Starting ===")

        # Subscribe to bar data for all configured symbols
        symbols = ctx.strategy_config.symbols
        logger.info(f"Subscribing to 1-minute bars for symbols: {symbols}")
        ctx.subscribe_bars(symbols=symbols, asset_type=AssetType.EQUITY, interval='1m')

        # Set up a periodic timer for status reporting
        ctx.set_timer('status_report', pd.Timedelta(minutes=5))
        logger.info("Set up 5-minute status report timer")

        # Log strategy parameters
        logger.info(f"Strategy parameters from config: {ctx.strategy_config.params}")

        # Override parameters if provided in config
        if ctx.strategy_config.params:
            self.fast_window = ctx.strategy_config.params.get('fast_window', self.fast_window)
            self.slow_window = ctx.strategy_config.params.get('slow_window', self.slow_window)
            self.trade_size = ctx.strategy_config.params.get('trade_size', self.trade_size)

            # Reinitialize deques with new window sizes
            self.fast_prices = deque(maxlen=self.fast_window)
            self.slow_prices = deque(maxlen=self.slow_window)

            logger.info(f"Updated parameters - Fast: {self.fast_window}, Slow: {self.slow_window}, Size: {self.trade_size}")

    def on_bar(self, ctx: Context, event: BarEvent) -> None:
        bar = event.data()
        symbol = bar.symbol
        close_price = bar.close

        # Update price buffers
        self.fast_prices.append(close_price)
        self.slow_prices.append(close_price)

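        # Log every 10th bar to avoid spam. Use a running counter, because
        # len(self.slow_prices) stops growing once the deque reaches maxlen.
        self.bar_count = getattr(self, 'bar_count', 0) + 1
        bar_count = self.bar_count
        if bar_count % 10 == 0: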
            logger.debug(f"Bar #{bar_count} - {symbol}: O={bar.open:.2f}, H={bar.high:.2f}, "
                        f"L={bar.low:.2f}, C={bar.close:.2f}, V={bar.volume:.0f}")

        # Check if we have enough data for calculation
        if len(self.slow_prices) < self.slow_window:
            if bar_count == 1:
                logger.info(f"Collecting data... Need {self.slow_window} bars for slow SMA")
            return

        # Calculate SMAs
        fast_sma = np.mean(self.fast_prices)
        slow_sma = np.mean(self.slow_prices)

        # Log SMA values periodically
        if bar_count % 10 == 0:
            logger.debug(f"Fast SMA: {fast_sma:.2f}, Slow SMA: {slow_sma:.2f}")

        # Generate trading signals
        self._check_trading_signals(ctx, symbol, fast_sma, slow_sma, close_price)

    def on_timer(self, ctx: Context, event: TimerEvent) -> None:
        timer = event.data()
        # logger.info(f'Fired timer : {timer.timer_id}')

    # ==================== Helper Methods ====================

    def _check_trading_signals(self, ctx: Context, symbol: str,
                               fast_sma: float, slow_sma: float,
                               current_price: float) -> None:
        # Get current position info
        net_pos = ctx.net_position(symbol)
        pending_qty = ctx.open_order_qty(symbol)
        available_pos = net_pos - pending_qty

        # Golden Cross: Fast SMA crosses above Slow SMA (bullish signal)
        if fast_sma > slow_sma and self.position_side != 'long':
            logger.info(f"GOLDEN CROSS detected - Buying {self.trade_size} shares at ~{current_price:.2f}")
            logger.info(f"Net position: {net_pos}, Pending orders: {pending_qty}, Available: {available_pos}")

            ctx.buy_order(symbol=symbol, quantity=self.trade_size)
            self.position_side = 'long'

        # Death Cross: Fast SMA crosses below Slow SMA (bearish signal)
        elif fast_sma < slow_sma and self.position_side != 'short':
            # Can only sell what we have (netting constraint)
            if available_pos > 0:
                # Sell either our target size or available position, whichever is smaller
                sell_qty = min(self.trade_size, available_pos)

                logger.info(f"DEATH CROSS detected - Selling {sell_qty} shares at ~{current_price:.2f}")
                logger.info(f"Net position: {net_pos}, Pending orders: {pending_qty}, Available: {available_pos}")

                ctx.sell_order(symbol=symbol, quantity=sell_qty)

                # Update position side based on what we expect to have
                expected_pos_after = available_pos - sell_qty
                if expected_pos_after <= 0:
                    self.position_side = None  # Will be flat
                else:
                    self.position_side = 'long'  # Still long but reduced
            else:
                logger.info(f"DEATH CROSS detected but no position to sell")
                logger.info(f"Net position: {net_pos}, Pending orders: {pending_qty}, Available: {available_pos}")
                self.position_side = 'short'  # Signal detected but no action taken


if __name__ == '__main__':
    """
    Example of running the strategy with per-event callbacks.
    """

    # Configure strategy with parameters
    strategy_configs = [
        StrategyConfig(
            name='SMAEventCallbacks',
            type='SMAEventCallbackStrategy',  # Class name
            params={
                'fast_window': 10,
                'slow_window': 30,
                'trade_size': 100
            }
        )
    ]

    # Run backtest
    results = hf.run_backtest(
        strategy_configs=strategy_configs,
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[{
            'type': 'csv',
            'data_type': 'bars_1m',
            'id': '1_MIN_BAR',
            'path': 'bars/AAPL_bars.csv'
        }]
    )
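
One way to picture per-event callbacks is a dispatcher that looks for an on_<event> method and falls back to on_hiveq_event when none exists. How HiveQ Flow actually resolves callbacks is not described on this page, so everything below (Event, dispatch, RecordingStrategy, and the fallback rule) is a hypothetical stand-in meant only to show the pattern.

```python
class Event:
    """Hypothetical stand-in for a framework event; not a hiveq.flow class."""

    def __init__(self, type_name, payload=None):
        self.type_name = type_name
        self.payload = payload


def dispatch(strategy, ctx, event):
    """Call on_<type_name>() if defined, else on_hiveq_event(); return True if handled."""
    handler = getattr(strategy, f"on_{event.type_name}", None)
    if handler is None:
        handler = getattr(strategy, "on_hiveq_event", None)
    if handler is None:
        return False
    handler(ctx, event)
    return True


class RecordingStrategy:
    """Records which callback received each event."""

    def __init__(self):
        self.calls = []

    def on_start(self, ctx, event):
        self.calls.append("on_start")

    def on_bar(self, ctx, event):
        self.calls.append(("on_bar", event.payload))

    def on_hiveq_event(self, ctx, event):
        # Assumed fallback for event types without a dedicated callback
        self.calls.append(("fallback", event.type_name))


strategy = RecordingStrategy()
for event in (Event("start"), Event("bar", 210.835), Event("timer")):
    dispatch(strategy, ctx=None, event=event)
print(strategy.calls)
```
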

Using the suite of prebuilt strategies

import hiveq.flow as hf
from hiveq.flow import StrategyConfig

strategy_configs = [StrategyConfig(name='TestUserSignal', type='UserSignalStrategy',
                                   params={'user_data_list': ['UserDataTest'],
                                           'universe_file': 'universe/univ.csv',
                                           'mm': 1.0, 'enable_holding': False, 'mm_reset': 1.0,
                                           'entry_begin_time': "00:00:01", 'entry_end_time': "15:49:00",
                                           'exit_begin_time': "15:49:30", 'exit_end_time': "16:00:00",
                                           'stop_loss_bps': 0
                                           })]
results = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-02',
    data_configs=[
        {
            'type': 'csv',
            'data_type': 'bars_1m',
            'id': '1_MIN_BAR',
            'path': 'bars/AAPL_bars.csv'
        },
        {
            'type': 'csv',
            'data_type': 'custom',
            'id': 'UserDataTest',
            'path': 'userdata/user_data.csv'
        }
    ]
)
