API Reference
HiveQ Flow API Reference
Complete API reference for HiveQ Flow trading system (v0.2.1).
Table of Contents
- Environment Setup
- Quick Reference
- Core Functions
- Context API
- Configuration
- Trading Types
- Event Types
Environment Setup
HiveQ Flow auto-initializes from environment variables. No manual init() call is required.
Required Environment Variables
| Variable | Description |
|---|---|
HIVEQ_API_KEY | API key for authentication |
HIVEQ_USER_NAME | User's name (used as trader_id) |
HIVEQ_USER_ID | User ID |
HIVEQ_ORG_ID | Organization ID |
Set these in a .env file in your project root or export them in your shell:
export HIVEQ_API_KEY="your_api_key"
export HIVEQ_USER_NAME="YourName"
export HIVEQ_USER_ID="your_user_id"
export HIVEQ_ORG_ID="your_org_id"When you call any HiveQ Flow function (e.g., run_backtest()), the system automatically reads these environment variables and initializes.
Quick Reference
Common Subscriptions
# Bars (OHLCV)
ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')
ctx.subscribe_futures_bars(symbols=['ES.c.0'], interval='1m') # Continuous futures
# Options snapshots
ctx.subscribe_option_snaps('SPX', strike=5000, option_type='C', expiration_type='0dte', interval='1s')
ctx.subscribe_option_snaps('SPX', interval='1s') # All strikes, types, expirations
# TBBO (top-of-book + trades)
ctx.subscribe_tbbo(symbols=['AAPL'], asset_type=AssetType.EQUITY)
ctx.subscribe_futures_tbbo(symbols=['ES.c.0'])
# Ticks
ctx.subscribe_trade_ticks(symbols=['AAPL'], asset_type=AssetType.EQUITY)
ctx.subscribe_quotes(symbols=['AAPL'], asset_type=AssetType.EQUITY)
# Custom data (signals, ML predictions)
ctx.subscribe_data(data_id='kafka_signals')Common Callbacks
def on_start(self, ctx: Context, event: StartEvent): # Initialize, subscribe
def on_bar(self, ctx: Context, event: BarEvent): # Bar data
def on_snap(self, ctx: Context, event: SnapEvent): # Options snapshot
def on_trade(self, ctx: Context, event: TradeEvent): # Trade tick
def on_quote(self, ctx: Context, event: QuoteEvent): # Quote update
def on_custom_data(self, ctx: Context, event: CustomDataEvent): # Custom data/signals
def on_order(self, ctx: Context, event: OrderEvent): # Order events (filled, rejected, etc.)Common Context Methods
# Orders
order = ctx.buy_order(symbol, quantity)
order = ctx.sell_order(symbol, quantity)
order = ctx.buy_order(symbol, quantity, order_type=OrderType.LIMIT, limit_price=150.00)
order = ctx.buy_order(symbol, quantity, order_type=OrderType.MOO) # Market-on-open
order = ctx.sell_order(symbol, quantity, order_type=OrderType.MOC) # Market-on-close
# Positions
pos = ctx.net_position(symbol) # Current position quantity
is_flat = ctx.is_flat(symbol) # True if no position
is_long = ctx.is_net_long(symbol) # True if long position
is_short = ctx.is_net_short(symbol) # True if short position
# Portfolio
portfolio = ctx.portfolio()
pnl = portfolio.total_pnl() # Total P&L
cash = portfolio.cash() # Available cashAsset Types
from hiveq.flow.config import AssetType
AssetType.EQUITY # Stocks: 'AAPL', 'GOOGL'
AssetType.FUTURES # Futures: 'ES.c.0', 'ESH25'
AssetType.OPTIONS # Options: Use subscribe_option_snaps()
AssetType.CRYPTO # Crypto: 'BTC-USD'Options Filtering
# Filter by strike
ctx.subscribe_option_snaps('SPX', strike=5000)
# Filter by type
ctx.subscribe_option_snaps('SPX', option_type='C') # Calls only
ctx.subscribe_option_snaps('SPX', option_type='P') # Puts only
# Filter by expiration
ctx.subscribe_option_snaps('SPX', expiration_type='0dte') # 0DTE only
ctx.subscribe_option_snaps('SPX', expiration_type='2025-01-17') # Specific date
# Combine filters
ctx.subscribe_option_snaps('SPX', strike=5000, option_type='C', expiration_type='0dte')Futures Continuous Contracts
'ES.v.0' # Volume-weighted front month (auto-rollover)
'ES.c.0' # Calendar front month (auto-rollover)
'ESH25' # Specific contract (March 2025)Core Functions
hiveq.flow.run_backtest()
Run a backtest with one or more trading strategies.
Signature:
def run_backtest(
strategy_configs: list[StrategyConfig],
symbols: Optional[list[str]] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
data_configs: Optional[list[dict]] = None,
backtest_config: Optional[BacktestConfig] = None,
strategy_type: str = 'intraday',
fetch_mode: str = 'daily'
) -> PerformanceReportParameters:
strategy_configs(list[StrategyConfig]): List of strategy configurationssymbols(list[str], optional): Trading symbols (e.g., ["AAPL", "GOOGL"])start_date(str, optional): Start date in 'YYYY-MM-DD' formatend_date(str, optional): End date in 'YYYY-MM-DD' formatdata_configs(list[dict], optional): Data source configurationsbacktest_config(BacktestConfig, optional): Complete backtest configurationstrategy_type(str): Strategy type -'intraday'(default) or'overnight'fetch_mode(str): Data fetch mode -'daily'(default) or'prefetch'(only for intraday)
Execution Modes:
| strategy_type | fetch_mode | Data Fetch | Execution | State Reset |
|---|---|---|---|---|
'intraday' | 'daily' | Day-by-day | Day-by-day | Each day |
'intraday' | 'prefetch' | Single fetch | Single run | Never |
'overnight' | (ignored) | Single fetch | Single run | Never |
intraday+daily: Best for intraday strategies. Fetches data day-by-day, runs strategy day-by-day, recreates strategy instance each day (automatic state reset).intraday+prefetch: Faster for shorter backtests. Single data fetch, single continuous run, no automatic state reset.overnight: For swing/position trading. Single data fetch, single continuous run, positions can be held overnight.
Data Config Structure:
{
'type': 'csv' | 'hiveq_historical',
'data_type': 'bars_1m' | 'custom',
'id': 'unique_identifier',
'path': 'relative/path/to/file.csv', # for CSV
'dataset': 'HIVEQ_US_EQ', # for hiveq_historical
'schema': ['bars_1m'], # for hiveq_historical
}Returns: PerformanceReport with backtest results
Example:
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
# Configure strategy
strategy_configs = [
StrategyConfig(
name='SMA_Strategy',
type='SMACrossoverStrategy',
params={'fast_window': 10, 'slow_window': 30}
)
]
# Run backtest with CSV data
report = hf.run_backtest(
strategy_configs=strategy_configs,
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-30',
data_configs=[{
'type': 'csv',
'data_type': 'bars_1m',
'id': '1_MIN_BAR',
'path': 'bars/AAPL_bars.csv'
}]
)
# Run backtest with HiveQ historical data (default: intraday + daily)
report = hf.run_backtest(
strategy_configs=strategy_configs,
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-30',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m'],
}]
)
# Run backtest with prefetch mode (faster for shorter backtests)
report = hf.run_backtest(
strategy_configs=strategy_configs,
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-30',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m'],
}],
strategy_type='intraday',
fetch_mode='prefetch'
)
# Run overnight strategy (swing trading)
report = hf.run_backtest(
strategy_configs=strategy_configs,
symbols=['AAPL'],
start_date='2025-08-01',
end_date='2025-08-30',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m'],
}],
strategy_type='overnight'
)
# Access results
print(report.return_stats.to_string())
print(report.positions.to_string())
print(report.fills.to_string())hiveq.flow.run_live()
Run live trading with one or more strategies.
Signature:
def run_live(
strategy_configs: list[StrategyConfig],
symbols: list[str],
data_configs: list[dict],
live_config: Optional[LiveConfig] = None
) -> NoneParameters:
strategy_configs(list[StrategyConfig]): Strategy configurationssymbols(list[str]): Trading symbolsdata_configs(list[dict]): Live data client configurationslive_config(LiveConfig, optional): Live trading configuration
Data Config for Live Trading:
{
'type': 'databento',
'id': 'databento_client',
'api_key': 'db-xxx',
'venue_dataset_map': {
'SIM': 'EQUS.MINI',
'XCME': 'GLBX.MDP3'
}
}Example:
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
strategy_configs = [
StrategyConfig(
name='SMALive',
type='SMACrossoverStrategy'
)
]
hf.run_live(
strategy_configs=strategy_configs,
symbols=['AAPL'],
data_configs=[{
'type': 'databento',
'id': 'databento_client',
'api_key': 'db-xxx',
'venue_dataset_map': {"SIM": "EQUS.MINI"}
}]
)hiveq.flow.deploy_backtest()
Deploy a backtest to run remotely on the HiveQ orchestrator.
Signature:
def deploy_backtest(
strategy_configs: list[StrategyConfig],
symbols: Optional[list[str]] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
data_configs: Optional[list[dict]] = None,
backtest_config: Optional[BacktestConfig] = None,
task_name: Optional[str] = None,
priority: int = 5,
allow_duplicate: bool = True,
duplicate_action: str = 'override'
) -> dictParameters:
strategy_configs(list[StrategyConfig]): List of strategy configurationssymbols(list[str], optional): Trading symbolsstart_date(str, optional): Start date in 'YYYY-MM-DD' formatend_date(str, optional): End date in 'YYYY-MM-DD' formatdata_configs(list[dict], optional): Data source configurationsbacktest_config(BacktestConfig, optional): Complete backtest configurationtask_name(str, optional): Unique name for this taskpriority(int): Task priority (0-10, higher = more important)allow_duplicate(bool): Allow submission if task with same name existsduplicate_action(str): Action for existing task: 'override', 'terminate', or 'duplicate'
Returns: Dictionary containing:
status: 'success' or 'error'task_id: The task ID for tracking (if status is 'success')message: Error message (if status is 'error')
Example:
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
# Deploy backtest to run remotely
result = hf.deploy_backtest(
strategy_configs=[StrategyConfig(name='MyStrategy', type='MyStrategy')],
symbols=['AAPL'],
start_date='2025-01-01',
end_date='2025-01-31',
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m']
}]
)
if result['status'] == 'success':
print(f"Backtest deployed with task_id: {result['task_id']}")hiveq.flow.event_logs()
Get event logs for all strategies.
Signature:
def event_logs() -> pd.DataFrameReturns: DataFrame with columns:
timestamp: Event timestampstrategy_id: Strategy identifierevent_type: Type of eventsymbol: Symbol (if applicable)message: Event message- Additional event-specific fields
Example:
import hiveq.flow as hf
# After running backtest
report = hf.run_backtest(...)
# Get event logs
event_df = hf.event_logs()
# Filter by strategy
strategy_logs = event_df[event_df['strategy_id'] == 'my_strategy']
# Filter by event type
filled_orders = event_df[event_df['event_type'] == 'ORDER_FILLED']
print(event_df.to_string())hiveq.flow.config()
Get the current engine configuration.
Signature:
def config() -> EngineConfigReturns: EngineConfig - The current engine configuration including:
oms: OMS type ('NAUTILUS')oms_log_level: OMS logging level- Other engine settings
Example:
import hiveq.flow as hf
cfg = hf.config()
print(f"OMS: {cfg.oms}")hiveq.flow.backtest_config()
Get the current backtest configuration.
Signature:
def backtest_config() -> BacktestConfigReturns: BacktestConfig - The backtest configuration used for current/recent run including:
start_date,end_date: Date rangeinitial_capital: Starting capitalcommission,slippage: Trading costsdeploy: Deploy mode flag
Example:
import hiveq.flow as hf
report = hf.run_backtest(...)
config = hf.backtest_config()
print(f"Start: {config.start_date}, Capital: ${config.initial_capital:,.2f}")hiveq.flow.execution_mode()
Get the current execution mode.
Signature:
def execution_mode() -> ExecutionModeReturns: ExecutionMode enum value:
ExecutionMode.BACKTEST: Historical backtestingExecutionMode.LIVE: Live trading (usepaper_tradingflag in LiveConfig for paper trading)
Example:
import hiveq.flow as hf
from hiveq.flow.config import ExecutionMode
report = hf.run_backtest(...)
mode = hf.execution_mode()
if mode == ExecutionMode.BACKTEST:
print("Running in backtest mode")hiveq.flow.logger()
Get the HiveQ Flow logger instance.
Signature:
def logger() -> LoggerReturns: Logger instance
Example:
import hiveq.flow as hf
logger = hf.logger()
logger.info("Strategy initialized")
logger.debug("Processing bar data")
logger.warning("High volatility detected")
logger.error("Order rejected")Context API
The Context object is the primary interface for strategies to interact with the trading system.
Market Data Subscriptions
ctx.subscribe_bars()
Subscribe to bar (OHLCV) data for specified symbols.
Signature:
def subscribe_bars(
symbols: List[str],
asset_type: AssetType,
interval: str
) -> NoneParameters:
symbols(list[str]): List of trading symbolsasset_type(AssetType): Asset type (EQUITY, FUTURES, OPTIONS). Required.interval(str): Bar interval. Required.- Seconds: "1s", "5s", "30s"
- Minutes: "1m", "5m", "15m", "30m"
- Hours: "1h", "4h"
- Days: "1d"
Example:
def on_start(ctx: hf.Context, event: hf.StartEvent):
# Subscribe to 1-minute bars for equities
ctx.subscribe_bars(["AAPL", "GOOGL"], asset_type=AssetType.EQUITY, interval="1m")
# Subscribe to 30-minute bars for futures
ctx.subscribe_bars(["ES.H25"], asset_type=AssetType.FUTURES, interval="30m")
# Subscribe to 1-hour bars for options
ctx.subscribe_bars(["SPY"], asset_type=AssetType.OPTIONS, interval="1h")ctx.subscribe_data()
Subscribe to custom data events.
Signature:
def subscribe_data(data_id: str, signals: List[str] = None) -> NoneParameters:
data_id(str): Identifier matching the 'id' in data_configsignals(List[str], optional): List of signal symbols to fetch from the API. Used during prefetch phase to dynamically specify which signals to load. If not specified, thesymbolsfield in data_config is used.
Example 1: Static signals (symbols in data_config)
# Signals specified in data_config
report = hf.run_backtest(
strategy_configs=[config],
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_QUANT_SIGNALS',
'schema': ['signals'],
'id': 'UserSignals',
'symbols': ['signal_1', 'signal_2'] # Static specification
}]
)
# In strategy:
def on_start(ctx: hf.Context, event: hf.StartEvent):
ctx.subscribe_data("UserSignals") # No signals param neededExample 2: Dynamic signals (signals in subscribe_data)
# No 'symbols' in data_config - specified dynamically in strategy
report = hf.run_backtest(
strategy_configs=[config],
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_QUANT_SIGNALS',
'schema': ['signals'],
'id': 'UserSignals' # No 'symbols' here
}]
)
# In strategy - signals determined at runtime
def on_start(ctx: hf.Context, event: hf.StartEvent):
# Signals captured during prefetch and used to filter API fetch
ctx.subscribe_data("UserSignals", signals=['signal_1', 'signal_2'])
def on_custom_data(ctx: hf.Context, event: hf.CustomDataEvent):
data = event.data()
symbol = data.column_data('symbol')
signal_json = data.column_data('signal_json')Priority: If both data_config['symbols'] and signals parameter are specified, data_config['symbols'] takes priority.
ctx.subscribe_tbbo()
Subscribe to TBBO (Top of Book + Trades) data for real-time trade and quote ticks.
Signature:
def subscribe_tbbo(
symbols: List[str],
asset_type: AssetType = AssetType.EQUITY
) -> NoneParameters:
symbols(list[str]): List of trading symbolsasset_type(AssetType): Asset type (default: EQUITY)
What is TBBO? TBBO provides both:
- Trade ticks: Individual trade executions with price, size, and aggressor side
- Quote ticks: Best bid/offer (L1 quotes) with prices and sizes
Ideal for high-frequency strategies needing both trade flow and top-of-book information.
Example:
def on_start(ctx: hf.Context, event: hf.StartEvent):
# Subscribe to TBBO data
ctx.subscribe_tbbo(symbols=['AAPL', 'GOOGL'])
def on_trade(self, ctx: hf.Context, event: hf.TradeEvent):
"""Handle trade tick events."""
trade = event.data()
# trade.symbol, trade.price, trade.size
# trade.aggressor_side: 'BUYER', 'SELLER', or 'NO_AGGRESSOR'
# trade.trade_id, trade.ts_event
if trade.aggressor_side == "BUYER":
print(f"Aggressive buying: {trade.size} @ ${trade.price}")
def on_quote(self, ctx: hf.Context, event: hf.QuoteEvent):
"""Handle quote tick events."""
quote = event.data()
# quote.symbol, quote.bid_price, quote.ask_price
# quote.bid_size, quote.ask_size, quote.ts_event
spread = quote.ask_price - quote.bid_price
mid_price = (quote.bid_price + quote.ask_price) / 2
print(f"Spread: ${spread:.2f}, Mid: ${mid_price:.2f}")Data Config for TBBO:
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['tbbo'] # or 'trades'
}]ctx.subscribe_option_snaps()
Subscribe to options snapshots with flexible filtering (strike, type, expiration).
Signature:
def subscribe_option_snaps(
symbol: str, # Underlying symbol
option_type: Optional[str] = None, # 'C'/'Call', 'P'/'Put', or None (both)
strike: Optional[float] = None, # Specific strike or None (all)
expiration_type: Optional[str] = None, # '0dte', 'YYYYMMDD', 'YYYY-MM-DD', or None (all)
interval: str = '1s'
) -> NoneParameters:
symbol(str): Underlying symbol (e.g., 'SPXW', 'SPY')option_type(str, optional): 'C'/'Call' for calls, 'P'/'Put' for puts, None for bothstrike(float, optional): Specific strike price or None for all strikesexpiration_type(str, optional): '0dte', 'YYYYMMDD' (e.g., '20250117'), 'YYYY-MM-DD', or None for allinterval(str): Snapshot interval (default: '1s')
Examples:
def on_start(ctx: hf.Context, event: hf.StartEvent):
# All SPXW 0DTE options
ctx.subscribe_option_snaps('SPXW', expiration_type='0dte')
# Only calls, specific strike
ctx.subscribe_option_snaps('SPXW', option_type='C', strike=4500.0, expiration_type='0dte')
# Specific date
ctx.subscribe_option_snaps('SPY', expiration_type='2025-01-17', interval='1m')
def on_snap(ctx: hf.Context, event: hf.SnapEvent):
snap = event.data()
# snap.chain (full symbol), snap.strike, snap.price, snap.bid, snap.ask
print(f"{snap.chain} Strike={snap.strike} Price={snap.price}")Data Config for Snaps:
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_OPT',
'schema': ['snaps_1s']
}]ctx.subscribe_futures_bars()
Subscribe to futures bar data with automatic rollover for continuous contracts.
Signature:
def subscribe_futures_bars(symbols: List[str], interval: str) -> NoneParameters:
symbols(list[str]): Futures symbols. Supports continuous notation: 'ES.v.0' (front month), 'ES.c.0' (calendar), or specific 'ESH25'interval(str): Bar interval
Example:
def on_start(ctx: hf.Context, event: hf.StartEvent):
ctx.subscribe_futures_bars(['ES.v.0', 'NQ.v.0'], '1m') # Continuous with rollover
ctx.subscribe_futures_bars(['ESH25'], '5m') # Specific contractctx.subscribe_futures_tbbo()
Subscribe to futures TBBO (quotes + trades).
Signature:
def subscribe_futures_tbbo(symbols: List[str]) -> NoneExample:
def on_start(ctx: hf.Context, event: hf.StartEvent):
ctx.subscribe_futures_tbbo(['ES.v.0', 'NQ.v.0'])ctx.subscribe_trade_ticks()
Subscribe to individual trade executions.
Signature:
def subscribe_trade_ticks(symbols: List[str], asset_type: AssetType = AssetType.EQUITY) -> NoneExample:
def on_start(ctx: hf.Context, event: hf.StartEvent):
ctx.subscribe_trade_ticks(['AAPL'], AssetType.EQUITY)
def on_trade(ctx: hf.Context, event: hf.TradeEvent):
trade = event.data()
print(f"{trade.symbol} Trade: {trade.price} x {trade.size}")ctx.subscribe_quotes()
Subscribe to bid/ask quote updates.
Signature:
def subscribe_quotes(symbols: List[str], asset_type: AssetType = AssetType.EQUITY) -> NoneExample:
def on_start(ctx: hf.Context, event: hf.StartEvent):
ctx.subscribe_quotes(['AAPL'], AssetType.EQUITY)
def on_quote(ctx: hf.Context, event: hf.QuoteEvent):
quote = event.data()
print(f"Bid: {quote.bid_price} x {quote.bid_size}, Ask: {quote.ask_price} x {quote.ask_size}")Order Management
ctx.buy_order()
Place a buy order to enter long or cover short position.
Signature:
def buy_order(
symbol: str,
quantity: float,
order_type: OrderType = OrderType.MARKET,
limit_price: Optional[float] = None,
stop_price: Optional[float] = None
) -> OrderParameters:
symbol(str): Trading symbolquantity(float): Number of shares/contracts (must be positive)order_type(OrderType): MARKET, LIMIT, or STOPlimit_price(float, optional): Required for LIMIT ordersstop_price(float, optional): Required for STOP orders
Returns: Order object with order_id, symbol, quantity, prices
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
bar = event.data()
# Market order
order = ctx.buy_order("AAPL", quantity=100)
print(f"Order ID: {order.order_id}")
# Limit order
order = ctx.buy_order(
"AAPL",
quantity=100,
order_type=OrderType.LIMIT,
limit_price=150.50
)
# Stop order
order = ctx.buy_order(
"AAPL",
quantity=100,
order_type=OrderType.STOP,
stop_price=155.00
)ctx.sell_order()
Place a sell order to close long position or reduce exposure.
Signature:
def sell_order(
symbol: str,
quantity: float,
order_type: OrderType = OrderType.MARKET,
limit_price: Optional[float] = None,
stop_price: Optional[float] = None
) -> OrderParameters: Same as buy_order()
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
if ctx.is_net_long("AAPL"):
# Exit with market order
order = ctx.sell_order("AAPL", quantity=100)
# Or use limit order
order = ctx.sell_order(
"AAPL",
quantity=100,
order_type=OrderType.LIMIT,
limit_price=152.00
)ctx.cancel_order()
Cancel an existing pending order.
Signature:
def cancel_order(order_id: str) -> boolParameters:
order_id(str): Order ID from buy_order/sell_order
Returns: True if cancellation successful, False otherwise
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
bar = event.data()
# Place limit order
order = ctx.buy_order(
"AAPL",
quantity=100,
order_type=OrderType.LIMIT,
limit_price=150.00
)
# Cancel if price moves away
if bar.close > 152.00:
if ctx.cancel_order(order.order_id):
print("Order canceled successfully")Position Management
ctx.net_position()
Get current net position quantity for a symbol.
Signature:
def net_position(symbol: str) -> floatReturns: Net position (positive=long, negative=short, 0=flat)
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
position = ctx.net_position("AAPL")
print(f"Position: {position}")
# 100.0 = long 100 shares
# -50.0 = short 50 shares
# 0.0 = flatctx.is_flat()
Check if position is flat (zero).
Signature:
def is_flat(symbol: str) -> boolReturns: True if no position, False otherwise
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
if ctx.is_flat("AAPL"):
# Enter new position
ctx.buy_order("AAPL", quantity=100)ctx.is_net_long()
Check if holding net long position.
Signature:
def is_net_long(symbol: str) -> boolExample:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
if ctx.is_net_long("AAPL"):
pos = ctx.net_position("AAPL")
ctx.sell_order("AAPL", quantity=pos)ctx.is_net_short()
Check if holding net short position.
Signature:
def is_net_short(symbol: str) -> boolExample:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
if ctx.is_net_short("AAPL"):
pos = abs(ctx.net_position("AAPL"))
ctx.buy_order("AAPL", quantity=pos)ctx.has_open_order()
Check if there are pending/open orders.
Signature:
def has_open_order(symbol: str = None) -> boolParameters:
symbol(str, optional): Symbol to check. If None, checks all symbols.
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
# Only place new order if no pending orders
if not ctx.has_open_order("AAPL"):
ctx.buy_order("AAPL", quantity=100)ctx.open_order_qty()
Get net quantity of all pending orders.
Signature:
def open_order_qty(symbol: str = None) -> floatReturns: Net pending order quantity (positive=buy, negative=sell)
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
pending = ctx.open_order_qty("AAPL")
current_pos = ctx.net_position("AAPL")
# Total exposure = position + pending
total_exposure = current_pos + pending
if total_exposure < 1000:
ctx.buy_order("AAPL", quantity=100)Portfolio Information
ctx.portfolio()
Get strategy-filtered portfolio.
Signature:
def portfolio() -> PortfolioReturns: Portfolio object with strategy-filtered positions and P&L
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
portfolio = ctx.portfolio()
balance = portfolio.cash()
position = portfolio.net_position("AAPL")
is_long = portfolio.is_net_long("AAPL")
is_flat = portfolio.is_flat("AAPL")ctx.global_portfolio()
Get account-level portfolio across all strategies.
Signature:
def global_portfolio() -> GlobalPortfolioReturns: Global portfolio with aggregate data
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
# Strategy view
strategy_pnl = ctx.portfolio().total_pnl()
# Global account view
global_portfolio = ctx.global_portfolio()
account_pnl = global_portfolio.total_pnl()
total_exposure = global_portfolio.net_exposure()Instrument Access
ctx.instrument()
Get instrument by symbol.
Signature:
def instrument(symbol: str) -> InstrumentReturns: Instrument object with properties and last values
Instrument Attributes:
symbol(str): Trading symbollast_bar(Bar): Most recent barmultiplier(float): Contract multipliermin_tick(float): Minimum price movementasset_type(AssetType): Asset typesecurity_details(DataFrame): Full reference datanative_instrument_id: Internal OMS identifier
Example:
def on_bar(ctx: hf.Context, event: hf.BarEvent):
inst = ctx.instrument('AAPL')
# Access last bar
if inst.last_bar:
print(f"Last close: {inst.last_bar.close}")
# Check properties
print(f"Tick size: {inst.min_tick}")
print(f"Multiplier: {inst.multiplier}")
# Calculate notional
position_size = 100
price = inst.last_bar.close
notional = position_size * price * inst.multiplierTimers
ctx.set_timer()
Set a repeating timer.
Signature:
def set_timer(timer_id: str, timer_interval: pd.Timedelta) -> NoneParameters:
timer_id(str): Unique timer identifiertimer_interval(pd.Timedelta): Interval between timer events
Example:
def on_start(ctx: hf.Context, event: hf.StartEvent):
ctx.set_timer('status_report', pd.Timedelta(minutes=5))
def on_hiveq_event(ctx: hf.Context, event: hf.Event):
if event.type == EventType.TIMER:
timer = event.data()
if timer.timer_id == 'status_report':
print("5 minutes elapsed")ctx.cancel_timer()
Cancel a running timer.
Signature:
def cancel_timer(timer_id: str) -> NoneExample:
ctx.cancel_timer('my_timer')Event Logging
ctx.add_event_log()
Add custom event log entry.
Signature:
def add_event_log(
message: str,
sub_event_type: Optional[str] = None,
symbol: Optional[str] = None,
state_variable: Optional[dict] = None
) -> NoneParameters:
message(str): Descriptive messagesub_event_type(str, optional): Event sub-typesymbol(str, optional): Associated symbolstate_variable(dict, optional): Additional state variables
Example:
def on_bar(self, ctx: Context, event: BarEvent):
bar = event.data()
# Log signal
ctx.add_event_log(
message="Bullish signal detected",
symbol=bar.symbol,
state_variable={"price": bar.close}
)
# Log trade entry
ctx.add_event_log(
message="Entering long position",
symbol=bar.symbol
)Strategy Callbacks
Event handler methods called by the framework. Implement these in your strategy class.
on_start(ctx, event)
Called once when strategy starts. Use for subscriptions and initialization.
def on_start(self, ctx: Context, event: StartEvent):
ctx.subscribe_bars(['AAPL'], AssetType.EQUITY, '1m')on_bar(ctx, event)
Called for each bar (OHLCV) update.
def on_bar(self, ctx: Context, event: BarEvent):
bar = event.data() # or event.bar
# bar.symbol, bar.open, bar.high, bar.low, bar.close, bar.volumeon_trade(ctx, event)
Called for each trade tick.
def on_trade(self, ctx: Context, event: TradeEvent):
trade = event.data()
# trade.symbol, trade.price, trade.size, trade.aggressor_sideon_quote(ctx, event)
Called for each quote (bid/ask) update.
def on_quote(self, ctx: Context, event: QuoteEvent):
quote = event.data()
# quote.symbol, quote.bid_price, quote.ask_price, quote.bid_size, quote.ask_sizeon_snap(ctx, event)
Called for each options snapshot.
def on_snap(self, ctx: Context, event: SnapEvent):
snap = event.data()
# snap.chain, snap.strike, snap.price, snap.bid, snap.ask, snap.greekson_custom_data(ctx, event)
Called for custom data events (signals, ML predictions, etc.).
def on_custom_data(self, ctx: Context, event: CustomDataEvent):
data = event.data()
signal = data.column_data('signal1', default=0)
weight = data.column_data('weight1', default=0.0)on_order(ctx, event)
Called for all order state changes (filled, rejected, accepted, etc.).
def on_order(self, ctx: Context, event: OrderEvent):
if event.type == EventType.ORDER_FILLED:
order = event.data()
fill = order.last_fill
# fill.symbol, fill.last_qty, fill.last_px
elif event.type == EventType.ORDER_REJECTED:
order = event.data()
# Handle rejectionNote: Use on_order() to handle all order events. Check event.type to distinguish
between ORDER_FILLED, ORDER_REJECTED, ORDER_CANCELED, etc. Individual callbacks like
on_order_filled or on_fill do not exist.
def on_order(self, ctx: Context, event: OrderEvent):
order = event.data()
if event.type == EventType.ORDER_FILLED:
fill = order.last_fill
print(f"Filled: {fill.last_qty} @ {fill.last_px}")
elif event.type == EventType.ORDER_REJECTED:
print(f"Rejected: {order.symbol} {order.order_id}")on_position(ctx, event)
Called when position changes.
def on_position(self, ctx: Context, event: PositionEvent):
position = event.data()
# position.symbol, position.quantity, position.avg_pxon_timer(ctx, event)
Called at set intervals (use ctx.set_timer() to configure).
def on_timer(self, ctx: Context, event: TimerEvent):
# Periodic checks, rebalancing, etc.on_stop(ctx, event)
Called when strategy stops.
def on_stop(self, ctx: Context, event: StopEvent):
# Cleanup, close positions, etc.Unified Event Handler (Alternative Pattern)
Instead of individual callbacks, implement one handler for all events:
def on_hiveq_event(self, ctx: Context, event: Event):
if event.type == EventType.START:
ctx.subscribe_bars(['AAPL'], AssetType.EQUITY, '1m')
elif event.type == EventType.BAR:
bar = event.data()
# Handle bar
elif event.type == EventType.ORDER_FILLED:
order = event.data()
# Handle fill
elif event.type == EventType.CUSTOM_DATA:
data = event.data()
# Handle custom dataConfiguration
StrategyConfig
Strategy configuration class.
Attributes:
name(str): Strategy instance nametype(str): Strategy type/class namesymbols(list[str], optional): Trading symbolsparams(dict): Strategy-specific parameters
Example:
from hiveq.flow import StrategyConfig
config = StrategyConfig(
name='SMA_Crossover',
type='SMACrossoverStrategy',
symbols=['AAPL'],
params={
'fast_window': 10,
'slow_window': 30,
'trade_size': 100
}
)
# Access params as attributes
print(config.fast_window) # 10BacktestConfig
Backtest configuration class.
Attributes:
symbols(list[str]): Trading symbolsstart_date(str): Start date 'YYYY-MM-DD'end_date(str): End date 'YYYY-MM-DD'initial_capital(float): Starting capital (default: 1,000,000)commission(float): Commission rate as decimal (default: 0.001)slippage(float): Slippage rate as decimal (default: 0.0)risk_free_rate(float): Risk-free rate for Sharpe (default: 0.02)venue(str): Trading venue (default: "SIM")
Example:
from hiveq.flow import BacktestConfig
config = BacktestConfig(
symbols=['AAPL', 'GOOGL'],
start_date='2025-01-01',
end_date='2025-12-31',
initial_capital=500000.0,
commission=0.002, # 0.2%
slippage=0.0005, # 0.05%
risk_free_rate=0.045 # 4.5%
)Data Configuration
Data configurations specify how to load market data and custom signals for backtesting and live trading.
HiveQ Historical Data
For Backtesting - Use HiveQ's managed historical data service.
Structure:
{
'type': 'hiveq_historical',
'dataset': str, # Dataset identifier
'schema': list[str] # Data schema types
}Available Datasets:
'HIVEQ_US_EQ': US Equities (stocks)'HIVEQ_US_FUT': US Futures'HIVEQ_US_OPT': US Options'HIVEQ_US_IND': US Indices (SPX, VIX, NDX - not tradable, for reference only)'HIVEQ_US_ETF': US ETF Holdings/Constituents (use withhd.Historical.get_data(), notdata_configs)'HIVEQ_QUANT_SIGNALS': HiveQ Quant Signals (alpha signals)
Available Schemas:
'bars_1m': 1-minute OHLCV bars (smallest bar granularity)'bars_1d': Daily OHLCV bars'eq_trades': Equity trade ticks (HIVEQ_US_EQ only)'fut_trades': Futures trade ticks (HIVEQ_US_FUT only)'tbbo': Top of Book + Trades (trade ticks + quote ticks)'snaps_1s': 1-second snapshots (for options)'indices_values': Index price data (for HIVEQ_US_IND)'etf_master': ETF holdings/constituents data (for HIVEQ_US_ETF - use withhd.Historical.get_data())
Examples:
# 1-minute bars for equities
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m']
}]
# TBBO data for high-frequency trading
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['tbbo']
}]
# Options snapshots
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_OPT',
'schema': ['snaps_1s']
}]
# Multiple schemas
data_configs=[{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m', 'tbbo']
}]CSV Data
For Custom Data - Load data from local CSV files.
Structure:
{
'type': 'csv',
'data_type': str, # 'bars_1m', 'bars_1d', or 'custom'
'id': str, # Unique identifier
'path': str # Relative path to ~/.hiveq/data/ or absolute path
}CSV Format for Bars:
timestamp,symbol,open,high,low,close,volume
2025-08-01 09:30:00,AAPL,180.50,181.00,180.25,180.75,1000000
2025-08-01 09:31:00,AAPL,180.75,181.25,180.50,181.00,1100000CSV Format for Custom Data:
timestamp,symbol,signal,confidence,feature1,feature2
2025-08-01 09:30:00,AAPL,1,0.85,0.123,-0.456
2025-08-01 09:31:00,AAPL,0,0.32,0.089,-0.221Examples:
# Bars from CSV
data_configs=[{
'type': 'csv',
'data_type': 'bars_1m',
'id': '1_MIN_BAR',
'path': 'bars/AAPL_bars.csv'
}]
# Custom signals
data_configs=[{
'type': 'csv',
'data_type': 'custom',
'id': 'MLSignals',
'path': 'signals/predictions.csv'
}]
# Multiple data sources
data_configs=[
{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m']
},
{
'type': 'csv',
'data_type': 'custom',
'id': 'UserSignals',
'path': 'signals/custom.csv'
}
]Combining Data Sources
You can combine multiple data sources in a single strategy:
# Example: Backtest with market data + custom signals
report = hf.run_backtest(
strategy_configs=[config],
symbols=['AAPL'],
start_date='2025-01-01',
end_date='2025-01-31',
data_configs=[
# Market data
{
'type': 'hiveq_historical',
'dataset': 'HIVEQ_US_EQ',
'schema': ['bars_1m', 'tbbo']
},
# Custom signals
{
'type': 'csv',
'data_type': 'custom',
'id': 'MLPredictions',
'path': 'signals/ml_predictions.csv'
}
]
)Trading Types
Order
Order object returned by order placement methods.
Attributes:
order_id(str): Unique order identifiersymbol(str): Trading symbolside(OrderSide): BUY or SELLquantity(float): Order quantityfilled_qty(float): Filled quantityavg_px(float): Average fill pricestatus(OrderStatus): Order statusorder_type(OrderType): MARKET, LIMIT, STOP
OrderSide
Order side enumeration.
Values:
OrderSide.BUY: Buy orderOrderSide.SELL: Sell order
OrderType
Order type enumeration.
Values:
OrderType.MARKET: Market orderOrderType.LIMIT: Limit orderOrderType.STOP: Stop order
OrderStatus
Order status enumeration.
Values:
OrderStatus.SUBMITTED: Order submittedOrderStatus.ACCEPTED: Order acceptedOrderStatus.FILLED: Order filledOrderStatus.REJECTED: Order rejectedOrderStatus.CANCELED: Order canceled
Event Types
EventType
Event type enumeration for strategy callbacks.
Market Data Events:
EventType.BAR: Bar (OHLCV) data eventEventType.TICK: Tick data eventEventType.QUOTE: Quote data event (bid/ask with sizes)EventType.TRADE: Trade tick event (individual trades with aggressor side)EventType.SNAP: Snapshot event (for ultra-short interval data like 1s)
Lifecycle Events:
EventType.START: Strategy start eventEventType.STOP: Strategy stop event
Order Events:
EventType.ORDER: Generic order eventEventType.ORDER_SUBMITTED: Order submittedEventType.ORDER_ACCEPTED: Order accepted by exchangeEventType.ORDER_REJECTED: Order rejectedEventType.ORDER_FILLED: Order filledEventType.ORDER_CANCELED: Order canceled
Position Events:
EventType.POSITION_OPENED: Position openedEventType.POSITION_CHANGED: Position changedEventType.POSITION_CLOSED: Position closed
Other Events:
EventType.CUSTOM_DATA: Custom data eventEventType.TIMER: Timer event
Corresponding Callback Methods:
# Per-event callbacks (recommended for type safety)
def on_start(self, ctx: Context, event: StartEvent): ...
def on_bar(self, ctx: Context, event: BarEvent): ...
def on_trade(self, ctx: Context, event: TradeEvent): ... # NEW: For trade ticks
def on_quote(self, ctx: Context, event: QuoteEvent): ... # NEW: For quote ticks
def on_snap(self, ctx: Context, event: SnapEvent): ... # NEW: For snapshot data
def on_order(self, ctx: Context, event: OrderEvent): ... # Handles all order events (filled, rejected, etc.)
def on_position(self, ctx: Context, event: PositionEvent): ...
def on_custom_data(self, ctx: Context, event: CustomDataEvent): ...
def on_timer(self, ctx: Context, event: TimerEvent): ...
def on_stop(self, ctx: Context, event: StopEvent): ...
# Or single handler for all events
def on_hiveq_event(self, ctx: Context, event: Event):
if event.type == EventType.TRADE:
trade = event.data()
...AssetType
Asset type enumeration.
Values:
AssetType.EQUITY: Equities/stocksAssetType.OPTIONS: Options contractsAssetType.FUTURES: Futures contractsAssetType.CRYPTO: Cryptocurrencies