HiveQ Data API
A Python client for accessing HiveQ's historical and live market data with ease.
Version: 0.2.1
Features
Five Powerful Clients
- Historical - Query historical market data with flexible date/time ranges (API-based)
- InstrumentReference - Access instrument metadata like multipliers, roots, symbols, etc. (API-based)
- Publisher - Publish strategy events, logs, and custom data to HiveQ destinations (API-based, async mode default)
- Metadata - Get dataset and schema information from HiveQ platform (API-based)
- LiveStream - Stream live market data in real-time (WebSocket-based)
Additional Features
- Centralized Configuration: Load credentials from environment variables or direct parameters
- Fast JSON Parsing: Built on orjson for high-performance data processing
- Async Publishing: Fire-and-forget publishing with 10-100x performance improvement (default mode)
- Automatic Rollover Detection: Futures contract rollover information with multiple methodologies
Installation
Install from PyPI (coming soon)
pip install hiveq-data
Development Installation
# Clone the repository
cd python-sdk
# Install in development mode
pip install -e .
# Or install with development dependencies
pip install -e ".[dev]"
Check Version
import hiveq_data as hd
print(hd.__version__)  # Outputs: 0.2.1
Configuration
Automatic Configuration (Recommended)
No configuration needed! The SDK automatically loads credentials from environment variables:
- Searches for a .env file in your current directory and parent directories
- Falls back to environment variables (HIVEQ_API_KEY, HIVEQ_BASE_URL, HIVEQ_USER_ID, HIVEQ_ORG_ID, HIVEQ_USER_NAME)
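The parent-directory search can be sketched as follows. This is illustrative only; find_dotenv is a hypothetical helper, not the SDK's actual loader, which may differ in details:

```python
from pathlib import Path
from typing import Optional

def find_dotenv(start_dir: str) -> Optional[Path]:
    """Look for a .env file in start_dir, then in each parent directory.

    Hypothetical sketch of the auto-discovery behavior described above;
    returns None when no .env is found (the SDK then falls back to
    environment variables).
    """
    current = Path(start_dir).resolve()
    for directory in (current, *current.parents):
        candidate = directory / ".env"
        if candidate.is_file():
            return candidate
    return None
```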
Just create a .env file (see .env.example):
HIVEQ_API_KEY=your_api_key_here
HIVEQ_BASE_URL=https://api.hiveq.com
HIVEQ_USER_ID=your_user_id
HIVEQ_ORG_ID=your_org_id
HIVEQ_USER_NAME=your_username
That's it! No configure() call needed:
import hiveq_data as hd
# Credentials automatically loaded from .env!
client = hd.Historical()
data = client.get_data(...)
Manual Configuration (Optional)
You only need to call configure() if you want to:
- Override auto-loaded values
- Set credentials programmatically
Override base URL for local development
import hiveq_data as hd
# Auto-loads credentials from .env, but overrides base_url
hd.configure(base_url="http://localhost:3000")
Set credentials programmatically
import hiveq_data as hd
hd.configure(
api_key='your_api_key_here',
base_url='https://api.hiveq.com',
user_id='your_user_id',
org_id='your_org_id',
user_name='your_username'
)
Use environment variables only
export HIVEQ_API_KEY=your_api_key_here
export HIVEQ_BASE_URL=https://api.hiveq.com
export HIVEQ_USER_ID=your_user_id
export HIVEQ_ORG_ID=your_org_id
export HIVEQ_USER_NAME=your_username
import hiveq_data as hd
# Auto-loads from environment variables
client = hd.Historical()
Logging Configuration (Optional)
The SDK uses Python's standard logging module. By default, logs are suppressed. Configure logging in your application to see SDK logs:
import logging
import hiveq_data as hd
# Basic configuration - show all SDK logs at INFO level
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Or configure only HiveQ SDK logs
logging.getLogger('hiveq_data').setLevel(logging.DEBUG)
# Create client - will now show debug logs
client = hd.Historical()
Log Levels:
- DEBUG - Detailed information for debugging (API payloads, parsing details)
- INFO - General informational messages (operations completed)
- WARNING - Warning messages (fallback behaviors, missing data)
- ERROR - Error messages (API failures, parsing errors)
Filter specific modules:
# Only show futures reference logs
logging.getLogger('hiveq_data.instrument_reference.futures_reference').setLevel(logging.DEBUG)
# Suppress async client logs
logging.getLogger('hiveq_data.internal.async_http_client').setLevel(logging.ERROR)
Quick Start
Make sure you have a .env file with your credentials (credentials are automatically loaded on import).
Historical Data
import hiveq_data as hd
# Create client (automatically uses .env credentials)
historical = hd.Historical()
# Equity data - use 'symbols' parameter
equity_data = historical.get_data(
dataset='HIVEQ_US_EQ',
schema='bars_1s',
symbols=['AAPL', 'MSFT'],
start='2025-08-01',
end='2025-08-30'
)
print(equity_data)
# Futures data - Option 1: Query by root (all contracts for that root)
futures_by_root = historical.get_data(
dataset='HIVEQ_US_FUT',
schema='bars_1s',
root=['ES', 'NQ'], # E-mini S&P 500 and Nasdaq futures (all contracts)
start='2025-08-01',
end='2025-08-30'
)
print(futures_by_root)
# Futures data - Option 2: Query by specific contract symbols
futures_by_symbol = historical.get_data(
dataset='HIVEQ_US_FUT',
schema='bars_1s',
symbols=['ESH25', 'NQM25'], # Specific futures contracts
start='2025-08-01',
end='2025-08-30'
)
print(futures_by_symbol)
# Options data - Option 1: Query by chains (all options for underlying)
options_by_chain = historical.get_data(
dataset='HIVEQ_US_OPT',
schema='snaps_1s',
chains=['SPY'], # All SPY options
start='2025-08-01',
end='2025-08-30',
expiration_date='2025-09-19', # Optional: filter by expiration
strike=450.0, # Optional: filter by strike
option_type='C' # Optional: filter by option type (C=Call, P=Put)
)
print(options_by_chain)
# Options data - Option 2: Query by specific option symbols
options_by_symbol = historical.get_data(
dataset='HIVEQ_US_OPT',
schema='snaps_1s',
symbols=['SPY250919C00450000'], # Specific option contract
start='2025-08-01',
end='2025-08-30'
)
print(options_by_symbol)
# Session mode - same intraday window across multiple dates
session_data = historical.get_data(
dataset='HIVEQ_US_EQ',
schema='bars_1s',
symbols=['AAPL'],
start='2025-08-01 09:30:00',
end='2025-08-05 10:00:00',
filter_mode='session'
)
print(session_data)
Instrument Reference
Query instrument metadata including contract specifications, multipliers, exchanges, and more.
import hiveq_data as hd
# Create client
instruments = hd.InstrumentReference()
# Get equity metadata
equities = instruments.get_equities(symbols=['AAPL', 'MSFT'])
print(f"Found {equities['count']} equities")
# Futures: Specific contracts
# Supports Databento symbology (ESM1 = June 2021 or 2011 based on query dates)
futures = instruments.get_futures(
symbols=['ESM1', 'ESZ5'],
start_date='2021-05-01',
end_date='2021-06-30'
)
# Returns: ESM1 parsed as June 2021 (context-aware)
# Futures: Continuous contracts
# Format: ROOT.RULE.POSITION (e.g., ES.v.1 = volume-based, position 1)
continuous = instruments.get_futures(
symbols=['ES.v.1'], # Second month by volume
start_date='2020-01-01',
end_date='2020-12-31',
expiry_type='volume' # 'volume', 'expiry', 'last_trade', 'open_interest'
)
# Returns: All contracts that were position 1 during 2020
# Other continuous contract examples:
# ES.v.0 - Front month by volume
# ES.c.0 - Front month by calendar
# ES.v.2 - Third month by volume
# Exchange and currency filters
futures_filtered = instruments.get_futures(
symbols=['ES.v.0'],
start_date='2024-01-01',
end_date='2024-01-31',
exchange='XCME',
currency='USD'
)
# Options contracts
options = instruments.get_options(
chains=['AAPL'],
expiration_date='2025-06-20',
option_type='C'
)
# Index instruments
indices = instruments.get_indices(symbols=['SPX', 'NDX'])
Publisher
Note: Publisher requires API credentials and is used for writing data to HiveQ destinations. Supports both synchronous and asynchronous publishing modes. Async mode is the default.
Async Mode (Default)
import hiveq_data as hd
# Create publisher client (async_mode=True by default)
publisher = hd.Publisher()
# Publish strategy event logs (returns immediately, non-blocking)
publisher.publish(
schema='event_logs',
key='run_20240101_001',
operation='add', # 'add' for new records, 'modify' for updates
data=[
{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'trader_id': 'trader_001',
'nav': 100000.0,
'realized_pnl': 1500.0,
'total_pnl': 1500.0,
'event_log_type': 'TRADE',
'sub_event_type': 'FILLED',
'symbol': 'AAPL',
'message': 'Trade executed'
}
]
)
# Always flush before shutdown in async mode
publisher.flush()
publisher.close()
Synchronous Mode
import hiveq_data as hd
# Create publisher with sync mode
publisher = hd.Publisher(async_mode=False)
# Publish and wait for response
result = publisher.publish(
schema='event_logs',
key='run_20240101_001',
operation='add',
data=[
{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'trader_id': 'trader_001',
'nav': 100000.0
}
]
)
publisher.close()
High-Frequency Publishing
For high-frequency publishing scenarios (e.g., backtesting), the default async mode provides 10-100x performance improvement:
import hiveq_data as hd
publisher = hd.Publisher() # async_mode=True by default
# Publish data (returns immediately, non-blocking)
for i in range(1000):
publisher.publish(
schema='event_logs',
key=f'run_{i}',
operation='add',
data=[{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'nav': 100000.0,
'realized_pnl': 1500.0
}]
)
# Flush before shutdown to ensure all data is published
publisher.flush()
publisher.close()
SignalPublisher
Note: SignalPublisher uses Kafka for high-throughput signal publishing. Requires Kafka broker access.
SignalPublisher is optimized for publishing user signals (trading signals, strategy signals, etc.) to Kafka topics with low latency and high throughput.
Signal Format:
{
'data': {'signal': 0, 'weight': 0}, # Signal data dict
'user_id': 'Raj', # User identifier
'ts_event': '2025-11-05 19:00:02.267000', # Event timestamp (auto-generated)
'symbol': 'AAPL', # Symbol
'signal_version': 1, # Version (default: 1)
'signal_id': '39831f03-7022-4c0f-...' # UUID (auto-generated)
}
Basic Usage
import hiveq_data as hd
# Create signal publisher
# Bootstrap servers can be set via KAFKA_BOOTSTRAP_SERVERS env variable
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals'
)
# Publish signal
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj'
)
# Close when done
publisher.close()
Context Manager Usage
import hiveq_data as hd
# Automatic cleanup with context manager
with hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals'
) as publisher:
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj'
)
# Publisher automatically closed
Batch Publishing
import hiveq_data as hd
with hd.SignalPublisher() as publisher:
signals = [
{
'data': {'signal': 1, 'weight': 0.85},
'symbol': 'AAPL',
'user_id': 'Raj'
},
{
'data': {'signal': 1, 'weight': 0.72},
'symbol': 'MSFT',
'user_id': 'Raj'
},
{
'data': {'signal': 0, 'weight': 0.0},
'symbol': 'GOOG',
'user_id': 'Raj'
},
]
count = publisher.publish_batch(signals)
print(f"Published {count} signals")
Custom Partition Keys
import hiveq_data as hd
with hd.SignalPublisher() as publisher:
# Use custom key for partitioning (e.g., trader ID)
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj',
key='trader-001'
)
Auto-Reconnect
import hiveq_data as hd
# Enable automatic reconnection on connection failure
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals',
auto_reconnect=True, # Enable auto-reconnect (default: True)
reconnect_interval=1.0, # Retry every 1 second (default: 1.0)
status_log_interval=30.0 # Log status every 30 seconds (default: 30.0)
)
# Check connection status
if publisher.is_connected():
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj'
)
print("Signal published successfully")
else:
print("Publisher not connected, waiting for reconnection...")
# Get detailed status
status = publisher.get_connection_status()
print(f"Status: {status}")
publisher.close()
Advanced Kafka Configuration
import hiveq_data as hd
# Pass additional Kafka producer config
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals',
acks='all', # Wait for all replicas
compression_type='gzip', # Compress messages
retries=3, # Retry failed sends
max_in_flight_requests_per_connection=1 # Ensure ordering
)
Features:
- Kafka-based high-throughput publishing
- Automatic reconnection on connection failure
- Key-based partitioning by symbol or custom key
- JSON serialization with UTF-8 encoding
- Batch publishing support
- Context manager support for automatic cleanup
- Configurable Kafka producer settings
- Connection status monitoring
- Throttled reconnection logging
- Error handling with retry logic
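The auto-generated fields in the signal format above (ts_event and signal_id) can be sketched with a small helper. build_signal is hypothetical, not the SDK's code; the real SignalPublisher assembles the envelope internally:

```python
import uuid
from datetime import datetime, timezone

def build_signal(data, symbol, user_id, signal_version=1):
    """Assemble a signal envelope matching the Signal Format shown above.

    Hypothetical sketch: ts_event and signal_id are auto-generated, the
    rest comes from the caller.
    """
    return {
        'data': data,                       # signal payload dict
        'user_id': user_id,
        'ts_event': datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f'),
        'symbol': symbol,
        'signal_version': signal_version,
        'signal_id': str(uuid.uuid4()),     # unique per signal
    }
```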
Metadata
Note: Metadata client requires API credentials and provides access to dataset and schema information.
import hiveq_data as hd
# Create metadata client
metadata = hd.Metadata()
# Get all available datasets
datasets = metadata.get_datasets()
print(f"Available datasets: {datasets}")
# Get schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])
# Get schemas for specific datasets
schemas = metadata.get_schemas(dataset=["HIVEQ_US_EQ", "HIVEQ_US_FUT"])
# Get schema for single dataset
schema = metadata.get_schemas(dataset="HIVEQ_US_EQ")
print(f"HIVEQ_US_EQ schemas: {schema}")
# Get details for a specific schema
schema_info = metadata.get_schema(schema='bars_1s', dataset='HIVEQ_US_EQ')
# Refresh schema cache
metadata.refresh_schema()
LiveStream
Note: LiveStream client connects to a WebSocket server for real-time market data streaming. It supports multiple subscriptions with a single connection, automatic reconnection every 1 second (configurable), and connection status monitoring every 30 seconds (configurable).
Basic Usage
import asyncio
import hiveq_data as hd
# Define single callback for handling all market data
async def handle_market_data(data):
print(f"Market Data: {data}")
async def main():
# Create LiveStream client
# - Reconnects every 1 second
# - Logs status every 30 seconds
stream = hd.LiveStream(
host='localhost',
port=8765,
auto_reconnect=True
)
# Connect to WebSocket server
await stream.connect()
# Subscribe to market data using single callback
await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)
await stream.subscribe('market_data.equity.bars_1m', 'MSFT', handle_market_data)
await stream.subscribe('market_data.equity.tbbo', 'AAPL', handle_market_data)
# Subscribe to multiple keys at once
await stream.subscribe('market_data.equity.bars_1m', ['GOOGL', 'TSLA'], handle_market_data)
# Keep running
await stream.wait_until_disconnected()
# Cleanup
await stream.disconnect()
# Run the stream
asyncio.run(main())
Using Context Manager
import asyncio
import hiveq_data as hd
async def handle_market_data(data):
print(f"Market Data: {data}")
async def main():
# Automatic connection management with context manager
async with hd.LiveStream(host='localhost', port=8765) as stream:
await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)
await stream.subscribe('market_data.equity.tbbo', 'MSFT', handle_market_data)
# Run for 60 seconds
await asyncio.sleep(60)
# Stream automatically disconnected
asyncio.run(main())
Managing Subscriptions
async def handle_market_data(data):
print(f"Market Data: {data}")
async with hd.LiveStream(host='localhost', port=8765) as stream:
# Subscribe
await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)
# Run for a while
await asyncio.sleep(30)
# Unsubscribe specific callback
await stream.unsubscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)
# Or unsubscribe all callbacks for a topic/key
await stream.unsubscribe('market_data.equity.bars_1m', 'AAPL')
Features:
- WebSocket-based real-time data streaming
- Multiple subscriptions with single connection and single callback
- Fast automatic reconnection every 1 second (configurable)
- Auto-resubscription to all topics after reconnection
- Connection status monitoring every 30 seconds with throttled logging
- Can add subscriptions even when not connected
- Async/await support with both sync and async callbacks
- Context manager support for automatic cleanup
- Thread-safe subscription management
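The "both sync and async callbacks" point can be illustrated with a small dispatch helper. This is a sketch of the general technique, not LiveStream's internals:

```python
import asyncio
import inspect

async def dispatch(callback, message):
    """Invoke a subscriber callback, awaiting it only if it returned a
    coroutine. This is how one loop can serve both plain functions and
    async def handlers."""
    result = callback(message)
    if inspect.isawaitable(result):
        await result

# Example: one sync and one async handler receiving the same message
received = []

def sync_handler(data):
    received.append(('sync', data))

async def async_handler(data):
    received.append(('async', data))

async def main():
    await dispatch(sync_handler, {'symbol': 'AAPL'})
    await dispatch(async_handler, {'symbol': 'AAPL'})

asyncio.run(main())
```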
Discovering Available Datasets and Schemas
To get the latest information about available datasets and schemas, use the Metadata client:
import hiveq_data as hd
# Create metadata client
metadata = hd.Metadata()
# Get all available datasets
datasets = metadata.get_datasets()
print("Available datasets:", datasets)
# Get all schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])
# Get schemas for specific dataset
eq_schemas = metadata.get_schemas(dataset="HIVEQ_US_EQ")
print("HIVEQ_US_EQ schemas:", eq_schemas)
Why use Metadata client?
- Always returns current, up-to-date information
- Includes detailed schema descriptions and parameters
- Programmatically discover what's available
- No hardcoded lists that can become stale
Parameter Requirements by Asset Class
Equity (HIVEQ_US_EQ):
- Required: symbols, start, end
- Optional: limit, offset, columns
Futures (HIVEQ_US_FUT):
- Required: start, end, AND one of: symbols OR root
- Optional: limit, offset, columns
Options (HIVEQ_US_OPT):
- Required: start, end, AND one of: symbols OR chains
- Optional: limit, offset, columns, expiration_date, strike, option_type
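A small pre-flight check derived from the table above might look like this. validate_query is a hypothetical helper, not part of the SDK:

```python
# Identifier alternatives each dataset accepts (start/end are always required)
IDENTIFIERS = {
    'HIVEQ_US_EQ': ('symbols',),
    'HIVEQ_US_FUT': ('symbols', 'root'),
    'HIVEQ_US_OPT': ('symbols', 'chains'),
}

def validate_query(dataset, **params):
    """Raise ValueError when a get_data-style call is missing required fields."""
    if not (params.get('start') and params.get('end')):
        raise ValueError('start and end are required for every dataset')
    allowed = IDENTIFIERS[dataset]
    if not any(params.get(name) for name in allowed):
        raise ValueError(f"{dataset} requires one of: {' OR '.join(allowed)}")
    return True
```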
API Reference
Configuration
hd.configure(api_key=None, base_url=None, user_id=None, org_id=None, user_name=None) (Optional)
Configure API credentials globally. Usually not needed as credentials are automatically loaded from environment variables.
Parameters:
- api_key (str, optional): Your HiveQ API key (overrides auto-loaded value)
- base_url (str, optional): API base URL (overrides auto-loaded value)
- user_id (str, optional): User ID for request headers (X-User-ID)
- org_id (str, optional): Organization ID for request headers (X-Org-ID)
- user_name (str, optional): User name for request headers (X-User-Name)
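The override precedence described above (an explicit configure() argument wins over the auto-loaded environment value) can be sketched as:

```python
import os

def resolve_setting(explicit, env_var):
    """Return the explicitly passed value if given, else fall back to the
    environment variable. A sketch of the precedence rule, not SDK code."""
    if explicit is not None:
        return explicit
    return os.environ.get(env_var)
```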
Historical Client
hd.Historical(api_key=None, base_url=None, user_id=None, org_id=None, user_name=None)
Historical market data client.
Methods:
- get_data(dataset, schema, symbols=None, root=None, chains=None, start=None, end=None, limit=None, offset=None, filter_mode=None, **kwargs): Fetch historical data
  - dataset (str, required): Dataset name (e.g., 'HIVEQ_US_EQ', 'HIVEQ_US_FUT', 'HIVEQ_US_OPT')
  - schema (str, required): Schema name (e.g., 'bars_1s', 'trades', 'snaps_1s')
  - symbols (str|list, optional): Symbol(s) to query
  - root (str|list, optional): Root symbol(s) for futures
  - chains (str|list, optional): Underlying symbol(s) for options
  - start (str|date|datetime, required): Start date/datetime
  - end (str|date|datetime, required): End date/datetime
  - limit (int, optional): Maximum records to return
  - offset (int, optional): Records to skip for pagination
  - filter_mode (str, optional): 'continuous' (default) or 'session'
  - **kwargs: Additional parameters (e.g., expiration_date, strike, option_type, columns)
  - Returns: dict with 'data' and metadata
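limit and offset compose into a standard pagination loop. In this sketch fetch_page stands in for a bound client.get_data call; it assumes a response dict with a 'data' list, per the Returns description above:

```python
def fetch_all(fetch_page, page_size=1000):
    """Collect every record by paging with limit/offset until a short page
    signals the end of the result set."""
    rows, offset = [], 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)
        batch = page['data']
        rows.extend(batch)
        if len(batch) < page_size:   # last page reached
            return rows
        offset += page_size
```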
Publisher Client
hd.Publisher(api_key=None, base_url=None, async_mode=True, user_id=None, org_id=None, user_name=None)
Data publisher client. Async mode is the default.
Parameters:
- api_key (str, optional): API key (overrides global config)
- base_url (str, optional): Base URL (overrides global config)
- async_mode (bool, optional): If True (default), uses async HTTP for fire-and-forget publishes
- user_id (str, optional): User ID for request headers
- org_id (str, optional): Organization ID for request headers
- user_name (str, optional): User name for request headers
Methods:
- publish(schema, data, key, operation='add'): Publish data to a schema
  - schema (str, required): Schema name (e.g., 'event_logs')
  - data (list, required): List of records to publish (each record is a dict)
  - key (str, required): Unique key identifier for tracking published data
  - operation (str, optional): 'add' (insert new) or 'modify' (update existing). Default: 'add'
  - Returns: Response dict (immediate in async mode)
- flush(): Wait for all in-flight async publishes to complete
  - Only relevant when async_mode=True
- close(): Close the publisher and cleanup resources
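Why flush() matters before shutdown can be seen in a minimal fire-and-forget model: publish() only enqueues, so without flush() the process can exit while records are still waiting to be sent. This is a toy queue-and-worker sketch, not the SDK's implementation:

```python
import queue
import threading

class AsyncSender:
    """Toy model of fire-and-forget publishing: publish() enqueues and
    returns immediately; a background worker performs the slow send;
    flush() blocks until the queue is drained."""

    def __init__(self, send):
        self._send = send
        self._q = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        while True:
            item = self._q.get()
            if item is None:          # shutdown sentinel
                break
            self._send(item)
            self._q.task_done()

    def publish(self, record):
        self._q.put(record)           # non-blocking: returns immediately

    def flush(self):
        self._q.join()                # wait for all queued sends to finish

    def close(self):
        self.flush()
        self._q.put(None)
        self._worker.join()
```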
InstrumentReference Client
hd.InstrumentReference(api_key=None, base_url=None, user_id=None, org_id=None, user_name=None)
API-based instrument metadata client.
Methods:
- get_equities(symbols=None, date=None, exchange=None, currency=None, limit=None, offset=None): Get equity metadata
  - Returns: dict with 'instruments' and 'count'
- get_futures(symbols=None, start_date=None, end_date=None, exchange=None, currency=None, expiry_type='volume', limit=None, offset=None): Get futures contracts
  - Specific contracts: Databento symbology (e.g., ESM1, ESZ25)
  - Continuous contracts: Format ROOT.RULE.POSITION (e.g., ES.v.0)
  - expiry_type: Rollover type ('volume', 'expiry', 'last_trade', 'open_interest')
  - Returns: dict with 'instruments' and 'count'
- get_options(symbols=None, chains=None, start_date=None, end_date=None, expiration_date=None, strike=None, option_type=None, exchange=None, currency=None, limit=None, offset=None, root=None): Get options contracts
  - Returns: dict with 'instruments' and 'count'
- get_indices(symbols=None, date=None, exchange=None, currency=None, limit=None, offset=None): Get index metadata
  - Returns: dict with 'instruments' and 'count'
- get_instruments(symbols=None, asset_class=None, start_date=None, end_date=None, exchange=None, currency=None, limit=None, offset=None): Generic method
  - Routes to asset-specific methods based on asset_class ('equity', 'futures', 'options', 'index')
Metadata Client
hd.Metadata(api_key=None, base_url=None)
Client for accessing HiveQ metadata API.
Methods:
- get_datasets(): Get list of available datasets
  - Returns: dict with available datasets
- get_schemas(dataset): Get schemas for specified datasets
  - dataset (str or list): Dataset name(s) or ["*"] for all datasets
  - Returns: dict with schema information
- get_schema(schema, dataset): Get detailed info for a specific schema
  - schema (str): Schema name
  - dataset (str): Dataset name
  - Returns: dict with schema details
- refresh_schema(): Refresh the schema cache
  - Returns: dict with refresh result
LiveStream Client
hd.LiveStream(host='localhost', port=8765, auto_reconnect=True, reconnect_interval=1.0, status_log_interval=30.0)
WebSocket-based client for streaming live market data.
Parameters:
- host (str, optional): WebSocket server host (default: 'localhost')
- port (int, optional): WebSocket server port (default: 8765)
- auto_reconnect (bool, optional): Auto-reconnect on disconnect (default: True)
- reconnect_interval (float, optional): Seconds between reconnection attempts (default: 1.0)
- status_log_interval (float, optional): Seconds between status log messages (default: 30.0)
Methods:
- async connect(): Connect to the WebSocket server
- async disconnect(): Disconnect from the server
- async subscribe(topic, keys, callback): Subscribe to a topic with key(s) and callback
- async unsubscribe(topic, key, callback=None): Unsubscribe from a (topic, key) pair
- async wait_until_disconnected(): Wait until the client is fully stopped
- is_connected(): Check if currently connected (returns bool)
Advanced Usage
Context Manager Support
All clients support context managers for automatic cleanup:
import hiveq_data as hd
# Credentials auto-loaded from env
with hd.Historical() as historical:
data = historical.get_data(
dataset='HIVEQ_US_EQ',
schema='bars_1s',
symbols='AAPL',
start='2025-08-01',
end='2025-08-30'
)
print(data)
# Session automatically closed
Per-Client Credentials
Override auto-loaded credentials for specific clients:
import hiveq_data as hd
# Use .env credentials for most clients
prod_historical = hd.Historical()
# Use different credentials for specific client (e.g., local testing)
dev_historical = hd.Historical(
api_key='dev_api_key',
base_url='http://localhost:3000'
)
Override Base URL for Development
import hiveq_data as hd
# Override base URL while keeping API key from env
hd.configure(base_url='http://localhost:3000')
# All clients now use localhost
client = hd.Historical()
License
MIT License