HiveQ Flow v0.2.1
HiveQ Flow is a Python framework that provides an intuitive interface for developing, testing, and deploying quantitative trading strategies. Whether you're backtesting ideas or running live strategies, HiveQ Flow simplifies the process with a clean, powerful API.
Quickstart
Setup HiveQ Flow
Install the HiveQ Flow wheel package from the distribution / release directory:
pip install hiveq-flow
Run your first example
Use one of our prebuilt strategies, passing in config values:
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
# Name is the instance name, and type is the class name of the strategy
strategy_configs = [StrategyConfig(name='TestUserSignal', type='UserSignalStrategy')]
# run the backtest
performance_report = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    data_configs=[{
        'type': 'csv',
        'data_type': 'bars_1m',
        'id': '1_MIN_BAR',
        'path': '/bars/AAPL_bars.csv'
    }]
)
Examples and tutorials
Data Configuration
The data_configs parameter defines the input datasets used to run a
HiveQ backtest flow. It is a list of dictionaries, where each
dictionary specifies one dataset to be loaded.
Example
data_configs = [
    {
        "type": "csv",
        "data_type": "bars_1m",
        "id": "1_MIN_BAR",
        "path": "/bars/AAPL_bars_20250905.csv"
    },
    {
        "type": "csv",
        "data_type": "custom",
        "id": "UserDataTest",
        "path": "/userdata/user_data.csv"
    }
]
Configuration Fields
CSV Support
Each entry in data_configs supports the following keys:
| Key | Description |
|---|---|
| type | Input file type. Currently supported: csv. |
| data_type | Type of dataset: bars_1m (market bar data) or custom (custom/user dataset). |
| id | Unique identifier for the dataset (used inside the backtest). |
| path | Relative file path to the data. The base path is defined by the server. |
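As a quick sanity check before starting a run, entries can be validated against the keys described above. The sketch below is a hypothetical helper (`validate_csv_configs` is not part of HiveQ Flow). It assumes the key names used in the examples on this page (`type`, `data_type`, `id`, `path`) and the two `data_type` values shown there (`bars_1m` and `custom`), and it only covers CSV entries.

```python
# Hypothetical helper: not part of hiveq.flow. It only checks the keys
# and values that this page shows for CSV entries.
REQUIRED_KEYS = {"type", "data_type", "id", "path"}
KNOWN_DATA_TYPES = {"bars_1m", "custom"}


def validate_csv_configs(data_configs):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    seen_ids = set()
    for index, entry in enumerate(data_configs):
        missing = REQUIRED_KEYS - entry.keys()
        if missing:
            problems.append(f"entry {index}: missing keys {sorted(missing)}")
            continue
        if entry["type"] != "csv":
            problems.append(f"entry {index}: unsupported type {entry['type']!r}")
        if entry["data_type"] not in KNOWN_DATA_TYPES:
            problems.append(f"entry {index}: unknown data_type {entry['data_type']!r}")
        if entry["id"] in seen_ids:
            problems.append(f"entry {index}: duplicate id {entry['id']!r}")
        seen_ids.add(entry["id"])
    return problems
```

Calling `validate_csv_configs(data_configs)` on the example list above should return an empty list; an entry without an `id` would be reported instead of failing later inside the backtest.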
Bar Data (data_type = "bars_1m")
Bar data represents time-series price and volume information for a symbol. HiveQ Flow requires a minimum set of standardized columns:
['timestamp', 'open', 'high', 'low', 'close', 'volume', 'symbol']
Your input CSV may use provider-specific headers. Flow automatically maps provider columns to these required fields.
Example CSV Header (Provider Format)
date,sym,time,open,high,low,close,volume,total_vol,total_bar_vol,ticks,wavgp,filtered,kx_recv_time,kx_recv_date,interval_type,src_id,list_ex
Example Row
2025-08-01,AAPL,9:30,210.95,212.09,210.2647,210.835,2195476,4496491,4494942,29129,211.0691951,0,04:38.6,2025-08-02,minutes,1,XNAS
Column Mapping
| Required Column | Example CSV Column | Notes |
|---|---|---|
| timestamp | date + time | Combined into a single timestamp. |
| symbol | sym | Ticker symbol. |
| open | open | Open price. |
| high | high | High price. |
| low | low | Low price. |
| close | close | Close price. |
| volume | volume | Trade volume. |
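To make the mapping concrete, here is a minimal standard-library sketch of the same transformation applied to the example row. It is an illustration only, not Flow's internal mapping code: `RENAMES` and `normalize_row` are hypothetical names, the `%H:%M` time format is inferred from the sample value `9:30`, and the resulting timestamp is left timezone-naive because the page does not state a timezone.

```python
import csv
import io
from datetime import datetime

# Provider header names (from the example header) mapped to required names.
# Hypothetical table for illustration; Flow performs its own mapping.
RENAMES = {"sym": "symbol", "open": "open", "high": "high",
           "low": "low", "close": "close", "volume": "volume"}


def normalize_row(row):
    """Convert one provider row (a dict of strings) into the required bar columns."""
    # Combine the separate date and time fields into a single timestamp.
    timestamp = datetime.strptime(f"{row['date']} {row['time']}", "%Y-%m-%d %H:%M")
    bar = {"timestamp": timestamp}
    for source, target in RENAMES.items():
        value = row[source]
        # The symbol stays a string; prices and volume become floats.
        bar[target] = value if target == "symbol" else float(value)
    return bar


# A shortened version of the example row; extra provider columns are omitted.
sample = (
    "date,sym,time,open,high,low,close,volume\n"
    "2025-08-01,AAPL,9:30,210.95,212.09,210.2647,210.835,2195476\n"
)
bars = [normalize_row(r) for r in csv.DictReader(io.StringIO(sample))]
print(bars[0]["timestamp"], bars[0]["symbol"], bars[0]["close"])
```

Running a check like this on a few rows of a new provider file is a cheap way to confirm that the `date` and `time` columns combine into the timestamps you expect.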
Custom Data (data_type = "custom")
- This input type allows you to provide custom CSV datasets that can be consumed by strategies.
- Unlike bar data, there are no fixed required columns—the schema depends entirely on how your strategy is implemented.
Example custom data
date,time,sym,signal1,weight1
2025-08-01,14:00:00.012,AAPL,1,1
2025-08-01,15:01:00.012,AAPL,0,0
The strategy consuming this dataset is responsible for interpreting the columns.
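Since the interpretation is left to the strategy, one possible reading of the sample file is sketched below. Everything here is an assumption made for illustration: `load_signals` and `target_quantity` are hypothetical helpers, and treating `signal1` as an on/off flag scaled by `weight1` is just one plausible convention, not something HiveQ Flow prescribes.

```python
import csv
import io
from datetime import datetime

SAMPLE = (
    "date,time,sym,signal1,weight1\n"
    "2025-08-01,14:00:00.012,AAPL,1,1\n"
    "2025-08-01,15:01:00.012,AAPL,0,0\n"
)


def load_signals(text):
    """Parse the custom CSV into (timestamp, symbol, signal, weight) tuples."""
    signals = []
    for row in csv.DictReader(io.StringIO(text)):
        # The sample times carry milliseconds, so %f is used for the fraction.
        ts = datetime.strptime(f"{row['date']} {row['time']}", "%Y-%m-%d %H:%M:%S.%f")
        signals.append((ts, row["sym"], int(row["signal1"]), float(row["weight1"])))
    return signals


def target_quantity(signal, weight, base_size=100):
    """Hypothetical rule: signal 1 targets weight * base_size shares, signal 0 targets flat."""
    return int(signal * weight * base_size)


signals = load_signals(SAMPLE)
for ts, symbol, signal, weight in signals:
    print(ts, symbol, target_quantity(signal, weight))
```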
HiveQ Data API
A unified data access API for all kinds of data, including curated and vendor-provided market data.
- Currently supports historical data (Bars)
- See data API docs for more information about parameters
data_configs=[{
    'type': 'hiveq_historical',
    'dataset': 'HIVEQ_US_EQ',
    'schema': ['bars_1m'],
}]
Writing Strategies
All HiveQ Flow needs is an on_hiveq_event(ctx, event) function, defined either at module level (a global function) or as a method on your strategy class.
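Because the handler is an ordinary function of `(ctx, event)`, its decision logic can be exercised without the framework by passing in stand-in objects. The sketch below assumes only what the examples on this page use: `event.type`, `event.data()`, and `ctx.buy_order(symbol=..., quantity=...)`. The `EventType`, `FakeBar`, `FakeEvent`, and `FakeContext` classes are hypothetical stubs written for this illustration, not HiveQ Flow types.

```python
from enum import Enum, auto


class EventType(Enum):
    """Stand-in for hiveq.flow.config.EventType (only the members used here)."""
    START = auto()
    BAR = auto()


class FakeBar:
    """Minimal bar with the two attributes the handler reads."""
    def __init__(self, symbol, close):
        self.symbol = symbol
        self.close = close


class FakeEvent:
    """Mimics the two things handlers on this page read: .type and .data()."""
    def __init__(self, event_type, payload=None):
        self.type = event_type
        self._payload = payload

    def data(self):
        return self._payload


class FakeContext:
    """Records orders instead of sending them; not a real hf.Context."""
    def __init__(self):
        self.orders = []

    def buy_order(self, symbol, quantity):
        self.orders.append(("buy", symbol, quantity))


def on_hiveq_event(ctx, event):
    # Buy once, on the first bar seen for a symbol.
    if event.type == EventType.BAR:
        bar = event.data()
        already_bought = any(order[1] == bar.symbol for order in ctx.orders)
        if not already_bought:
            ctx.buy_order(symbol=bar.symbol, quantity=100)


ctx = FakeContext()
for price in (210.95, 211.10, 211.40):
    on_hiveq_event(ctx, FakeEvent(EventType.BAR, FakeBar("AAPL", price)))
print(ctx.orders)
```

A stub like this only checks your own branching logic; fills, positions, and timing still need a real backtest run.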
Writing your strategy - Global method (on_hiveq_event)
import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType

# Global strategy function
def on_hiveq_event(ctx: hf.Context, event):
    """
    Global function strategy for testing.
    Simple buy and hold strategy.
    """
    if event.type == EventType.START:
        ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
        print("Global strategy started!")
    elif event.type == EventType.BAR:
        bar = event.data()
        symbol = bar.symbol
        # Simple buy and hold - buy on first bar
        position = ctx.portfolio().net_position(symbol)
        if position == 0:
            ctx.buy_order(symbol=symbol, quantity=100)
            print(f"Global strategy: Bought 100 shares of {symbol} at {bar.close}")
    elif event.type == EventType.ORDER_FILLED:
        print("Global strategy: Order filled!")

if __name__ == '__main__':  # Have main or just a jupyter cell invoke
    results = hf.run_backtest(
        strategy_configs=[],  # Empty - should trigger global function usage
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[{
            'type': 'csv',
            'data_type': 'bars_1m',
            'id': '1_MIN_BAR',
            'path': '/bars/AAPL_bars.csv'
        }]
    )
Writing your strategy as a class - for writing modular and reusable strategies
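The class-based SMA example in this section keeps its rolling windows in `deque(maxlen=...)` buffers. As a framework-free sketch of that technique, the hypothetical `crossover_signals` function below computes both averages over a plain list of prices. Note one deliberate difference: it reports a signal only when the fast average changes side relative to the slow one, whereas the strategies on this page act whenever the fast average is above or below and the tracked position allows it. The window sizes and prices are made up for illustration.

```python
from collections import deque


def crossover_signals(prices, fast_window=3, slow_window=5):
    """Yield (index, 'buy' | 'sell') when the fast SMA moves across the slow SMA."""
    fast = deque(maxlen=fast_window)  # old prices fall off automatically
    slow = deque(maxlen=slow_window)
    previous_side = None  # 'above' or 'below' once the slow window is full
    for index, price in enumerate(prices):
        fast.append(price)
        slow.append(price)
        if len(slow) < slow_window:
            continue  # not enough data for the slow average yet
        fast_sma = sum(fast) / len(fast)
        slow_sma = sum(slow) / len(slow)
        if fast_sma == slow_sma:
            continue  # no clear side; keep the previous one
        side = "above" if fast_sma > slow_sma else "below"
        if previous_side is not None and side != previous_side:
            yield index, "buy" if side == "above" else "sell"
        previous_side = side


prices = [10, 9, 8, 7, 6, 7, 9, 11, 12, 10, 7, 5]
print(list(crossover_signals(prices)))  # [(7, 'buy'), (10, 'sell')]
```

With these prices the fast average rises above the slow one at index 7 and drops back below it at index 10, which is where the two signals appear.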
import numpy as np
import hiveq.flow as hf
from hiveq.flow.config import EventType, AssetType
from hiveq.flow import StrategyConfig
from collections import deque

class SMACrossoverStrategy:
    def __init__(self):
        self.fast_window = 10
        self.slow_window = 30
        self.trade_size = 100
        self.fast_prices = deque(maxlen=self.fast_window)
        self.slow_prices = deque(maxlen=self.slow_window)
        self.current_position = 0  # locally tracked net position, in shares

    def on_hiveq_event(self, ctx: hf.Context, event):
        """
        SMA Crossover Strategy.
        Buys when fast SMA crosses above slow SMA (golden cross).
        Sells when fast SMA crosses below slow SMA (death cross).
        """
        if event.type == EventType.START:
            ctx.subscribe_bars(symbols=ctx.strategy_config.symbols, asset_type=AssetType.EQUITY, interval='1m')
        elif event.type == EventType.BAR:
            bar = event.data()
            symbol = bar.symbol
            self.fast_prices.append(bar.close)
            self.slow_prices.append(bar.close)
            if len(self.slow_prices) < self.slow_window:
                # Not enough data yet
                return
            fast_ma = np.mean(self.fast_prices)
            slow_ma = np.mean(self.slow_prices)
            # Trading logic: Crossover
            # The tracked position is adjusted by the traded size in both
            # directions so that it stays in step with the orders sent.
            if fast_ma > slow_ma and self.current_position <= 0:
                ctx.buy_order(symbol=symbol, quantity=self.trade_size)
                self.current_position += self.trade_size
            elif fast_ma < slow_ma and self.current_position >= 0:
                ctx.sell_order(symbol=symbol, quantity=self.trade_size)
                self.current_position -= self.trade_size
        elif event.type == EventType.ORDER_FILLED:
            order = event.data()
            print(f"Order filled at {order.avg_px:.2f} for {order.filled_qty} shares")
        elif event.type == EventType.POSITION_CLOSED:
            position = event.data()
            print(f"Position closed: P&L = ${position.realized_pnl:.2f}")

if __name__ == '__main__':
    # Configure strategy
    strategy_configs = [
        StrategyConfig(
            name='SMACross',
            type='SMACrossoverStrategy',
            params={'fast_window': 10, 'slow_window': 30}
        )
    ]
    results = hf.run_backtest(
        strategy_configs=strategy_configs,
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[{
            'type': 'csv',
            'data_type': 'bars_1m',
            'id': '1_MIN_BAR',
            'path': '/bars/AAPL_bars.csv'
        }]
    )
Support for individual event callbacks - Callback per event type
"""
SMA Crossover Strategy using Per-Event Callbacks.
This example demonstrates how to implement a trading strategy using individual
event callback methods (on_start, on_stop, on_bar, etc.) instead of the
single on_hiveq_event method. Each callback has proper type annotations
to show the expected data types.
"""
from typing import Optional, Dict, Any
from collections import deque
import numpy as np
import pandas as pd
import hiveq.flow as hf
from hiveq.flow.events import StartEvent, BarEvent, TimerEvent
from hiveq.flow import Context, StrategyConfig
from hiveq.flow.config import AssetType
from hiveq.flow.trading_types import Order, Position, Portfolio
from hiveq.flow.logger import logger

logger = logger()

class SMAEventCallbackStrategy:
    """
    SMA Crossover Strategy with per-event callbacks.
    This strategy demonstrates:
    - Individual callback methods for different event types
    - Proper type annotations for each callback
    - State management across different events
    - Position tracking and order management
    """
    def __init__(self):
        """Initialize strategy state and parameters."""
        # Strategy parameters
        self.fast_window: int = 10
        self.slow_window: int = 30
        self.trade_size: int = 100
        # Price buffers for SMA calculation
        self.fast_prices: deque = deque(maxlen=self.fast_window)
        self.slow_prices: deque = deque(maxlen=self.slow_window)
        # Total bars seen (the deques stop growing at their window size)
        self.bar_count: int = 0
        # Position tracking (simplified - Context now handles order tracking)
        self.position_side: Optional[str] = None  # 'long', 'short', or None
        # Performance tracking
        self.trade_count: int = 0
        self.winning_trades: int = 0
        self.losing_trades: int = 0
        logger.info(f"Initialized SMA Strategy - Fast: {self.fast_window}, Slow: {self.slow_window}")

    def on_start(self, ctx: Context, event: StartEvent) -> None:
        """
        Called when the strategy starts.
        Args:
            ctx: The strategy context providing access to trading operations
        """
        logger.info("=== Strategy Starting ===")
        # Subscribe to bar data for all configured symbols
        symbols = ctx.strategy_config.symbols
        logger.info(f"Subscribing to 1-minute bars for symbols: {symbols}")
        ctx.subscribe_bars(symbols=symbols, asset_type=AssetType.EQUITY, interval='1m')
        # Set up a periodic timer for status reporting
        ctx.set_timer('status_report', pd.Timedelta(minutes=5))
        logger.info("Set up 5-minute status report timer")
        # Log strategy parameters
        logger.info(f"Strategy parameters from config: {ctx.strategy_config.params}")
        # Override parameters if provided in config
        if ctx.strategy_config.params:
            self.fast_window = ctx.strategy_config.params.get('fast_window', self.fast_window)
            self.slow_window = ctx.strategy_config.params.get('slow_window', self.slow_window)
            self.trade_size = ctx.strategy_config.params.get('trade_size', self.trade_size)
            # Reinitialize deques with new window sizes
            self.fast_prices = deque(maxlen=self.fast_window)
            self.slow_prices = deque(maxlen=self.slow_window)
            logger.info(f"Updated parameters - Fast: {self.fast_window}, Slow: {self.slow_window}, Size: {self.trade_size}")

    def on_bar(self, ctx: Context, event: BarEvent) -> None:
        bar = event.data()
        symbol = bar.symbol
        close_price = bar.close
        # Update price buffers
        self.fast_prices.append(close_price)
        self.slow_prices.append(close_price)
        # Log every 10th bar to avoid spam
        self.bar_count += 1
        bar_count = self.bar_count
        if bar_count % 10 == 0:
            logger.debug(f"Bar #{bar_count} - {symbol}: O={bar.open:.2f}, H={bar.high:.2f}, "
                         f"L={bar.low:.2f}, C={bar.close:.2f}, V={bar.volume:.0f}")
        # Check if we have enough data for calculation
        if len(self.slow_prices) < self.slow_window:
            if bar_count == 1:
                logger.info(f"Collecting data... Need {self.slow_window} bars for slow SMA")
            return
        # Calculate SMAs
        fast_sma = np.mean(self.fast_prices)
        slow_sma = np.mean(self.slow_prices)
        # Log SMA values periodically
        if bar_count % 10 == 0:
            logger.debug(f"Fast SMA: {fast_sma:.2f}, Slow SMA: {slow_sma:.2f}")
        # Generate trading signals
        self._check_trading_signals(ctx, symbol, fast_sma, slow_sma, close_price)

    def on_timer(self, ctx: Context, event: TimerEvent) -> None:
        timer = event.data()
        # logger.info(f'Fired timer : {timer.timer_id}')

    # ==================== Helper Methods ====================
    def _check_trading_signals(self, ctx: Context, symbol: str,
                               fast_sma: float, slow_sma: float,
                               current_price: float) -> None:
        # Get current position info
        net_pos = ctx.net_position(symbol)
        pending_qty = ctx.open_order_qty(symbol)
        available_pos = net_pos - pending_qty
        # Golden Cross: Fast SMA crosses above Slow SMA (bullish signal)
        if fast_sma > slow_sma and self.position_side != 'long':
            logger.info(f"GOLDEN CROSS detected - Buying {self.trade_size} shares at ~{current_price:.2f}")
            logger.info(f"Net position: {net_pos}, Pending orders: {pending_qty}, Available: {available_pos}")
            ctx.buy_order(symbol=symbol, quantity=self.trade_size)
            self.position_side = 'long'
        # Death Cross: Fast SMA crosses below Slow SMA (bearish signal)
        elif fast_sma < slow_sma and self.position_side != 'short':
            # Can only sell what we have (netting constraint)
            if available_pos > 0:
                # Sell either our target size or available position, whichever is smaller
                sell_qty = min(self.trade_size, available_pos)
                logger.info(f"DEATH CROSS detected - Selling {sell_qty} shares at ~{current_price:.2f}")
                logger.info(f"Net position: {net_pos}, Pending orders: {pending_qty}, Available: {available_pos}")
                ctx.sell_order(symbol=symbol, quantity=sell_qty)
                # Update position side based on what we expect to have
                expected_pos_after = available_pos - sell_qty
                if expected_pos_after <= 0:
                    self.position_side = None  # Will be flat
                else:
                    self.position_side = 'long'  # Still long but reduced
            else:
                logger.info("DEATH CROSS detected but no position to sell")
                logger.info(f"Net position: {net_pos}, Pending orders: {pending_qty}, Available: {available_pos}")
                self.position_side = 'short'  # Signal detected but no action taken

if __name__ == '__main__':
    """
    Example of running the strategy with per-event callbacks.
    """
    # Configure strategy with parameters
    strategy_configs = [
        StrategyConfig(
            name='SMAEventCallbacks',
            type='SMAEventCallbackStrategy',  # Class name
            params={
                'fast_window': 10,
                'slow_window': 30,
                'trade_size': 100
            }
        )
    ]
    # Run backtest
    results = hf.run_backtest(
        strategy_configs=strategy_configs,
        symbols=['AAPL'],
        start_date='2025-08-01',
        end_date='2025-08-02',
        data_configs=[{
            'type': 'csv',
            'data_type': 'bars_1m',
            'id': '1_MIN_BAR',
            'path': 'bars/AAPL_bars.csv'
        }]
    )
Using the suite of prebuilt strategies
import hiveq.flow as hf
from hiveq.flow import StrategyConfig

strategy_configs = [StrategyConfig(
    name='TestUserSignal',
    type='UserSignalStrategy',
    params={
        'user_data_list': ['UserDataTest'],
        'universe_file': 'universe/univ.csv',
        'mm': 1.0, 'enable_holding': False, 'mm_reset': 1.0,
        'entry_begin_time': "00:00:01", 'entry_end_time': "15:49:00",
        'exit_begin_time': "15:49:30", 'exit_end_time': "16:00:00",
        'stop_loss_bps': 0
    }
)]
results = hf.run_backtest(
    strategy_configs=strategy_configs,
    symbols=['AAPL'],
    start_date='2025-08-01',
    end_date='2025-08-02',
    data_configs=[{
        'type': 'csv',
        'data_type': 'bars_1m',
        'id': '1_MIN_BAR',
        'path': 'bars/AAPL_bars.csv'
    }, {
        'type': 'csv',
        'data_type': 'custom',
        'id': 'UserDataTest',
        'path': 'userdata/user_data.csv'
    }]
)
User Guide
- Installation - Detailed installation instructions
API Reference
- API Reference - Complete API documentation
- Data Types - HiveQ Flow data types and callback interfaces
Developer Guide
- Changelog - Release notes and updates