HiveQ Flow v0.2.0
HiveQ Flow Trading Platform
HiveQ Flow is an event-driven algorithmic trading framework for backtesting strategies and trading them live, with support for equities, futures, and options.
Version: 0.2.0
Overview
Flow provides a complete environment for developing and deploying quantitative trading strategies. The platform uses an event-driven architecture that processes market data, generates signals, and executes orders consistently across backtesting and live trading environments.
Key Features
- Event-Driven Architecture: Process market data through standardized event callbacks
- Backtesting Engine: High-performance historical simulation with realistic execution
- Live Trading Support: Deploy strategies for paper trading or live trading
- Multi-Asset Support: Trade equities, futures (with auto-rollover), and options
- Flexible Data Sources: HiveQ historical data, real-time feeds, or custom CSV data
- Performance Analytics: Comprehensive reporting with tear sheets and metrics
Quick Start
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
from hiveq.flow.config import EventType, AssetType

# Initialize Flow
hf.init(trader_id='your_id', api_key='your_key')

# Define strategy
class MyStrategy:
    def on_hiveq_event(self, ctx: hf.Context, event: hf.Event):
        if event.type == EventType.START:
            # Subscribe to market data
            ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')
        elif event.type == EventType.BAR:
            bar = event.data()
            # Trading logic
            if ctx.is_flat(bar.symbol):
                ctx.buy_order(symbol=bar.symbol, quantity=100)

# Run backtest
report = hf.run_backtest(
    strategy_configs=[StrategyConfig(name='MyStrat', type='MyStrategy', params={})],
    symbols=['AAPL'],
    start_date='2025-01-01',
    end_date='2025-01-31',
    data_configs=[{'type': 'hiveq_historical', 'dataset': 'HIVEQ_US_EQ', 'schema': ['bars_1m']}]
)

# View results
print(report)

Platform Architecture
Flow is built on an event-driven architecture with these core components:
1. Event System
All market data and trading events flow through a standardized event pipeline:
- Lifecycle Events: START, STOP
- Market Data Events: BAR, TRADE, QUOTE, SNAP (options)
- Order Events: ORDER_SUBMITTED, ORDER_FILLED, ORDER_REJECTED, ORDER_CANCELED
- Position Events: POSITION_OPENED, POSITION_CHANGED, POSITION_CLOSED
- Custom Events: TIMER, CUSTOM_DATA
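The event groups above can be pictured as one enumeration plus a category lookup. This is a minimal sketch rather than Flow's actual implementation: `FlowEventType`, `CATEGORY_MEMBERS`, and `category_of` are hypothetical names, and only the member names are taken from the list above.

```python
from enum import Enum, auto


class FlowEventType(Enum):
    """Hypothetical stand-in that mirrors the event names listed above."""
    START = auto()
    STOP = auto()
    BAR = auto()
    TRADE = auto()
    QUOTE = auto()
    SNAP = auto()
    ORDER_SUBMITTED = auto()
    ORDER_FILLED = auto()
    ORDER_REJECTED = auto()
    ORDER_CANCELED = auto()
    POSITION_OPENED = auto()
    POSITION_CHANGED = auto()
    POSITION_CLOSED = auto()
    TIMER = auto()
    CUSTOM_DATA = auto()


E = FlowEventType

# Grouping follows the five bullet points in the Event System list.
CATEGORY_MEMBERS = {
    "lifecycle": {E.START, E.STOP},
    "market_data": {E.BAR, E.TRADE, E.QUOTE, E.SNAP},
    "order": {E.ORDER_SUBMITTED, E.ORDER_FILLED, E.ORDER_REJECTED, E.ORDER_CANCELED},
    "position": {E.POSITION_OPENED, E.POSITION_CHANGED, E.POSITION_CLOSED},
    "custom": {E.TIMER, E.CUSTOM_DATA},
}


def category_of(event_type: FlowEventType) -> str:
    """Return the category name that contains event_type."""
    for category, members in CATEGORY_MEMBERS.items():
        if event_type in members:
            return category
    raise ValueError(f"uncategorized event type: {event_type!r}")


print(category_of(FlowEventType.SNAP))  # market_data
```

A lookup like this is mainly useful in logging or filtering code, for example to skip everything except order and position events.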
2. Context API
The Context object provides strategies with access to:
- Market data subscriptions
- Order management
- Position queries
- Portfolio state
- Event logging
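Because a strategy reaches the platform only through the Context object, its decision logic can be exercised against a small test double. The sketch below is an assumption-heavy stand-in: `FakeContext` is hypothetical and not part of `hiveq.flow`, it mimics only the `buy_order`, `sell_order`, `net_position`, `is_flat`, and `is_net_long` calls used elsewhere in this document, and it fills every order immediately and in full, which the real engine may not do.

```python
from collections import defaultdict


class FakeContext:
    """Hypothetical test double for strategy unit tests; not a Flow class."""

    def __init__(self):
        self._positions = defaultdict(int)  # symbol -> signed share count
        self.orders = []                    # recorded (side, symbol, quantity)

    def buy_order(self, symbol, quantity, **kwargs):
        # Assumption: orders fill instantly at the requested quantity.
        self.orders.append(("BUY", symbol, quantity))
        self._positions[symbol] += quantity

    def sell_order(self, symbol, quantity, **kwargs):
        self.orders.append(("SELL", symbol, quantity))
        self._positions[symbol] -= quantity

    def net_position(self, symbol):
        return self._positions[symbol]

    def is_flat(self, symbol):
        return self._positions[symbol] == 0

    def is_net_long(self, symbol):
        return self._positions[symbol] > 0


ctx = FakeContext()
ctx.buy_order(symbol="AAPL", quantity=100)
ctx.sell_order(symbol="AAPL", quantity=40)
print(ctx.net_position("AAPL"), ctx.is_net_long("AAPL"))  # 60 True
```

Passing an object like this to a strategy's handler makes it possible to assert on `ctx.orders` without running a backtest.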
3. Data Integration
- HiveQ Historical: Access to equities, futures, and options data
- Real-Time Feeds: Live market data via Kafka
- Custom Data: CSV files or custom signal sources
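As an illustration of the custom CSV case, the sketch below parses a small signal file with the standard library and groups rows by symbol. The column names (`timestamp`, `symbol`, `signal`) and the `load_signals` helper are assumptions made for this example; the source does not specify the file layout Flow expects.

```python
import csv
import io
from collections import defaultdict

# Hypothetical file contents; the column names are assumptions for this sketch.
SAMPLE_CSV = """timestamp,symbol,signal
2025-01-02T09:30:00,AAPL,1
2025-01-02T09:31:00,AAPL,-1
2025-01-02T09:30:00,MSFT,0
"""


def load_signals(handle):
    """Group (timestamp, signal) pairs by symbol, preserving file order."""
    signals = defaultdict(list)
    for row in csv.DictReader(handle):
        signals[row["symbol"]].append((row["timestamp"], int(row["signal"])))
    return dict(signals)


signals = load_signals(io.StringIO(SAMPLE_CSV))
print(signals["AAPL"])
```

The same function accepts a real file handle, for example `open(path, newline="")`, in place of the in-memory sample.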
4. Execution Engine
- Realistic fill simulation in backtesting
- Configurable slippage and commissions
- Support for multiple order types (MARKET, LIMIT, STOP, MOO, MOC)
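To make the slippage and commission settings concrete, here is a minimal sketch of how a simulated market-order fill could be priced. It is not Flow's fill model: `CostModel`, `simulate_market_fill`, the basis-point slippage rule, and the per-share commission are all hypothetical choices for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CostModel:
    """Hypothetical cost settings; names and defaults are illustrative only."""
    slippage_bps: float = 1.0           # adverse price move, in basis points
    commission_per_share: float = 0.005


def simulate_market_fill(side: str, quantity: int, reference_price: float,
                         costs: CostModel):
    """Return (fill_price, commission) for a market order filled in one piece."""
    if side not in ("BUY", "SELL"):
        raise ValueError(f"unknown side: {side!r}")
    # Slippage always works against the trader: buys fill higher, sells lower.
    direction = 1 if side == "BUY" else -1
    fill_price = reference_price * (1 + direction * costs.slippage_bps / 10_000)
    commission = quantity * costs.commission_per_share
    return fill_price, commission


price, fee = simulate_market_fill("BUY", 100, 150.00, CostModel(slippage_bps=2.0))
print(round(price, 4), round(fee, 2))  # 150.03 0.5
```

Modelling slippage as a fixed fraction of price keeps the sketch simple; a more realistic model might scale it with order size or bar volume.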
Core Concepts
Strategy Implementation
Strategies are Python classes with event callback methods. Choose one of two approaches:
Approach 1: Single Event Handler
class MyStrategy:
    def on_hiveq_event(self, ctx: hf.Context, event: hf.Event):
        if event.type == EventType.START:
            # Initialize
            pass
        elif event.type == EventType.BAR:
            # Process bar data
            bar = event.data()

Approach 2: Per-Event Handlers (Recommended)
from hiveq.flow import events

class MyStrategy:
    def on_start(self, ctx: hf.Context, event: events.StartEvent):
        # Initialize
        pass

    def on_bar(self, ctx: hf.Context, event: events.BarEvent):
        # Process bar data
        bar = event.data()

Important: Never mix both approaches in the same strategy class!
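One way to see why mixing is a problem: a dispatcher that finds both `on_hiveq_event` and per-event methods cannot tell which one should receive an event without risking double handling. The sketch below shows a router that rejects such a class outright. It is purely illustrative: `resolve_handler` and `PER_EVENT_HANDLERS` are hypothetical, and the source does not describe how Flow itself reacts to a mixed strategy.

```python
# Illustrative subset of per-event method names taken from the examples above.
PER_EVENT_HANDLERS = {"START": "on_start", "BAR": "on_bar"}


def resolve_handler(strategy, event_type: str):
    """Return the callback for event_type, or None if the strategy ignores it.

    Raises TypeError when a strategy defines both handler styles.
    """
    has_single = callable(getattr(strategy, "on_hiveq_event", None))
    per_event = [name for name in PER_EVENT_HANDLERS.values()
                 if callable(getattr(strategy, name, None))]
    if has_single and per_event:
        raise TypeError(
            f"{type(strategy).__name__} defines on_hiveq_event and {per_event}"
        )
    if has_single:
        return strategy.on_hiveq_event
    name = PER_EVENT_HANDLERS.get(event_type)
    return getattr(strategy, name, None) if name else None


class SingleHandler:
    def on_hiveq_event(self, ctx, event):
        return "single"


class PerEvent:
    def on_bar(self, ctx, event):
        return "bar"


class Mixed(SingleHandler, PerEvent):
    pass


print(resolve_handler(SingleHandler(), "BAR")(None, None))  # single
print(resolve_handler(PerEvent(), "BAR")(None, None))       # bar
print(resolve_handler(PerEvent(), "START"))                 # None
try:
    resolve_handler(Mixed(), "BAR")
except TypeError as exc:
    print("rejected:", exc)
```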
Context API
Access trading operations through the Context object:
# Subscribe to data
ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')
ctx.subscribe_futures_bars(['ES.c.0'], interval='5m')
ctx.subscribe_option_snaps('SPX', strike=5000, option_type='C', expiration_type='0dte')
# Place orders
ctx.buy_order(symbol='AAPL', quantity=100)
ctx.sell_order(symbol='AAPL', quantity=100, order_type=OrderType.LIMIT, limit_price=150.00)
# Query positions
position = ctx.net_position('AAPL')
is_flat = ctx.is_flat('AAPL')
is_long = ctx.is_net_long('AAPL')
# Portfolio access
portfolio = ctx.portfolio()
total_pnl = portfolio.total_pnl()

Data Configuration
Specify data sources in run_backtest():
data_configs = [
    {
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_US_EQ',   # Equities
        'schema': ['bars_1m']       # 1-minute bars
    },
    {
        'type': 'hiveq_historical',
        'dataset': 'HIVEQ_US_FUT',  # Futures
        'schema': ['bars_5m']
    },
    {
        'type': 'csv',
        'data_type': 'custom',
        'id': 'my_signals',
        'path': 'signals/data.csv'
    }
]

Asset Classes
Equities
ctx.subscribe_bars(symbols=['AAPL', 'MSFT'], asset_type=AssetType.EQUITY, interval='1m')
ctx.buy_order(symbol='AAPL', quantity=100)

Futures (with Auto-Rollover)
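Auto-rollover means a continuous symbol is remapped to a later contract as the current front contract nears expiry. The sketch below illustrates a calendar-style roll only. The contract codes, expiry dates, `front_contract` helper, and `roll_days` rule are assumptions for illustration and do not describe how Flow schedules its rolls.

```python
from datetime import date, timedelta

# Hypothetical contract calendar: (contract code, expiry date), sorted by expiry.
CONTRACTS = [
    ("ESH5", date(2025, 3, 21)),
    ("ESM5", date(2025, 6, 20)),
    ("ESU5", date(2025, 9, 19)),
]


def front_contract(today: date, roll_days: int = 5) -> str:
    """Return the contract a calendar-rolled front month maps to on `today`.

    A contract stops being the front month `roll_days` days before it expires.
    """
    for code, expiry in CONTRACTS:
        if today < expiry - timedelta(days=roll_days):
            return code
    raise LookupError("no listed contract covers this date")


print(front_contract(date(2025, 3, 10)))  # ESH5
print(front_contract(date(2025, 3, 17)))  # ESM5
```

With a continuous symbol the strategy keeps trading one name while the platform handles this remapping; the subscription calls that follow use such symbols.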
# Continuous contracts
ctx.subscribe_futures_bars(['ES.c.0', 'NQ.v.0'], interval='5m') # .c.0 = calendar, .v.0 = volume
ctx.buy_order(symbol='ES.c.0', quantity=1)  # 1 contract

Options
# Subscribe with filtering
ctx.subscribe_option_snaps('SPX', strike=5000, option_type='C', expiration_type='0dte', interval='1s')
def on_snap(self, ctx: hf.Context, event: events.SnapEvent):
    snap = event.data()
    # snap.symbol, snap.chain, snap.strike, snap.bid_px, snap.ask_px

Performance Reporting
Flow generates comprehensive performance reports:
report = hf.run_backtest(...)
# Access report data
print(report.positions) # All positions
print(report.return_stats) # Returns, Sharpe, etc.
print(report.pnl_stats) # P&L statistics
print(report.trades) # Trade history
print(report.orders) # Order history
# Generate tear sheet (HTML)
html_tearsheet = report.create_tearsheet()
# In Marimo notebook
import marimo as mo
mo.Html(report.create_tearsheet())

Example: Simple Moving Average Strategy
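The crossover rule itself can be checked in plain Python before wiring it into Flow. The sketch below is a framework-free illustration: `crossover_signals` is a hypothetical helper, the short windows and price series are made up so the result is easy to verify by hand, and it tracks a simple flat/long state instead of querying a Context.

```python
from collections import deque
from statistics import fmean


def crossover_signals(prices, fast=3, slow=5):
    """Return (index, 'BUY' | 'SELL') pairs for a long-only SMA crossover."""
    fast_win, slow_win = deque(maxlen=fast), deque(maxlen=slow)
    is_long = False
    signals = []
    for i, price in enumerate(prices):
        fast_win.append(price)
        slow_win.append(price)
        if len(slow_win) < slow:
            continue  # wait until the slow window is full
        fast_sma, slow_sma = fmean(fast_win), fmean(slow_win)
        if fast_sma > slow_sma and not is_long:
            signals.append((i, "BUY"))
            is_long = True
        elif fast_sma < slow_sma and is_long:
            signals.append((i, "SELL"))
            is_long = False
    return signals


prices = [10, 10, 10, 10, 10, 11, 12, 13, 12, 10, 8, 7]
print(crossover_signals(prices))  # [(5, 'BUY'), (10, 'SELL')]
```

The strategy that follows applies the same rule inside a Flow event handler, with 10-bar and 30-bar windows on 1-minute AAPL bars.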
import hiveq.flow as hf
from hiveq.flow import StrategyConfig
from hiveq.flow.config import EventType, AssetType
from collections import deque
import numpy as np

hf.init(trader_id='T123', api_key='KEY')

class SMACrossover:
    def __init__(self):
        self.fast_window = 10
        self.slow_window = 30
        self.prices = {}

    def on_hiveq_event(self, ctx: hf.Context, event: hf.Event):
        if event.type == EventType.START:
            ctx.subscribe_bars(symbols=['AAPL'], asset_type=AssetType.EQUITY, interval='1m')
            # Rolling windows of closing prices, sized from the configured lookbacks
            self.prices['AAPL'] = {
                'fast': deque(maxlen=self.fast_window),
                'slow': deque(maxlen=self.slow_window),
            }
        elif event.type == EventType.BAR:
            bar = event.data()
            self.prices[bar.symbol]['fast'].append(bar.close)
            self.prices[bar.symbol]['slow'].append(bar.close)
            # Wait until the slow window is full before trading
            if len(self.prices[bar.symbol]['slow']) < self.slow_window:
                return
            fast_sma = np.mean(self.prices[bar.symbol]['fast'])
            slow_sma = np.mean(self.prices[bar.symbol]['slow'])
            if fast_sma > slow_sma and ctx.is_flat(bar.symbol):
                # Fast average above slow average: enter long
                ctx.buy_order(symbol=bar.symbol, quantity=100)
            elif fast_sma < slow_sma and ctx.is_net_long(bar.symbol):
                # Fast average below slow average: close the long position
                ctx.sell_order(symbol=bar.symbol, quantity=ctx.net_position(bar.symbol))

report = hf.run_backtest(
    strategy_configs=[StrategyConfig(name='SMA', type='SMACrossover', params={})],
    symbols=['AAPL'],
    start_date='2025-09-01',
    end_date='2025-09-30',
    data_configs=[{'type': 'hiveq_historical', 'dataset': 'HIVEQ_US_EQ', 'schema': ['bars_1m']}]
)

Next Steps
- New Users: Start with Strategy Development to understand the framework
- Configuration: Learn about Configuration options
Version: 0.2.0 | Last Updated: 2025-11-20 | Maintained By: HiveQ Team