HiveQ Flow API Reference

Complete API reference for the HiveQ Flow trading system (v0.2.1).

Environment Setup

HiveQ Flow auto-initializes from environment variables. No manual init() call is required.

Required Environment Variables

Variable        | Description
HIVEQ_API_KEY   | API key for authentication
HIVEQ_USER_NAME | User's name (used as trader_id)
HIVEQ_USER_ID   | User ID
HIVEQ_ORG_ID    | Organization ID

Set these in a .env file in your project root or export them in your shell:

export HIVEQ_API_KEY="your_api_key"
export HIVEQ_USER_NAME="YourName"
export HIVEQ_USER_ID="your_user_id"
export HIVEQ_ORG_ID="your_org_id"

When you call any HiveQ Flow function (e.g., run_backtest()), the system automatically reads these environment variables and initializes.
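Because initialization is implicit, it can help to confirm the variables yourself before the first HiveQ Flow call. The sketch below is one way to do that with the standard library only. The helper name missing_hiveq_env is hypothetical and not part of HiveQ Flow; the variable names are the four listed above. It inspects only the process environment, so values kept solely in a .env file are not seen unless something has already loaded them.

```python
import os

# Variable names taken from the "Required Environment Variables" list.
REQUIRED_VARS = ("HIVEQ_API_KEY", "HIVEQ_USER_NAME", "HIVEQ_USER_ID", "HIVEQ_ORG_ID")


def missing_hiveq_env(env=None):
    """Return the required variables that are unset or empty (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_hiveq_env()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All HiveQ Flow environment variables are set.")
```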


Quick Reference

Common Subscriptions

# Bars (OHLCV)
ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')
ctx.subscribe_futures_bars(symbols=['ES.c.0'], interval='1m')  # Continuous futures

# Options snapshots
ctx.subscribe_option_snaps('SPX', strike=5000, option_type='C', expiration_type='0dte', interval='1s')
ctx.subscribe_option_snaps('SPX', interval='1s')  # All strikes, types, expirations

# TBBO (top-of-book + trades)
ctx.subscribe_tbbo(symbols=['AAPL'], asset_type=AssetType.EQUITY)
ctx.subscribe_futures_tbbo(symbols=['ES.c.0'])

# Ticks
ctx.subscribe_trade_ticks(symbols=['AAPL'], asset_type=AssetType.EQUITY)
ctx.subscribe_quotes(symbols=['AAPL'], asset_type=AssetType.EQUITY)

# Custom data (signals, ML predictions)
ctx.subscribe_data(data_id='kafka_signals')

Common Callbacks

def on_start(self, ctx: Context, event: StartEvent):             # Initialize, subscribe
def on_bar(self, ctx: Context, event: BarEvent):                 # Bar data
def on_snap(self, ctx: Context, event: SnapEvent):               # Options snapshot
def on_trade(self, ctx: Context, event: TradeEvent):             # Trade tick
def on_quote(self, ctx: Context, event: QuoteEvent):             # Quote update
def on_custom_data(self, ctx: Context, event: CustomDataEvent):  # Custom data/signals
def on_order(self, ctx: Context, event: OrderEvent):             # Order events (filled, rejected, etc.)

Common Context Methods

# Orders
order = ctx.buy_order(symbol, quantity)
order = ctx.sell_order(symbol, quantity)
order = ctx.buy_order(symbol, quantity, order_type=OrderType.LIMIT, limit_price=150.00)
order = ctx.buy_order(symbol, quantity, order_type=OrderType.MOO)   # Market-on-open
order = ctx.sell_order(symbol, quantity, order_type=OrderType.MOC)  # Market-on-close

# Positions
pos = ctx.net_position(symbol)            # Current position quantity
is_flat = ctx.is_flat(symbol)             # True if no position
is_long = ctx.is_net_long(symbol)         # True if long position
is_short = ctx.is_net_short(symbol)       # True if short position

# Portfolio
portfolio = ctx.portfolio()
pnl = portfolio.total_pnl()              # Total P&L
cash = portfolio.cash()                   # Available cash

Asset Types

from hiveq.flow.config import AssetType

AssetType.EQUITY    # Stocks: 'AAPL', 'GOOGL'
AssetType.FUTURES   # Futures: 'ES.c.0', 'ESH25'
AssetType.OPTIONS   # Options: Use subscribe_option_snaps()
AssetType.CRYPTO    # Crypto: 'BTC-USD'

Options Filtering

# Filter by strike
ctx.subscribe_option_snaps('SPX', strike=5000)

# Filter by type
ctx.subscribe_option_snaps('SPX', option_type='C')  # Calls only
ctx.subscribe_option_snaps('SPX', option_type='P')  # Puts only

# Filter by expiration
ctx.subscribe_option_snaps('SPX', expiration_type='0dte')         # 0DTE only
ctx.subscribe_option_snaps('SPX', expiration_type='2025-01-17')   # Specific date

# Combine filters
ctx.subscribe_option_snaps('SPX', strike=5000, option_type='C', expiration_type='0dte')

Futures Continuous Contracts

'ES.v.0'  # Volume-weighted front month (auto-rollover)
'ES.c.0'  # Calendar front month (auto-rollover)
'ESH25'   # Specific contract (March 2025)
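
The three notations can be told apart mechanically. The following is a rough sketch of a classifier, not a HiveQ Flow function: describe_futures_symbol is a hypothetical name, the trailing number in the continuous form is assumed to be a contract rank (0 meaning front month), and the two-digit year is assumed to fall in 20xx. The month letters are the standard futures month codes.

```python
import re

# Standard futures month codes: F = January ... Z = December.
MONTH_CODES = "FGHJKMNQUVXZ"


def describe_futures_symbol(symbol):
    """Classify a futures symbol string (hypothetical helper, not a HiveQ API)."""
    continuous = re.fullmatch(r"([A-Z0-9]+)\.([vc])\.(\d+)", symbol)
    if continuous:
        root, rule, rank = continuous.groups()
        return {
            "kind": "continuous",
            "root": root,
            "roll": "volume" if rule == "v" else "calendar",
            "rank": int(rank),  # assumption: 0 means front month
        }
    specific = re.fullmatch(r"([A-Z0-9]+?)([FGHJKMNQUVXZ])(\d{2})", symbol)
    if specific:
        root, code, yy = specific.groups()
        return {
            "kind": "specific",
            "root": root,
            "month": MONTH_CODES.index(code) + 1,
            "year": 2000 + int(yy),  # assumption: contract year is 20xx
        }
    raise ValueError(f"Unrecognized futures symbol: {symbol!r}")
```

For instance, describe_futures_symbol('ESH25') reports month 3 and year 2025, matching the "March 2025" comment above.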

Core Functions

hiveq.flow.run_backtest()

Run a backtest with one or more trading strategies.

Signature:

def run_backtest(
    strategy_configs: list[StrategyConfig],
    symbols: Optional[list[str]] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    data_configs: Optional[list[dict]] = None,
    backtest_config: Optional[BacktestConfig] = None,
    strategy_type: str = 'intraday',
    fetch_mode: str = 'daily'
) -> PerformanceReport

Parameters:

  • strategy_configs (list[StrategyConfig]): List of strategy configurations
  • symbols (list[str], optional): Trading symbols (e.g., ["AAPL", "GOOGL"])
  • start_date (str, optional): Start date in 'YYYY-MM-DD' format
  • end_date (str, optional): End date in 'YYYY-MM-DD' format
  • data_configs (list[dict], optional): Data source configurations
  • backtest_config (BacktestConfig, optional): Complete backtest configuration
  • strategy_type (str): Strategy type - 'intraday' (default) or 'overnight'
  • fetch_mode (str): Data fetch mode - 'daily' (default) or 'prefetch' (only for intraday)

Execution Modes:

strategy_type | fetch_mode | Data Fetch   | Execution  | State Reset
'intraday'    | 'daily'    | Day-by-day   | Day-by-day | Each day
'intraday'    | 'prefetch' | Single fetch | Single run | Never
'overnight'   | (ignored)  | Single fetch | Single run | Never

  • intraday + daily: Best for intraday strategies. Fetches data day-by-day, runs strategy day-by-day, recreates strategy instance each day (automatic state reset).
  • intraday + prefetch: Faster for shorter backtests. Single data fetch, single continuous run, no automatic state reset.
  • overnight: For swing/position trading. Single data fetch, single continuous run, positions can be held overnight.
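
The combinations above can be written down as a small lookup, which is handy when a script needs to know up front whether strategy state will survive across days. This is only a restatement of the documented behavior, not HiveQ Flow code; resolve_execution is a hypothetical name.

```python
def resolve_execution(strategy_type="intraday", fetch_mode="daily"):
    """Mirror the documented execution modes (illustrative only, not HiveQ code)."""
    single = {"data_fetch": "single fetch", "execution": "single run", "state_reset": "never"}
    if strategy_type == "overnight":
        # fetch_mode is ignored for overnight strategies.
        return single
    if strategy_type != "intraday":
        raise ValueError(f"Unknown strategy_type: {strategy_type!r}")
    if fetch_mode == "daily":
        return {"data_fetch": "day-by-day", "execution": "day-by-day", "state_reset": "each day"}
    if fetch_mode == "prefetch":
        return single
    raise ValueError(f"Unknown fetch_mode: {fetch_mode!r}")
```

A practical consequence of the 'each day' row: with the default intraday + daily mode, anything stored on the strategy instance is lost at the day boundary, because the instance is recreated.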

Data Config Structure:

{
    'type': 'csv' | 'hiveq_historical',
    'data_type': 'bars_1m' | 'custom',
    'id': 'unique_identifier',
    'path': 'relative/path/to/file.csv',  # for CSV
    'dataset': 'HIVEQ_US_EQ',  # for hiveq_historical
    'schema': ['bars_1m'],  # for hiveq_historical
}
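
A quick sanity check on a data config before passing it to run_backtest() might look like the sketch below. The required keys are inferred from the inline comments in the structure above ('path' for CSV, 'dataset' and 'schema' for hiveq_historical) and are an assumption, not a documented validation rule. check_data_config is a hypothetical helper, and it covers only the two backtest source types shown here.

```python
# Keys each source type appears to need, inferred from the inline comments.
REQUIRED_KEYS = {
    "csv": ("path",),
    "hiveq_historical": ("dataset", "schema"),
}


def check_data_config(cfg):
    """Return a list of problems found in one data config dict (hypothetical helper)."""
    kind = cfg.get("type")
    if kind not in REQUIRED_KEYS:
        return [f"unknown type: {kind!r}"]
    problems = [f"missing key: {key!r}" for key in REQUIRED_KEYS[kind] if key not in cfg]
    if "schema" in cfg and not isinstance(cfg["schema"], list):
        problems.append("'schema' should be a list, e.g. ['bars_1m']")
    return problems
```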

Returns: PerformanceReport with backtest results

Example:

import hiveq.flow as hf
from hiveq.flow import StrategyConfig

# Configure strategy
strategy_configs = [
    StrategyConfig(
        name='SMA_Strategy',
        type='SMACrossoverStrategy',
        params={'fast_window': 10, 'slow_window': 30}
    )
]

# Run backtest with CSV data
report = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-30',
    data_configs=[{
        'type': 'csv',
        'data_type': 'bars_1m',
        'id': '1_MIN_BAR',
        'path': 'bars/AAPL_bars.csv'
    }]
)

# Run backtest with HiveQ historical data (default: intraday + daily)
report = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-30',
    data_configs=[{
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_US_EQ',
        'schema': ['bars_1m'],
    }]
)

# Run backtest with prefetch mode (faster for shorter backtests)
report = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-30',
    data_configs=[{
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_US_EQ',
        'schema': ['bars_1m'],
    }],
    strategy_type='intraday',
    fetch_mode='prefetch'
)

# Run overnight strategy (swing trading)
report = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-30',
    data_configs=[{
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_US_EQ',
        'schema': ['bars_1m'],
    }],
    strategy_type='overnight'
)

# Access results
print(report.return_stats.to_string())
print(report.positions.to_string())
print(report.fills.to_string())

hiveq.flow.run_live()

Run live trading with one or more strategies.

Signature:

def run_live(
    strategy_configs: list[StrategyConfig],
    symbols: list[str],
    data_configs: list[dict],
    live_config: Optional[LiveConfig] = None
) -> None

Parameters:

  • strategy_configs (list[StrategyConfig]): Strategy configurations
  • symbols (list[str]): Trading symbols
  • data_configs (list[dict]): Live data client configurations
  • live_config (LiveConfig, optional): Live trading configuration

Data Config for Live Trading:

{
    'type': 'databento',
    'id': 'databento_client',
    'api_key': 'db-xxx',
    'venue_dataset_map': {
        'SIM': 'EQUS.MINI',
        'XCME': 'GLBX.MDP3'
    }
}

Example:

import hiveq.flow as hf
from hiveq.flow import StrategyConfig

strategy_configs = [
    StrategyConfig(
        name='SMALive',
        type='SMACrossoverStrategy'
    )
]

hf.run_live(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    data_configs=[{
        'type': 'databento',
        'id': 'databento_client',
        'api_key': 'db-xxx',
        'venue_dataset_map': {"SIM": "EQUS.MINI"}
    }]
)

hiveq.flow.deploy_backtest()

Deploy a backtest to run remotely on the HiveQ orchestrator.

Signature:

def deploy_backtest(
    strategy_configs: list[StrategyConfig],
    symbols: Optional[list[str]] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    data_configs: Optional[list[dict]] = None,
    backtest_config: Optional[BacktestConfig] = None,
    task_name: Optional[str] = None,
    priority: int = 5,
    allow_duplicate: bool = True,
    duplicate_action: str = 'override'
) -> dict

Parameters:

  • strategy_configs (list[StrategyConfig]): List of strategy configurations
  • symbols (list[str], optional): Trading symbols
  • start_date (str, optional): Start date in 'YYYY-MM-DD' format
  • end_date (str, optional): End date in 'YYYY-MM-DD' format
  • data_configs (list[dict], optional): Data source configurations
  • backtest_config (BacktestConfig, optional): Complete backtest configuration
  • task_name (str, optional): Unique name for this task
  • priority (int): Task priority, 0-10; higher means more important (default: 5)
  • allow_duplicate (bool): Allow submission if a task with the same name exists (default: True)
  • duplicate_action (str): Action for an existing task: 'override' (default), 'terminate', or 'duplicate'

Returns: Dictionary containing:

  • status: 'success' or 'error'
  • task_id: The task ID for tracking (if status is 'success')
  • message: Error message (if status is 'error')
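
Since the returned dictionary carries either a task_id or a message, callers may want to handle both branches explicitly. The sketch below shows one way to do so. task_id_or_raise is a hypothetical helper that works on any dictionary with the keys listed above, so it can be tried without calling deploy_backtest().

```python
def task_id_or_raise(result):
    """Return the task ID from a deploy result, or raise with the reported message."""
    status = result.get("status")
    if status == "success":
        return result["task_id"]
    if status == "error":
        raise RuntimeError(result.get("message", "deploy failed without a message"))
    # Anything else is outside the two documented status values.
    raise ValueError(f"Unexpected status: {status!r}")
```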

Example:

import hiveq.flow as hf
from hiveq.flow import StrategyConfig

# Deploy backtest to run remotely
result = hf.deploy_backtest(
    strategy_configs=[StrategyConfig(name='MyStrategy', type='MyStrategy')],
    symbols=['AAPL'],
    start_date='2025-01-01',
    end_date='2025-01-31',
    data_configs=[{
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_US_EQ',
        'schema': ['bars_1m']
    }]
)

if result['status'] == 'success':
    print(f"Backtest deployed with task_id: {result['task_id']}")

hiveq.flow.event_logs()

Get event logs for all strategies.

Signature:

def event_logs() -> pd.DataFrame

Returns: DataFrame with columns:

  • timestamp: Event timestamp
  • strategy_id: Strategy identifier
  • event_type: Type of event
  • symbol: Symbol (if applicable)
  • message: Event message
  • Additional event-specific fields

Example:

import hiveq.flow as hf

# After running backtest
report = hf.run_backtest(...)

# Get event logs
event_df = hf.event_logs()

# Filter by strategy
strategy_logs = event_df[event_df['strategy_id'] == 'my_strategy']

# Filter by event type
filled_orders = event_df[event_df['event_type'] == 'ORDER_FILLED']

print(event_df.to_string())

hiveq.flow.config()

Get the current engine configuration.

Signature:

def config() -> EngineConfig

Returns: EngineConfig - The current engine configuration including:

  • oms: OMS type ('NAUTILUS')
  • oms_log_level: OMS logging level
  • Other engine settings

Example:

import hiveq.flow as hf

cfg = hf.config()
print(f"OMS: {cfg.oms}")

hiveq.flow.backtest_config()

Get the current backtest configuration.

Signature:

def backtest_config() -> BacktestConfig

Returns: BacktestConfig - The backtest configuration used for the current or most recent run, including:

  • start_date, end_date: Date range
  • initial_capital: Starting capital
  • commission, slippage: Trading costs
  • deploy: Deploy mode flag

Example:

import hiveq.flow as hf

report = hf.run_backtest(...)

config = hf.backtest_config()
print(f"Start: {config.start_date}, Capital: ${config.initial_capital:,.2f}")

hiveq.flow.execution_mode()

Get the current execution mode.

Signature:

def execution_mode() -> ExecutionMode

Returns: ExecutionMode enum value:

  • ExecutionMode.BACKTEST: Historical backtesting
  • ExecutionMode.LIVE: Live trading (use paper_trading flag in LiveConfig for paper trading)

Example:

import hiveq.flow as hf
from hiveq.flow.config import ExecutionMode

report = hf.run_backtest(...)

mode = hf.execution_mode()
if mode == ExecutionMode.BACKTEST:
    print("Running in backtest mode")

hiveq.flow.logger()

Get the HiveQ Flow logger instance.

Signature:

def logger() -> Logger

Returns: Logger instance

Example:

import hiveq.flow as hf

logger = hf.logger()
logger.info("Strategy initialized")
logger.debug("Processing bar data")
logger.warning("High volatility detected")
logger.error("Order rejected")

Context API

The Context object is the primary interface for strategies to interact with the trading system.

Market Data Subscriptions

ctx.subscribe_bars()

Subscribe to bar (OHLCV) data for specified symbols.

Signature:

def subscribe_bars(
    symbols: List[str],
    asset_type: AssetType,
    interval: str
) -> None

Parameters:

  • symbols (list[str]): List of trading symbols
  • asset_type (AssetType): Asset type (EQUITY, FUTURES, OPTIONS). Required.
  • interval (str): Bar interval. Required.
    • Seconds: "1s", "5s", "30s"
    • Minutes: "1m", "5m", "15m", "30m"
    • Hours: "1h", "4h"
    • Days: "1d"
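
The interval strings follow a count-plus-unit pattern. If a strategy needs the interval as a duration (for example, to compare against timestamps), a conversion along these lines could be used. interval_to_timedelta is a hypothetical helper; it accepts any positive count with the four unit letters shown above, which may be broader than what HiveQ Flow itself supports.

```python
import re
from datetime import timedelta

# Unit letters used in the interval strings, mapped to timedelta keywords.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def interval_to_timedelta(interval):
    """Convert a string such as '5m' or '1h' to a timedelta (hypothetical helper)."""
    match = re.fullmatch(r"([1-9]\d*)([smhd])", interval)
    if not match:
        raise ValueError(f"Unrecognized interval: {interval!r}")
    count, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(count)})
```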

Example:

def on_start(ctx: hf.Context, event: hf.StartEvent):
    # Subscribe to 1-minute bars for equities
    ctx.subscribe_bars(["AAPL", "GOOGL"], asset_type=AssetType.EQUITY, interval="1m")

    # Subscribe to 30-minute bars for futures
    ctx.subscribe_bars(["ESH25"], asset_type=AssetType.FUTURES, interval="30m")

    # Subscribe to 1-hour bars for options
    ctx.subscribe_bars(["SPY"], asset_type=AssetType.OPTIONS, interval="1h")

ctx.subscribe_data()

Subscribe to custom data events.

Signature:

def subscribe_data(data_id: str, signals: List[str] = None) -> None

Parameters:

  • data_id (str): Identifier matching the 'id' in data_config
  • signals (List[str], optional): Signal symbols to fetch from the API. Used during the prefetch phase to specify dynamically which signals to load. If omitted, the symbols field in data_config is used.

Example 1: Static signals (symbols in data_config)

# Signals specified in data_config
report = hf.run_backtest(
    strategy_configs=[config],
    data_configs=[{
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_QUANT_SIGNALS',
        'schema': ['signals'],
        'id': 'UserSignals',
        'symbols': ['signal_1', 'signal_2']  # Static specification
    }]
)

# In strategy:
def on_start(ctx: hf.Context, event: hf.StartEvent):
    ctx.subscribe_data("UserSignals")  # No signals param needed

Example 2: Dynamic signals (signals in subscribe_data)

# No 'symbols' in data_config - specified dynamically in strategy
report = hf.run_backtest(
    strategy_configs=[config],
    data_configs=[{
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_QUANT_SIGNALS',
        'schema': ['signals'],
        'id': 'UserSignals'  # No 'symbols' here
    }]
)

# In strategy - signals determined at runtime
def on_start(ctx: hf.Context, event: hf.StartEvent):
    # Signals captured during prefetch and used to filter API fetch
    ctx.subscribe_data("UserSignals", signals=['signal_1', 'signal_2'])

def on_custom_data(ctx: hf.Context, event: hf.CustomDataEvent):
    data = event.data()
    symbol = data.column_data('symbol')
    signal_json = data.column_data('signal_json')

Priority: If both data_config['symbols'] and signals parameter are specified, data_config['symbols'] takes priority.
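
The priority rule can be expressed as a small function. This is a restatement of the rule for illustration, not HiveQ Flow's internal logic. effective_signals is a hypothetical name, and treating an empty 'symbols' list as "not specified" is an assumption.

```python
def effective_signals(data_config, signals=None):
    """Pick the signal list per the documented priority rule (illustrative helper)."""
    configured = data_config.get("symbols")
    if configured:
        # data_config['symbols'] wins when both are given.
        return list(configured)
    return list(signals or [])
```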


ctx.subscribe_tbbo()

Subscribe to TBBO (Top of Book + Trades) data for real-time trade and quote ticks.

Signature:

def subscribe_tbbo(
    symbols: List[str],
    asset_type: AssetType = AssetType.EQUITY
) -> None

Parameters:

  • symbols (list[str]): List of trading symbols
  • asset_type (AssetType): Asset type (default: EQUITY)

What is TBBO? TBBO provides both:

  • Trade ticks: Individual trade executions with price, size, and aggressor side
  • Quote ticks: Best bid/offer (L1 quotes) with prices and sizes

Ideal for high-frequency strategies needing both trade flow and top-of-book information.

Example:

def on_start(self, ctx: hf.Context, event: hf.StartEvent):
    # Subscribe to TBBO data
    ctx.subscribe_tbbo(symbols=['AAPL', 'GOOGL'])

def on_trade(self, ctx: hf.Context, event: hf.TradeEvent):
    """Handle trade tick events."""
    trade = event.data()
    # trade.symbol, trade.price, trade.size
    # trade.aggressor_side: 'BUYER', 'SELLER', or 'NO_AGGRESSOR'
    # trade.trade_id, trade.ts_event

    if trade.aggressor_side == "BUYER":
        print(f"Aggressive buying: {trade.size} @ ${trade.price}")

def on_quote(self, ctx: hf.Context, event: hf.QuoteEvent):
    """Handle quote tick events."""
    quote = event.data()
    # quote.symbol, quote.bid_price, quote.ask_price
    # quote.bid_size, quote.ask_size, quote.ts_event

    spread = quote.ask_price - quote.bid_price
    mid_price = (quote.bid_price + quote.ask_price) / 2
    print(f"Spread: ${spread:.2f}, Mid: ${mid_price:.2f}")
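
As a further illustration of the aggressor_side field, the sketch below tallies signed trade volume over a batch of trades. FakeTrade is a local stand-in carrying only the trade fields listed above; it is not a HiveQ Flow class, and signed_volume is a hypothetical helper. Inside a strategy, the same logic would be fed from on_trade().

```python
from dataclasses import dataclass


@dataclass
class FakeTrade:
    """Stand-in with some of the documented trade fields; not a HiveQ class."""
    price: float
    size: float
    aggressor_side: str  # 'BUYER', 'SELLER', or 'NO_AGGRESSOR'


def signed_volume(trades):
    """Sum sizes, counting buyer-initiated trades as positive and seller-initiated as negative."""
    total = 0.0
    for trade in trades:
        if trade.aggressor_side == "BUYER":
            total += trade.size
        elif trade.aggressor_side == "SELLER":
            total -= trade.size
        # 'NO_AGGRESSOR' trades are left out of the tally.
    return total
```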

Data Config for TBBO:

data_configs=[{
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_EQ',
    'schema': ['tbbo']  # or 'trades'
}]

ctx.subscribe_option_snaps()

Subscribe to options snapshots with flexible filtering (strike, type, expiration).

Signature:

def subscribe_option_snaps(
    symbol: str,  # Underlying symbol
    option_type: Optional[str] = None,  # 'C'/'Call', 'P'/'Put', or None (both)
    strike: Optional[float] = None,  # Specific strike or None (all)
    expiration_type: Optional[str] = None,  # '0dte', 'YYYYMMDD', 'YYYY-MM-DD', or None (all)
    interval: str = '1s'
) -> None

Parameters:

  • symbol (str): Underlying symbol (e.g., 'SPXW', 'SPY')
  • option_type (str, optional): 'C'/'Call' for calls, 'P'/'Put' for puts, None for both
  • strike (float, optional): Specific strike price or None for all strikes
  • expiration_type (str, optional): '0dte', 'YYYYMMDD' (e.g., '20250117'), 'YYYY-MM-DD', or None for all
  • interval (str): Snapshot interval (default: '1s')
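
Because expiration_type accepts several spellings, code that builds subscriptions from user input may want to normalize it first. The sketch below maps each accepted form to a date. normalize_expiration is a hypothetical helper, and resolving '0dte' to a caller-supplied "today" (falling back to the system date) is an assumption; in a backtest the relevant date would be the simulated session date.

```python
from datetime import date, datetime


def normalize_expiration(expiration_type, today=None):
    """Map an expiration_type string to a date, or None for all expirations."""
    if expiration_type is None:
        return None
    if expiration_type == "0dte":
        # Assumption: 0DTE means contracts expiring on the current session date.
        return today or date.today()
    for fmt in ("%Y%m%d", "%Y-%m-%d"):
        try:
            return datetime.strptime(expiration_type, fmt).date()
        except ValueError:
            continue
    raise ValueError(f"Unrecognized expiration_type: {expiration_type!r}")
```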

Examples:

def on_start(ctx: hf.Context, event: hf.StartEvent):
    # All SPXW 0DTE options
    ctx.subscribe_option_snaps('SPXW', expiration_type='0dte')

    # Only calls, specific strike
    ctx.subscribe_option_snaps('SPXW', option_type='C', strike=4500.0, expiration_type='0dte')

    # Specific date
    ctx.subscribe_option_snaps('SPY', expiration_type='2025-01-17', interval='1m')

def on_snap(ctx: hf.Context, event: hf.SnapEvent):
    snap = event.data()
    # snap.chain (full symbol), snap.strike, snap.price, snap.bid, snap.ask
    print(f"{snap.chain} Strike={snap.strike} Price={snap.price}")

Data Config for Snaps:

data_configs=[{
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_OPT',
    'schema': ['snaps_1s']
}]

ctx.subscribe_futures_bars()

Subscribe to futures bar data with automatic rollover for continuous contracts.

Signature:

def subscribe_futures_bars(symbols: List[str], interval: str) -> None

Parameters:

  • symbols (list[str]): Futures symbols. Supports continuous notation, 'ES.v.0' (volume-weighted front month) or 'ES.c.0' (calendar front month), as well as specific contracts such as 'ESH25'
  • interval (str): Bar interval

Example:

def on_start(ctx: hf.Context, event: hf.StartEvent):
    ctx.subscribe_futures_bars(['ES.v.0', 'NQ.v.0'], '1m')  # Continuous with rollover
    ctx.subscribe_futures_bars(['ESH25'], '5m')  # Specific contract

ctx.subscribe_futures_tbbo()

Subscribe to futures TBBO (quotes + trades).

Signature:

def subscribe_futures_tbbo(symbols: List[str]) -> None

Example:

def on_start(ctx: hf.Context, event: hf.StartEvent):
    ctx.subscribe_futures_tbbo(['ES.v.0', 'NQ.v.0'])

ctx.subscribe_trade_ticks()

Subscribe to individual trade executions.

Signature:

def subscribe_trade_ticks(symbols: List[str], asset_type: AssetType = AssetType.EQUITY) -> None

Example:

def on_start(ctx: hf.Context, event: hf.StartEvent):
    ctx.subscribe_trade_ticks(['AAPL'], AssetType.EQUITY)

def on_trade(ctx: hf.Context, event: hf.TradeEvent):
    trade = event.data()
    print(f"{trade.symbol} Trade: {trade.price} x {trade.size}")

ctx.subscribe_quotes()

Subscribe to bid/ask quote updates.

Signature:

def subscribe_quotes(symbols: List[str], asset_type: AssetType = AssetType.EQUITY) -> None

Example:

def on_start(ctx: hf.Context, event: hf.StartEvent):
    ctx.subscribe_quotes(['AAPL'], AssetType.EQUITY)

def on_quote(ctx: hf.Context, event: hf.QuoteEvent):
    quote = event.data()
    print(f"Bid: {quote.bid_price} x {quote.bid_size}, Ask: {quote.ask_price} x {quote.ask_size}")

Order Management

ctx.buy_order()

Place a buy order to enter a long position or cover a short position.

Signature:

def buy_order(
    symbol: str,
    quantity: float,
    order_type: OrderType = OrderType.MARKET,
    limit_price: Optional[float] = None,
    stop_price: Optional[float] = None
) -> Order

Parameters:

  • symbol (str): Trading symbol
  • quantity (float): Number of shares/contracts (must be positive)
  • order_type (OrderType): MARKET (default), LIMIT, or STOP. The Quick Reference also shows MOO (market-on-open) and MOC (market-on-close).
  • limit_price (float, optional): Required for LIMIT orders
  • stop_price (float, optional): Required for STOP orders

Returns: Order object with order_id, symbol, quantity, prices
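
The parameter rules above (positive quantity, limit_price for LIMIT, stop_price for STOP) can be checked before an order is submitted. The sketch below uses a local OrderType enum as a stand-in for the library's own, limited to the three types in the parameter list. check_order_args is a hypothetical helper, and how HiveQ Flow itself reacts to invalid arguments is not specified here.

```python
from enum import Enum


class OrderType(Enum):
    """Local stand-in for the library's OrderType, limited to three types."""
    MARKET = "MARKET"
    LIMIT = "LIMIT"
    STOP = "STOP"


def check_order_args(quantity, order_type=OrderType.MARKET, limit_price=None, stop_price=None):
    """Raise ValueError if the arguments break the documented parameter rules."""
    if quantity <= 0:
        raise ValueError("quantity must be positive")
    if order_type is OrderType.LIMIT and limit_price is None:
        raise ValueError("limit_price is required for LIMIT orders")
    if order_type is OrderType.STOP and stop_price is None:
        raise ValueError("stop_price is required for STOP orders")
```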

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    bar = event.data()

    # Market order
    order = ctx.buy_order("AAPL", quantity=100)
    print(f"Order ID: {order.order_id}")

    # Limit order
    order = ctx.buy_order(
        "AAPL",
        quantity=100,
        order_type=OrderType.LIMIT,
        limit_price=150.50
    )

    # Stop order
    order = ctx.buy_order(
        "AAPL",
        quantity=100,
        order_type=OrderType.STOP,
        stop_price=155.00
    )

ctx.sell_order()

Place a sell order to close a long position or reduce exposure.

Signature:

def sell_order(
    symbol: str,
    quantity: float,
    order_type: OrderType = OrderType.MARKET,
    limit_price: Optional[float] = None,
    stop_price: Optional[float] = None
) -> Order

Parameters: Same as buy_order()

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    if ctx.is_net_long("AAPL"):
        # Exit with market order
        order = ctx.sell_order("AAPL", quantity=100)

        # Or use limit order
        order = ctx.sell_order(
            "AAPL",
            quantity=100,
            order_type=OrderType.LIMIT,
            limit_price=152.00
        )

ctx.cancel_order()

Cancel an existing pending order.

Signature:

def cancel_order(order_id: str) -> bool

Parameters:

  • order_id (str): Order ID from buy_order/sell_order

Returns: True if cancellation successful, False otherwise

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    bar = event.data()

    # Place limit order
    order = ctx.buy_order(
        "AAPL",
        quantity=100,
        order_type=OrderType.LIMIT,
        limit_price=150.00
    )

    # Cancel if price moves away
    if bar.close > 152.00:
        if ctx.cancel_order(order.order_id):
            print("Order canceled successfully")

Position Management

ctx.net_position()

Get current net position quantity for a symbol.

Signature:

def net_position(symbol: str) -> float

Returns: Net position (positive=long, negative=short, 0=flat)

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    position = ctx.net_position("AAPL")
    print(f"Position: {position}")
    # 100.0 = long 100 shares
    # -50.0 = short 50 shares
    # 0.0 = flat

ctx.is_flat()

Check if position is flat (zero).

Signature:

def is_flat(symbol: str) -> bool

Returns: True if no position, False otherwise

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    if ctx.is_flat("AAPL"):
        # Enter new position
        ctx.buy_order("AAPL", quantity=100)

ctx.is_net_long()

Check whether the strategy holds a net long position.

Signature:

def is_net_long(symbol: str) -> bool

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    if ctx.is_net_long("AAPL"):
        pos = ctx.net_position("AAPL")
        ctx.sell_order("AAPL", quantity=pos)

ctx.is_net_short()

Check whether the strategy holds a net short position.

Signature:

def is_net_short(symbol: str) -> bool

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    if ctx.is_net_short("AAPL"):
        pos = abs(ctx.net_position("AAPL"))
        ctx.buy_order("AAPL", quantity=pos)

ctx.has_open_order()

Check if there are pending/open orders.

Signature:

def has_open_order(symbol: str = None) -> bool

Parameters:

  • symbol (str, optional): Symbol to check. If None, checks all symbols.

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    # Only place new order if no pending orders
    if not ctx.has_open_order("AAPL"):
        ctx.buy_order("AAPL", quantity=100)

ctx.open_order_qty()

Get net quantity of all pending orders.

Signature:

def open_order_qty(symbol: str = None) -> float

Returns: Net pending order quantity (positive=buy, negative=sell)

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    pending = ctx.open_order_qty("AAPL")
    current_pos = ctx.net_position("AAPL")

    # Total exposure = position + pending
    total_exposure = current_pos + pending

    if total_exposure < 1000:
        ctx.buy_order("AAPL", quantity=100)
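
Note that a fixed order of 100 can push exposure past the limit when less than 100 of headroom remains. One way to size the order to the remaining headroom is sketched below. remaining_capacity is a hypothetical helper; in a strategy its inputs would come from ctx.net_position() and ctx.open_order_qty().

```python
def remaining_capacity(position, pending, max_exposure):
    """Quantity that can still be bought before position + pending reaches the cap."""
    return max(0.0, max_exposure - (position + pending))


# Example: long 900 with 50 pending buys and a cap of 1000 leaves room for 50.
room = remaining_capacity(900.0, 50.0, 1000.0)
order_qty = min(100.0, room)
```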

Portfolio Information

ctx.portfolio()

Get strategy-filtered portfolio.

Signature:

def portfolio() -> Portfolio

Returns: Portfolio object with strategy-filtered positions and P&L

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    portfolio = ctx.portfolio()

    balance = portfolio.cash()
    position = portfolio.net_position("AAPL")
    is_long = portfolio.is_net_long("AAPL")
    is_flat = portfolio.is_flat("AAPL")

ctx.global_portfolio()

Get account-level portfolio across all strategies.

Signature:

def global_portfolio() -> GlobalPortfolio

Returns: Global portfolio with aggregate data

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    # Strategy view
    strategy_pnl = ctx.portfolio().total_pnl()

    # Global account view
    global_portfolio = ctx.global_portfolio()
    account_pnl = global_portfolio.total_pnl()
    total_exposure = global_portfolio.net_exposure()

Instrument Access

ctx.instrument()

Get instrument by symbol.

Signature:

def instrument(symbol: str) -> Instrument

Returns: Instrument object with properties and last values

Instrument Attributes:

  • symbol (str): Trading symbol
  • last_bar (Bar): Most recent bar
  • multiplier (float): Contract multiplier
  • min_tick (float): Minimum price movement
  • asset_type (AssetType): Asset type
  • security_details (DataFrame): Full reference data
  • native_instrument_id: Internal OMS identifier

Example:

def on_bar(ctx: hf.Context, event: hf.BarEvent):
    inst = ctx.instrument('AAPL')

    # Access last bar
    if inst.last_bar:
        print(f"Last close: {inst.last_bar.close}")

    # Check properties
    print(f"Tick size: {inst.min_tick}")
    print(f"Multiplier: {inst.multiplier}")

    # Calculate notional (only once a bar has been received)
    if inst.last_bar:
        position_size = 100
        price = inst.last_bar.close
        notional = position_size * price * inst.multiplier

Timers

ctx.set_timer()

Set a repeating timer.

Signature:

def set_timer(timer_id: str, timer_interval: pd.Timedelta) -> None

Parameters:

  • timer_id (str): Unique timer identifier
  • timer_interval (pd.Timedelta): Interval between timer events

Example:

def on_start(ctx: hf.Context, event: hf.StartEvent):
    ctx.set_timer('status_report', pd.Timedelta(minutes=5))

def on_hiveq_event(ctx: hf.Context, event: hf.Event):
    if event.type == EventType.TIMER:
        timer = event.data()
        if timer.timer_id == 'status_report':
            print("5 minutes elapsed")

ctx.cancel_timer()

Cancel a running timer.

Signature:

def cancel_timer(timer_id: str) -> None

Example:

ctx.cancel_timer('my_timer')

Event Logging

ctx.add_event_log()

Add custom event log entry.

Signature:

def add_event_log(
    message: str,
    sub_event_type: Optional[str] = None,
    symbol: Optional[str] = None,
    state_variable: Optional[dict] = None
) -> None

Parameters:

  • message (str): Descriptive message
  • sub_event_type (str, optional): Event sub-type
  • symbol (str, optional): Associated symbol
  • state_variable (dict, optional): Additional state variables

Example:

def on_bar(self, ctx: Context, event: BarEvent):
    bar = event.data()

    # Log signal
    ctx.add_event_log(
        message="Bullish signal detected",
        symbol=bar.symbol,
        state_variable={"price": bar.close}
    )

    # Log trade entry
    ctx.add_event_log(
        message="Entering long position",
        symbol=bar.symbol
    )

Strategy Callbacks

Event handler methods called by the framework. Implement these in your strategy class.

on_start(ctx, event)

Called once when the strategy starts. Use it for subscriptions and initialization.

def on_start(self, ctx: Context, event: StartEvent):
    ctx.subscribe_bars(['AAPL'], AssetType.EQUITY, '1m')

on_bar(ctx, event)

Called for each bar (OHLCV) update.

def on_bar(self, ctx: Context, event: BarEvent):
    bar = event.data()  # or event.bar
    # bar.symbol, bar.open, bar.high, bar.low, bar.close, bar.volume

on_trade(ctx, event)

Called for each trade tick.

def on_trade(self, ctx: Context, event: TradeEvent):
    trade = event.data()
    # trade.symbol, trade.price, trade.size, trade.aggressor_side

on_quote(ctx, event)

Called for each quote (bid/ask) update.

def on_quote(self, ctx: Context, event: QuoteEvent):
    quote = event.data()
    # quote.symbol, quote.bid_price, quote.ask_price, quote.bid_size, quote.ask_size

on_snap(ctx, event)

Called for each options snapshot.

def on_snap(self, ctx: Context, event: SnapEvent):
    snap = event.data()
    # snap.chain, snap.strike, snap.price, snap.bid, snap.ask, snap.greeks

on_custom_data(ctx, event)

Called for custom data events (signals, ML predictions, etc.).

def on_custom_data(self, ctx: Context, event: CustomDataEvent):
    data = event.data()
    signal = data.column_data('signal1', default=0)
    weight = data.column_data('weight1', default=0.0)

on_order(ctx, event)

Called for all order state changes (filled, rejected, accepted, etc.).

def on_order(self, ctx: Context, event: OrderEvent):
    if event.type == EventType.ORDER_FILLED:
        order = event.data()
        fill = order.last_fill
        # fill.symbol, fill.last_qty, fill.last_px
    elif event.type == EventType.ORDER_REJECTED:
        order = event.data()
        # Handle rejection

Note: Use on_order() to handle all order events. Check event.type to distinguish between ORDER_FILLED, ORDER_REJECTED, ORDER_CANCELED, etc. Individual callbacks like on_order_filled or on_fill do not exist.

def on_order(self, ctx: Context, event: OrderEvent):
    order = event.data()
    if event.type == EventType.ORDER_FILLED:
        fill = order.last_fill
        print(f"Filled: {fill.last_qty} @ {fill.last_px}")
    elif event.type == EventType.ORDER_REJECTED:
        print(f"Rejected: {order.symbol} {order.order_id}")

on_position(ctx, event)

Called when a position changes.

def on_position(self, ctx: Context, event: PositionEvent):
    position = event.data()
    # position.symbol, position.quantity, position.avg_px

on_timer(ctx, event)

Called at set intervals (use ctx.set_timer() to configure).

def on_timer(self, ctx: Context, event: TimerEvent):
    # Periodic checks, rebalancing, etc.
    pass

on_stop(ctx, event)

Called when the strategy stops.

def on_stop(self, ctx: Context, event: StopEvent):
    # Cleanup, close positions, etc.
    pass

Unified Event Handler (Alternative Pattern)

Instead of individual callbacks, implement one handler for all events:

def on_hiveq_event(self, ctx: Context, event: Event):
    if event.type == EventType.START:
        ctx.subscribe_bars(['AAPL'], AssetType.EQUITY, '1m')
    elif event.type == EventType.BAR:
        bar = event.data()
        # Handle bar
    elif event.type == EventType.ORDER_FILLED:
        order = event.data()
        # Handle fill
    elif event.type == EventType.CUSTOM_DATA:
        data = event.data()
        # Handle custom data

Configuration

StrategyConfig

Strategy configuration class.

Attributes:

  • name (str): Strategy instance name
  • type (str): Strategy type/class name
  • symbols (list[str], optional): Trading symbols
  • params (dict): Strategy-specific parameters

Example:

from hiveq.flow import StrategyConfig

config = StrategyConfig(
    name='SMA_Crossover',
    type='SMACrossoverStrategy',
    symbols=['AAPL'],
    params={
        'fast_window': 10,
        'slow_window': 30,
        'trade_size': 100
    }
)

# Access params as attributes
print(config.fast_window)  # 10
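
For readers curious how params-as-attributes access can work, one common Python approach is a `__getattr__` fallback. The class below is a hypothetical illustration named `ParamsConfig`; it is not the HiveQ Flow implementation, which may behave differently.

```python
class ParamsConfig:
    """Hypothetical illustration; not the HiveQ Flow StrategyConfig."""

    def __init__(self, name, type, symbols=None, params=None):
        self.name = name
        self.type = type
        self.symbols = list(symbols or [])
        self.params = dict(params or {})

    def __getattr__(self, key):
        # __getattr__ runs only after normal attribute lookup fails, so
        # name/type/symbols/params never reach this method.
        params = self.__dict__.get("params", {})
        try:
            return params[key]
        except KeyError:
            raise AttributeError(f"no attribute or param named {key!r}") from None


cfg = ParamsConfig(
    name="SMA_Crossover",
    type="SMACrossoverStrategy",
    symbols=["AAPL"],
    params={"fast_window": 10, "slow_window": 30},
)
print(cfg.fast_window, cfg.slow_window)
print(hasattr(cfg, "missing_param"))
```

Raising `AttributeError` for unknown keys keeps `hasattr()` and `getattr(obj, key, default)` working as usual.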

BacktestConfig

Backtest configuration class.

Attributes:

  • symbols (list[str]): Trading symbols
  • start_date (str): Start date 'YYYY-MM-DD'
  • end_date (str): End date 'YYYY-MM-DD'
  • initial_capital (float): Starting capital (default: 1,000,000)
  • commission (float): Commission rate as decimal (default: 0.001)
  • slippage (float): Slippage rate as decimal (default: 0.0)
  • risk_free_rate (float): Risk-free rate for Sharpe (default: 0.02)
  • venue (str): Trading venue (default: "SIM")
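
To get a feel for what decimal rates mean in cash terms, the sketch below estimates the cost of a buy. It assumes slippage worsens the fill price by the given fraction and that commission is charged on the resulting notional; the backtest engine's actual cost model is not described here and may differ. `estimate_buy_cost` is a hypothetical helper.

```python
def estimate_buy_cost(quantity, price, commission=0.001, slippage=0.0):
    """Estimate cash needed for a buy under a simple proportional model.

    Assumption: slippage raises the fill price by `slippage` (a fraction)
    and commission is a fraction of the filled notional.
    """
    fill_price = price * (1.0 + slippage)
    notional = quantity * fill_price
    fee = notional * commission
    return {
        "fill_price": fill_price,
        "notional": notional,
        "commission": fee,
        "total": notional + fee,
    }


cost = estimate_buy_cost(100, 180.00, commission=0.002, slippage=0.0005)
for key, value in cost.items():
    print(f"{key}: {value:.3f}")
```

Under these assumptions, buying 100 shares at 180.00 with 0.2% commission and 0.05% slippage fills at about 180.09 and pays roughly 36.02 in commission.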

Example:

from hiveq.flow import BacktestConfig

config = BacktestConfig(
    symbols=['AAPL', 'GOOGL'],
    start_date='2025-01-01',
    end_date='2025-12-31',
    initial_capital=500000.0,
    commission=0.002,  # 0.2%
    slippage=0.0005,   # 0.05%
    risk_free_rate=0.045  # 4.5%
)
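
The `risk_free_rate` setting feeds the Sharpe ratio. The sketch below shows the textbook annualized Sharpe calculation from daily returns, assuming 252 trading days per year and a simple per-period risk-free rate; HiveQ's exact convention is not specified in this reference, and `annualized_sharpe` is a hypothetical helper.

```python
import math
import statistics


def annualized_sharpe(daily_returns, risk_free_rate=0.02, periods_per_year=252):
    """Textbook Sharpe ratio: mean excess return over its standard deviation."""
    rf_per_period = risk_free_rate / periods_per_year
    excess = [r - rf_per_period for r in daily_returns]
    stdev = statistics.stdev(excess)  # sample standard deviation
    if stdev == 0:
        return 0.0  # undefined for constant returns; report 0 by convention
    return statistics.mean(excess) / stdev * math.sqrt(periods_per_year)


returns = [0.010, -0.004, 0.006, 0.002, -0.001]
print(annualized_sharpe(returns, risk_free_rate=0.0))
print(annualized_sharpe(returns, risk_free_rate=0.045))
```

A higher risk-free rate lowers the excess return and therefore the ratio, which is why the second value printed is smaller than the first.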

Data Configuration

Data configurations specify how to load market data and custom signals for backtesting and live trading.

HiveQ Historical Data

For Backtesting - Use HiveQ's managed historical data service.

Structure:

{
    'type': 'hiveq_historical',
    'dataset': str,        # Dataset identifier
    'schema': list[str]    # Data schema types
}

Available Datasets:

  • 'HIVEQ_US_EQ': US Equities (stocks)
  • 'HIVEQ_US_FUT': US Futures
  • 'HIVEQ_US_OPT': US Options
  • 'HIVEQ_US_IND': US Indices (SPX, VIX, NDX - not tradable, for reference only)
  • 'HIVEQ_US_ETF': US ETF Holdings/Constituents (use with hd.Historical.get_data(), not data_configs)
  • 'HIVEQ_QUANT_SIGNALS': HiveQ Quant Signals (alpha signals)

Available Schemas:

  • 'bars_1m': 1-minute OHLCV bars (smallest bar granularity)
  • 'bars_1d': Daily OHLCV bars
  • 'eq_trades': Equity trade ticks (HIVEQ_US_EQ only)
  • 'fut_trades': Futures trade ticks (HIVEQ_US_FUT only)
  • 'tbbo': Top of Book + Trades (trade ticks + quote ticks)
  • 'snaps_1s': 1-second snapshots (for options)
  • 'indices_values': Index price data (for HIVEQ_US_IND)
  • 'etf_master': ETF holdings/constituents data (for HIVEQ_US_ETF - use with hd.Historical.get_data())
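
The dataset and schema lists above imply a few compatibility rules that can be checked before starting a backtest. The helper below is a hypothetical pre-flight check built only from those lists (it is not part of HiveQ Flow) and enforces just the constraints stated explicitly.

```python
KNOWN_DATASETS = {
    "HIVEQ_US_EQ", "HIVEQ_US_FUT", "HIVEQ_US_OPT",
    "HIVEQ_US_IND", "HIVEQ_US_ETF", "HIVEQ_QUANT_SIGNALS",
}
KNOWN_SCHEMAS = {
    "bars_1m", "bars_1d", "eq_trades", "fut_trades",
    "tbbo", "snaps_1s", "indices_values", "etf_master",
}
# Schemas the list above marks as "<dataset> only".
SCHEMA_REQUIRES_DATASET = {
    "eq_trades": "HIVEQ_US_EQ",
    "fut_trades": "HIVEQ_US_FUT",
}


def check_historical_config(cfg):
    """Return a list of problems found in one hiveq_historical config dict."""
    problems = []
    if cfg.get("type") != "hiveq_historical":
        problems.append("type must be 'hiveq_historical'")
    dataset = cfg.get("dataset")
    if dataset not in KNOWN_DATASETS:
        problems.append(f"unknown dataset: {dataset!r}")
    elif dataset == "HIVEQ_US_ETF":
        problems.append("HIVEQ_US_ETF is read via hd.Historical.get_data(), not data_configs")
    schemas = cfg.get("schema")
    if not isinstance(schemas, list) or not schemas:
        problems.append("schema must be a non-empty list of strings")
        return problems
    for schema in schemas:
        if schema not in KNOWN_SCHEMAS:
            problems.append(f"unknown schema: {schema!r}")
        required = SCHEMA_REQUIRES_DATASET.get(schema)
        if required and dataset != required:
            problems.append(f"{schema!r} is only available in {required}")
    return problems


good = {"type": "hiveq_historical", "dataset": "HIVEQ_US_EQ", "schema": ["bars_1m", "tbbo"]}
bad = {"type": "hiveq_historical", "dataset": "HIVEQ_US_FUT", "schema": ["eq_trades"]}
print(check_historical_config(good))
print(check_historical_config(bad))
```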

Examples:

# 1-minute bars for equities
data_configs=[{
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_EQ',
    'schema': ['bars_1m']
}]

# TBBO data for high-frequency trading
data_configs=[{
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_EQ',
    'schema': ['tbbo']
}]

# Options snapshots
data_configs=[{
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_OPT',
    'schema': ['snaps_1s']
}]

# Multiple schemas
data_configs=[{
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_EQ',
    'schema': ['bars_1m', 'tbbo']
}]

CSV Data

For Custom Data - Load data from local CSV files.

Structure:

{
    'type': 'csv',
    'data_type': str,    # 'bars_1m', 'bars_1d', or 'custom'
    'id': str,           # Unique identifier
    'path': str          # Path relative to ~/.hiveq/data/, or an absolute path
}
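
The `path` rule can be pictured with a small resolver. This is a sketch of the behavior described in the comment above, assuming a plain join against `~/.hiveq/data/`; `resolve_data_path` is a hypothetical helper, and the library's own resolution logic may handle more cases.

```python
from pathlib import Path

DEFAULT_DATA_ROOT = Path.home() / ".hiveq" / "data"


def resolve_data_path(path, data_root=DEFAULT_DATA_ROOT):
    """Return `path` unchanged if absolute, else joined onto data_root."""
    candidate = Path(path)
    if candidate.is_absolute():
        return candidate
    return Path(data_root) / candidate


print(resolve_data_path("bars/AAPL_bars.csv"))
print(resolve_data_path("signals/predictions.csv", data_root="/data"))
```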

CSV Format for Bars:

timestamp,symbol,open,high,low,close,volume
2025-08-01 09:30:00,AAPL,180.50,181.00,180.25,180.75,1000000
2025-08-01 09:31:00,AAPL,180.75,181.25,180.50,181.00,1100000
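
Before pointing a config at a bars file, it can help to check the file against this layout. The sketch below parses the sample rows with Python's standard `csv` module and applies basic sanity checks; `load_bars` is a hypothetical helper, and the high/low check is an extra assumption rather than a documented HiveQ requirement.

```python
import csv
import io
from datetime import datetime

REQUIRED_COLUMNS = ["timestamp", "symbol", "open", "high", "low", "close", "volume"]

SAMPLE = """timestamp,symbol,open,high,low,close,volume
2025-08-01 09:30:00,AAPL,180.50,181.00,180.25,180.75,1000000
2025-08-01 09:31:00,AAPL,180.75,181.25,180.50,181.00,1100000
"""


def load_bars(text):
    """Parse bar rows and raise ValueError on malformed input."""
    reader = csv.DictReader(io.StringIO(text))
    missing = [c for c in REQUIRED_COLUMNS if c not in (reader.fieldnames or [])]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    bars = []
    for line_no, row in enumerate(reader, start=2):  # line 1 is the header
        bar = {
            "timestamp": datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S"),
            "symbol": row["symbol"],
            "open": float(row["open"]),
            "high": float(row["high"]),
            "low": float(row["low"]),
            "close": float(row["close"]),
            "volume": int(row["volume"]),
        }
        body_high = max(bar["open"], bar["close"])
        body_low = min(bar["open"], bar["close"])
        if bar["high"] < body_high or bar["low"] > body_low:
            raise ValueError(f"line {line_no}: high/low do not bracket open/close")
        bars.append(bar)
    return bars


bars = load_bars(SAMPLE)
print(len(bars), bars[0]["timestamp"], bars[0]["close"])
```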

CSV Format for Custom Data:

timestamp,symbol,signal,confidence,feature1,feature2
2025-08-01 09:30:00,AAPL,1,0.85,0.123,-0.456
2025-08-01 09:31:00,AAPL,0,0.32,0.089,-0.221
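
A custom-data file in this shape can be produced with the standard `csv` module. The sketch below writes two signal rows to an in-memory buffer; `signals_to_csv` is a hypothetical helper, and only a subset of the columns shown above is used.

```python
import csv
import io
from datetime import datetime, timedelta

FIELDS = ["timestamp", "symbol", "signal", "confidence"]


def signals_to_csv(rows):
    """Serialize signal rows to CSV text with a header line."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS, lineterminator="\n")
    writer.writeheader()
    for row in rows:
        # Format timestamps the same way as the sample file above.
        stamp = row["timestamp"].strftime("%Y-%m-%d %H:%M:%S")
        writer.writerow({**row, "timestamp": stamp})
    return buffer.getvalue()


start = datetime(2025, 8, 1, 9, 30)
rows = [
    {"timestamp": start, "symbol": "AAPL", "signal": 1, "confidence": 0.85},
    {"timestamp": start + timedelta(minutes=1), "symbol": "AAPL", "signal": 0, "confidence": 0.32},
]
csv_text = signals_to_csv(rows)
print(csv_text)
```

In a strategy, columns from such a file would then be read in `on_custom_data` with calls like `data.column_data('signal', default=0)`.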

Examples:

# Bars from CSV
data_configs=[{
    'type': 'csv',
    'data_type': 'bars_1m',
    'id': '1_MIN_BAR',
    'path': 'bars/AAPL_bars.csv'
}]

# Custom signals
data_configs=[{
    'type': 'csv',
    'data_type': 'custom',
    'id': 'MLSignals',
    'path': 'signals/predictions.csv'
}]

# Multiple data sources
data_configs=[
    {
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_US_EQ',
        'schema': ['bars_1m']
    },
    {
        'type': 'csv',
        'data_type': 'custom',
        'id': 'UserSignals',
        'path': 'signals/custom.csv'
    }
]

Combining Data Sources

You can combine multiple data sources in a single strategy:

# Example: Backtest with market data + custom signals
# (assumes hiveq.flow is imported as hf and config is a StrategyConfig)
report = hf.run_backtest(
    strategy_configs=[config],
    symbols=['AAPL'],
    start_date='2025-01-01',
    end_date='2025-01-31',
    data_configs=[
        # Market data
        {
            'type': 'hiveq_historical',
            'dataset': 'HIVEQ_US_EQ',
            'schema': ['bars_1m', 'tbbo']
        },
        # Custom signals
        {
            'type': 'csv',
            'data_type': 'custom',
            'id': 'MLPredictions',
            'path': 'signals/ml_predictions.csv'
        }
    ]
)

Trading Types

Order

Order object returned by order placement methods.

Attributes:

  • order_id (str): Unique order identifier
  • symbol (str): Trading symbol
  • side (OrderSide): BUY or SELL
  • quantity (float): Order quantity
  • filled_qty (float): Filled quantity
  • avg_px (float): Average fill price
  • status (OrderStatus): Order status
  • order_type (OrderType): MARKET, LIMIT, STOP

OrderSide

Order side enumeration.

Values:

  • OrderSide.BUY: Buy order
  • OrderSide.SELL: Sell order

OrderType

Order type enumeration.

Values:

  • OrderType.MARKET: Market order
  • OrderType.LIMIT: Limit order
  • OrderType.STOP: Stop order

OrderStatus

Order status enumeration.

Values:

  • OrderStatus.SUBMITTED: Order submitted
  • OrderStatus.ACCEPTED: Order accepted
  • OrderStatus.FILLED: Order filled
  • OrderStatus.REJECTED: Order rejected
  • OrderStatus.CANCELED: Order canceled

Event Types

EventType

Event type enumeration for strategy callbacks.

Market Data Events:

  • EventType.BAR: Bar (OHLCV) data event
  • EventType.TICK: Tick data event
  • EventType.QUOTE: Quote data event (bid/ask with sizes)
  • EventType.TRADE: Trade tick event (individual trades with aggressor side)
  • EventType.SNAP: Snapshot event (for ultra-short interval data like 1s)

Lifecycle Events:

  • EventType.START: Strategy start event
  • EventType.STOP: Strategy stop event

Order Events:

  • EventType.ORDER: Generic order event
  • EventType.ORDER_SUBMITTED: Order submitted
  • EventType.ORDER_ACCEPTED: Order accepted by exchange
  • EventType.ORDER_REJECTED: Order rejected
  • EventType.ORDER_FILLED: Order filled
  • EventType.ORDER_CANCELED: Order canceled

Position Events:

  • EventType.POSITION_OPENED: Position opened
  • EventType.POSITION_CHANGED: Position changed
  • EventType.POSITION_CLOSED: Position closed

Other Events:

  • EventType.CUSTOM_DATA: Custom data event
  • EventType.TIMER: Timer event

Corresponding Callback Methods:

# Per-event callbacks (recommended for type safety)
def on_start(self, ctx: Context, event: StartEvent): ...
def on_bar(self, ctx: Context, event: BarEvent): ...
def on_trade(self, ctx: Context, event: TradeEvent): ...      # NEW: For trade ticks
def on_quote(self, ctx: Context, event: QuoteEvent): ...      # NEW: For quote ticks
def on_snap(self, ctx: Context, event: SnapEvent): ...        # NEW: For snapshot data
def on_order(self, ctx: Context, event: OrderEvent): ...      # Handles all order events (filled, rejected, etc.)
def on_position(self, ctx: Context, event: PositionEvent): ...
def on_custom_data(self, ctx: Context, event: CustomDataEvent): ...
def on_timer(self, ctx: Context, event: TimerEvent): ...
def on_stop(self, ctx: Context, event: StopEvent): ...

# Or single handler for all events
def on_hiveq_event(self, ctx: Context, event: Event):
    if event.type == EventType.TRADE:
        trade = event.data()
        ...

AssetType

Asset type enumeration.

Values:

  • AssetType.EQUITY: Equities/stocks
  • AssetType.OPTIONS: Options contracts
  • AssetType.FUTURES: Futures contracts
  • AssetType.CRYPTO: Cryptocurrencies
