HiveQ Data API
A Python client for accessing HiveQ's historical and live market data with ease.
Version: 0.2.0
Features
Six Powerful Clients
- Historical - Query historical market data with flexible date/time ranges (API-based)
- InstrumentReference - Access security metadata like multipliers, roots, symbols, etc. (API-based)
- Publisher - Publish strategy events, logs, and custom data to HiveQ destinations (API-based, supports async mode)
- SignalPublisher - Publish trading signals to Kafka topics with low latency and high throughput (Kafka-based)
- Metadata - Get dataset and schema information from the HiveQ platform (API-based)
- LiveStream - Stream live market data in real time (WebSocket-based)
Additional Features
- Centralized Configuration: Load credentials once from .env files, environment variables, or direct parameters
- Fast JSON Parsing: Built on orjson for high-performance data processing
- Async Publishing: Fire-and-forget publishing with 10-100x performance improvement
- Automatic Rollover Detection: Futures contract rollover information with multiple methodologies
Installation
Install from PyPI (coming soon)
pip install hiveq-data
Development Installation
# Clone the repository
cd python-sdk
# Install in development mode
pip install -e .
# Or install with development dependencies
pip install -e ".[dev]"
Check Version
import hiveq_data as hd
print(hd.__version__)  # Outputs: 0.2.0
Configuration
Automatic Configuration (Recommended)
No configuration needed! The SDK automatically loads credentials when you import it:
- Searches for a .env file in your current directory and parent directories
- Falls back to environment variables (HIVEQ_API_KEY, HIVEQ_BASE_URL)
Just create a .env file (see .env.example):
HIVEQ_API_KEY=your_api_key_here
HIVEQ_BASE_URL=https://api.hiveq.com
That's it! No configure() call needed:
import hiveq_data as hd
# Credentials automatically loaded from .env!
client = hd.Historical()
data = client.get_data(...)
Manual Configuration (Optional)
You only need to call configure() if you want to:
- Override auto-loaded values
- Use a specific .env file location
- Set credentials programmatically
Override base URL for local development
import hiveq_data as hd
# Auto-loads credentials from .env, but overrides base_url
hd.configure(base_url="http://localhost:3000")
Use a specific .env file
import hiveq_data as hd
hd.configure(env_file=".env.production")
Set credentials programmatically
import hiveq_data as hd
hd.configure(
api_key='your_api_key_here',
base_url='https://api.hiveq.com'
)
Use environment variables only
export HIVEQ_API_KEY=your_api_key_here
export HIVEQ_BASE_URL=https://api.hiveq.com
import hiveq_data as hd
# Auto-loads from environment variables
client = hd.Historical()
Logging Configuration (Optional)
The SDK uses Python's standard logging module. By default, logs are suppressed. Configure logging in your application to see SDK logs:
import logging
import hiveq_data as hd
# Basic configuration - show all SDK logs at INFO level
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Or configure only HiveQ SDK logs
logging.getLogger('hiveq_data').setLevel(logging.DEBUG)
# Create client - will now show debug logs
client = hd.Historical()
Log Levels:
- DEBUG - Detailed information for debugging (API payloads, parsing details)
- INFO - General informational messages (operations completed)
- WARNING - Warning messages (fallback behaviors, missing data)
- ERROR - Error messages (API failures, parsing errors)
Filter specific modules:
# Only show futures reference logs
logging.getLogger('hiveq_data.instrument_reference.futures_reference').setLevel(logging.DEBUG)
# Suppress async client logs
logging.getLogger('hiveq_data.internal.async_http_client').setLevel(logging.ERROR)
Quick Start
Make sure you have a .env file with your credentials (credentials are automatically loaded on import).
Historical Data
import hiveq_data as hd
# Create client (automatically uses .env credentials)
historical = hd.Historical()
# Equity data - use 'symbols' parameter
equity_data = historical.get_data(
dataset='HIVEQ_US_EQ',
schema='bars_1s',
symbols=['AAPL', 'MSFT'],
start='2025-08-01',
end='2025-08-30'
)
print(equity_data)
# Futures data - Option 1: Query by root (all contracts for that root)
futures_by_root = historical.get_data(
dataset='HIVEQ_US_FUT',
schema='bars_1s',
root=['ES', 'NQ'], # E-mini S&P 500 and Nasdaq futures (all contracts)
start='2025-08-01',
end='2025-08-30'
)
print(futures_by_root)
# Futures data - Option 2: Query by specific contract symbols
futures_by_symbol = historical.get_data(
dataset='HIVEQ_US_FUT',
schema='bars_1s',
symbols=['ESH25', 'NQM25'], # Specific futures contracts
start='2025-08-01',
end='2025-08-30'
)
print(futures_by_symbol)
# Options data - Option 1: Query by chains (all options for underlying)
options_by_chain = historical.get_data(
dataset='HIVEQ_US_OPT',
schema='snaps_1s',
chains=['SPY'], # All SPY options
start='2025-08-01',
end='2025-08-30',
expiration_date='2025-09-19', # Optional: filter by expiration
strike=450.0, # Optional: filter by strike
option_type='C' # Optional: filter by option type (C=Call, P=Put)
)
print(options_by_chain)
# Options data - Option 2: Query by specific option symbols
options_by_symbol = historical.get_data(
dataset='HIVEQ_US_OPT',
schema='snaps_1s',
symbols=['SPY250919C00450000'], # Specific option contract
start='2025-08-01',
end='2025-08-30'
)
print(options_by_symbol)
# Parameters are validated based on asset class
try:
# This will fail - neither 'symbols' nor 'root' provided for futures
data = historical.get_data(
dataset='HIVEQ_US_FUT',
schema='bars_1s',
start='2025-08-01',
end='2025-08-30'
)
except ValueError as e:
print(f"Validation error: {e}")
# Error: At least one of these parameters is required for schema 'bars_1s' (asset_class: futures): symbols, root
Instrument Reference
Query instrument metadata including contract specifications, multipliers, exchanges, and more via API endpoints.
Note: InstrumentReference requires API credentials and provides up-to-date reference information.
import hiveq_data as hd
# Create client (automatically uses .env credentials)
instruments = hd.InstrumentReference()
# Get equity metadata
equities = instruments.get_equities(symbols=['AAPL', 'MSFT'])
print(f"Found {equities['count']} equities")
# Futures: Specific contracts
# Supports Databento symbology (ESM1 = June 2021 or 2011 based on query dates)
futures = instruments.get_futures(
symbols=['ESM1', 'ESZ5'],
start_date='2021-05-01',
end_date='2021-06-30'
)
# Returns: ESM1 parsed as June 2021 (context-aware)
# Futures: Continuous contracts
# Format: ROOT.RULE.POSITION (e.g., ES.v.1 = volume-based, position 1)
continuous = instruments.get_futures(
symbols=['ES.v.1'], # Second month by volume
start_date='2020-01-01',
end_date='2020-12-31'
)
# Returns: All contracts that were position 1 during 2020
# Other continuous contract examples:
# ES.v.0 - Front month by volume
# ES.c.0 - Front month by calendar
# ES.v.2 - Third month by volume
# Exchange and currency filters
futures_filtered = instruments.get_futures(
symbols=['ES.v.0'],
start_date='2024-01-01',
end_date='2024-01-31',
exchange='XCME',
currency='USD'
)
# Options contracts
options = instruments.get_options(
symbols=['SPY'],
start_date='2025-08-01',
end_date='2025-09-30'
)
Publisher
Note: Publisher requires API credentials and is used for writing data to HiveQ destinations. Supports both synchronous and asynchronous publishing modes.
Synchronous Mode (Default)
import hiveq_data as hd
# Create publisher client (uses .env credentials, synchronous by default)
publisher = hd.Publisher()
# Publish strategy event logs
result = publisher.publish(
schema='event_logs',
key='run_20240101_001',
operation='add', # 'add' for new records, 'modify' for updates
data=[
{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'user_id': 'trader_001',
'nav': 100000.0,
'realized_pnl': 1500.0,
'total_pnl': 1500.0,
'event_log_type': 'TRADE',
'sub_event_type': 'FILLED',
'symbol': 'AAPL',
'message': 'Trade executed'
}
]
)
print(f"Published {result['record_count']} records")
# Modify existing records (update operation)
result = publisher.publish(
schema='event_logs',
key='run_20240101_001',
operation='modify',
data=[
{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'user_id': 'trader_001',
'nav': 102000.0, # Updated value
'realized_pnl': 3500.0, # Updated value
'total_pnl': 3500.0
}
]
)
# Close when done (good practice)
publisher.close()
Async Mode (Fire-and-Forget, High Performance)
For high-frequency publishing scenarios (e.g., backtesting), use async mode for 10-100x performance improvement:
import hiveq_data as hd
# Create publisher with async mode enabled
publisher = hd.Publisher(async_mode=True)
# Publish data (returns immediately, non-blocking)
for i in range(1000):
publisher.publish(
schema='event_logs',
key=f'run_{i}',
operation='add',
data=[{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'nav': 100000.0,
'realized_pnl': 1500.0
}]
)
# Returns immediately without waiting for HTTP response
# Flush before shutdown to ensure all data is published
publisher.flush()
publisher.close()
Async Mode Benefits:
- 10-100x faster for high-frequency publishing
- Fire-and-forget: publish() returns immediately
- Concurrent HTTP requests with connection pooling
- Automatic retry with exponential backoff
- Perfect for backtesting and batch publishing
Important: Always call flush() before shutdown when using async mode to ensure all data is published.
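One way to make that guarantee hold even when an exception interrupts the run is a try/finally block; a minimal sketch (field values illustrative):
import hiveq_data as hd

publisher = hd.Publisher(async_mode=True)
try:
    publisher.publish(
        schema='event_logs',
        key='run_20240101_001',
        operation='add',
        data=[{'ts_event': '2024-01-01T10:00:00.000Z',
               'strategy_id': 'my_strategy',
               'nav': 100000.0}]
    )
finally:
    # Runs even if publishing raises, so no queued data is dropped
    publisher.flush()
    publisher.close()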
SignalPublisher
Note: SignalPublisher uses Kafka for high-throughput signal publishing. Requires Kafka broker access.
SignalPublisher is optimized for publishing user signals (trading signals, strategy signals, etc.) to Kafka topics with low latency and high throughput.
Signal Format:
{
'data': {'signal': 0, 'weight': 0}, # Signal data dict
'user_id': 'Raj', # User identifier
'ts_event': '2025-11-05 19:00:02.267000', # Event timestamp (auto-generated)
'symbol': 'AAPL', # Symbol
'signal_version': 1, # Version (default: 1)
'signal_id': '39831f03-7022-4c0f-...' # UUID (auto-generated)
}
Basic Usage
import hiveq_data as hd
# Create signal publisher
# Bootstrap servers can be set via KAFKA_BOOTSTRAP_SERVERS env variable
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals'
)
# Publish signal
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj'
)
# Close when done
publisher.close()
Context Manager Usage
import hiveq_data as hd
# Automatic cleanup with context manager
with hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals'
) as publisher:
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj'
)
# Publisher automatically closed
Batch Publishing
import hiveq_data as hd
with hd.SignalPublisher() as publisher:
signals = [
{
'data': {'signal': 1, 'weight': 0.85},
'symbol': 'AAPL',
'user_id': 'Raj'
},
{
'data': {'signal': 1, 'weight': 0.72},
'symbol': 'MSFT',
'user_id': 'Raj'
},
{
'data': {'signal': 0, 'weight': 0.0},
'symbol': 'GOOG',
'user_id': 'Raj'
},
]
count = publisher.publish_batch(signals)
print(f"Published {count} signals")Custom Partition Keys
import hiveq_data as hd
with hd.SignalPublisher() as publisher:
# Use custom key for partitioning (e.g., trader ID)
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj',
key='trader-001'
)
Auto-Reconnect
import hiveq_data as hd
# Enable automatic reconnection on connection failure
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals',
auto_reconnect=True, # Enable auto-reconnect (default: True)
reconnect_interval=1.0, # Retry every 1 second (default: 1.0)
status_log_interval=30.0 # Log status every 30 seconds (default: 30.0)
)
# Check connection status
if publisher.is_connected():
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj'
)
print("Signal published successfully")
else:
print("Publisher not connected, waiting for reconnection...")
# Get detailed status
status = publisher.get_connection_status()
print(f"Status: {status}")
publisher.close()
Advanced Kafka Configuration
import hiveq_data as hd
# Pass additional Kafka producer config
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals',
acks='all', # Wait for all replicas
compression_type='gzip', # Compress messages
retries=3, # Retry failed sends
max_in_flight_requests_per_connection=1 # Ensure ordering
)
Features:
- Kafka-based high-throughput publishing
- Automatic reconnection on connection failure
- Key-based partitioning by symbol or custom key
- JSON serialization with UTF-8 encoding
- Batch publishing support
- Context manager support for automatic cleanup
- Configurable Kafka producer settings
- Connection status monitoring
- Throttled reconnection logging
- Error handling with retry logic (see the sketch after this list)
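Since publish() is documented to raise ValueError for invalid signals and KafkaError for failed sends, a defensive sketch looks like this (the kafka-python import below is an assumption; substitute the error type from the SDK's actual Kafka dependency):
import hiveq_data as hd
from kafka.errors import KafkaError  # assumed dependency; adjust to the SDK's Kafka client

with hd.SignalPublisher(bootstrap_servers='localhost:9092', topic='user-signals') as publisher:
    try:
        publisher.publish(
            data={'signal': 1, 'weight': 0.85},
            symbol='AAPL',
            user_id='Raj'
        )
    except ValueError as e:
        print(f"Invalid signal payload: {e}")
    except KafkaError as e:
        print(f"Publish failed after retries: {e}")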
Metadata
Note: Metadata client requires API credentials and provides access to dataset and schema information.
import hiveq_data as hd
# Create metadata client
metadata = hd.Metadata()
# Get all available datasets
datasets = metadata.get_datasets()
print(f"Available datasets: {datasets}")
# Get schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])
# Get schemas for specific datasets
schemas = metadata.get_schemas(dataset=["HIVEQ_US_EQ", "HIVEQ_US_FUT"])
# Get schema for single dataset
schema = metadata.get_schemas(dataset="HIVEQ_US_EQ")
print(f"HIVEQ_US_EQ schemas: {schema}")LiveStream
Note: The LiveStream client connects to a WebSocket server for real-time market data streaming. It supports multiple subscriptions over a single connection, automatic reconnection every 1 second (configurable), and connection status monitoring every 30 seconds (configurable).
Basic Usage
import asyncio
from hiveq_data.live.client import LiveStream
# Define single callback for handling all market data
async def handle_market_data(data):
print(f"Market Data: {data}")
async def main():
# Create LiveStream client
# - Reconnects every 1 second
# - Logs status every 30 seconds
stream = LiveStream(
host='localhost',
port=8765,
auto_reconnect=True
)
# Connect to WebSocket server
await stream.connect()
# Subscribe to market data using single callback
await stream.subscribe('market_data.equity.bars_1m', ['AAPL', 'MSFT'], handle_market_data)
await stream.subscribe('market_data.equity.tbbo', 'AAPL', handle_market_data)
# Keep running
await stream.wait_until_disconnected()
# Cleanup
await stream.disconnect()
# Run the stream
asyncio.run(main())
Using Context Manager
import asyncio
from hiveq_data.live.client import LiveStream
async def handle_market_data(data):
print(f"Market Data: {data}")
async def main():
# Automatic connection management with context manager
async with LiveStream(host='localhost', port=8765) as stream:
await stream.subscribe('market_data.equity.bars_1m', ['AAPL'], handle_market_data)
await stream.subscribe('market_data.equity.tbbo', ['MSFT'], handle_market_data)
# Run for 60 seconds
await asyncio.sleep(60)
# Stream automatically disconnected
asyncio.run(main())
Multiple Subscriptions
# One client can handle multiple subscriptions with single callback
async def handle_market_data(data):
print(f"Market Data: {data}")
async with LiveStream(host='localhost', port=8765) as stream:
# Subscribe to different topics with same callback
await stream.subscribe('market_data.equity.bars_1m', ['AAPL', 'MSFT'], handle_market_data)
await stream.subscribe('market_data.equity.tbbo', ['AAPL', 'MSFT'], handle_market_data)
await stream.wait_until_disconnected()
Managing Subscriptions
async def handle_market_data(data):
print(f"Market Data: {data}")
async with LiveStream(host='localhost', port=8765) as stream:
# Subscribe
await stream.subscribe('market_data.equity.bars_1m', ['AAPL'], handle_market_data)
# Run for a while
await asyncio.sleep(30)
# Unsubscribe specific callback
await stream.unsubscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)
# Or unsubscribe all callbacks for a topic/key
await stream.unsubscribe('market_data.equity.bars_1m', 'AAPL')
Features:
- WebSocket-based real-time data streaming
- Multiple subscriptions with single connection and single callback
- Fast automatic reconnection every 1 second (configurable)
- Auto-resubscription to all topics after reconnection
- Connection status monitoring every 30 seconds with throttled logging
- Can add subscriptions even when not connected (see the sketch after this list)
- Async/await support
- Context manager support for automatic cleanup
- Thread-safe subscription management
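For example, because subscriptions can be registered before a connection exists, setup code can live in one place; a minimal sketch (server address assumed):
import asyncio
from hiveq_data.live.client import LiveStream

async def handle_market_data(data):
    print(f"Market Data: {data}")

async def main():
    stream = LiveStream(host='localhost', port=8765)
    # Registered before connect(); applied once the connection is established
    await stream.subscribe('market_data.equity.bars_1m', ['AAPL'], handle_market_data)
    await stream.connect()
    await stream.wait_until_disconnected()

asyncio.run(main())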
Discovering Available Datasets and Schemas
To get the latest information about available datasets and schemas, use the Metadata client:
import hiveq_data as hd
# Create metadata client
metadata = hd.Metadata()
# Get all available datasets
datasets = metadata.get_datasets()
print("Available datasets:", datasets)
# Get all schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])
# Get schemas for specific dataset
eq_schemas = metadata.get_schemas(dataset="HIVEQ_US_EQ")
print("HIVEQ_US_EQ schemas:", eq_schemas)Why use Metadata client?
- Always returns current, up-to-date information
- Includes detailed schema descriptions and parameters
- Programmatically discover what's available
- No hardcoded lists that can become stale
Parameter Requirements by Asset Class
Equity (HIVEQ_US_EQ):
- Required: symbols, start, end
- Optional: limit, offset, columns
Futures (HIVEQ_US_FUT):
- Required: start, end, and one of symbols or root
- Optional: limit, offset, columns
Options (HIVEQ_US_OPT):
- Required: start, end, and one of symbols or chains
- Optional: limit, offset, columns, expiration_date, strike, option_type
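Putting the requirements above into practice, the minimal valid call per asset class looks like this (dates illustrative):
import hiveq_data as hd

historical = hd.Historical()

# Equity: symbols is required
eq = historical.get_data(dataset='HIVEQ_US_EQ', schema='bars_1s',
                         symbols=['AAPL'], start='2025-08-01', end='2025-08-30')

# Futures: one of symbols or root is required
fut = historical.get_data(dataset='HIVEQ_US_FUT', schema='bars_1s',
                          root=['ES'], start='2025-08-01', end='2025-08-30')

# Options: one of symbols or chains is required
opt = historical.get_data(dataset='HIVEQ_US_OPT', schema='snaps_1s',
                          chains=['SPY'], start='2025-08-01', end='2025-08-30')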
API Reference
Configuration
hd.configure(api_key=None, base_url=None, env_file=None) (Optional)
Configure API credentials globally. Usually not needed as credentials are automatically loaded from .env files or environment variables.
Only call this function if you need to:
- Override auto-loaded values
- Specify a non-standard .env file location
- Set credentials programmatically
Parameters:
- api_key (str, optional): Your HiveQ API key (overrides the auto-loaded value)
- base_url (str, optional): API base URL (overrides the auto-loaded value)
- env_file (str, optional): Path to a specific .env file to load
Historical Client
hd.Historical(api_key=None, base_url=None)
Historical market data client with automatic parameter validation.
Methods:
- get_data(dataset, schema, symbols=None, root=None, chains=None, start=None, end=None, limit=None, **kwargs): Fetch historical data
  - dataset (str, required): Dataset name (e.g., 'HIVEQ_US_EQ', 'HIVEQ_US_FUT', 'HIVEQ_US_OPT')
  - schema (str, required): Schema name (e.g., 'bars_1s', 'trades')
  - symbols (str|list, optional): Symbol(s) for equity data
  - root (str|list, optional): Root symbol(s) for futures data
  - chains (str|list, optional): Underlying symbol(s) for options data
  - start (str|date|datetime, required): Start date/datetime
  - end (str|date|datetime, required): End date/datetime
  - limit (int, optional): Maximum records to return
  - **kwargs: Additional optional parameters (e.g., expiration_date, strike, option_type for options)
  - Automatically validates parameters based on the dataset's asset class
- Raises ValueError if required parameters are missing or invalid parameters are provided
Parameter Requirements by Asset Class:
- Equity (HIVEQ_US_EQ):
  - Required: symbols - Stock ticker symbols (e.g., ['AAPL', 'MSFT'])
- Futures (HIVEQ_US_FUT):
  - Required (choose one):
    - root - Futures root symbols (e.g., ['ES', 'NQ']); returns all contracts for that root
    - symbols - Specific contract symbols (e.g., ['ESH25', 'NQM25'])
  - Both can be provided together
- Options (HIVEQ_US_OPT):
  - Required (choose one):
    - chains - Underlying symbols (e.g., ['SPY']); returns all options for that underlying
    - symbols - Specific option symbols (e.g., ['SPY250919C00450000'])
  - Both can be provided together
Publisher Client
hd.Publisher(api_key=None, base_url=None, async_mode=False)
Data publisher client for writing data to HiveQ destinations. Supports both synchronous and asynchronous publishing modes.
Parameters:
- api_key (str, optional): API key (overrides global config)
- base_url (str, optional): Base URL (overrides global config)
- async_mode (bool, optional): If True, uses async HTTP for fire-and-forget publishes (10-100x faster). Default: False for backward compatibility
Methods:
- publish(schema, data, key, operation='add'): Publish data to a schema
  - schema (str, required): Schema name (e.g., 'event_logs')
  - data (list, required): List of records to publish (each record is a dict)
  - key (str|list, required): Unique key identifier(s) for tracking and mapping all published data. Can be a single string or a list of strings (see the sketch after this list)
  - operation (str, optional): Operation type, 'add' (insert new records) or 'modify' (update existing). Default: 'add'
  - Returns: Response with status and record count (immediate in async mode)
  - Raises ValueError if validation fails or the operation type is invalid
- flush(): Wait for all in-flight async publishes to complete
  - Only relevant when async_mode=True
  - Blocks until all pending async publish operations finish
  - Should be called before shutdown to ensure data integrity
  - No-op in synchronous mode
- close(): Close the publisher and clean up resources
  - Flushes pending requests (in async mode) and closes connections
  - Should be called when done using the publisher
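Since key accepts a list as well as a single string, one publish call can be tracked under several identifiers; a brief sketch (field values illustrative):
import hiveq_data as hd

publisher = hd.Publisher()
result = publisher.publish(
    schema='event_logs',
    key=['run_20240101_001', 'strategy_my_strategy'],  # multiple tracking keys
    operation='add',
    data=[{'ts_event': '2024-01-01T10:00:00.000Z',
           'strategy_id': 'my_strategy',
           'nav': 100000.0}]
)
publisher.close()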
Examples:
Synchronous Mode (Default):
import hiveq_data as hd
publisher = hd.Publisher()
# Add new records
result = publisher.publish(
schema='event_logs',
key='run_20240101_001',
operation='add',
data=[
{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'user_id': 'trader_001',
'nav': 100000.0,
'realized_pnl': 1500.0,
'total_pnl': 1500.0,
'event_log_type': 'TRADE',
'sub_event_type': 'FILLED',
'symbol': 'AAPL',
'message': 'Trade executed'
}
]
)
# Modify existing records
result = publisher.publish(
schema='event_logs',
key='run_20240101_001',
operation='modify',
data=[
{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'nav': 102000.0, # Updated
'realized_pnl': 3500.0 # Updated
}
]
)
publisher.close()
Async Mode (High Performance):
import hiveq_data as hd
# Enable async mode for 10-100x performance
publisher = hd.Publisher(async_mode=True)
# Publish many records quickly (fire-and-forget)
for i in range(1000):
publisher.publish(
schema='event_logs',
key=f'run_{i}',
operation='add',
data=[{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'nav': 100000.0,
'realized_pnl': 1500.0
}]
)
# Returns immediately without waiting
# Always flush before shutdown in async mode
publisher.flush()
publisher.close()
SignalPublisher Client
hd.SignalPublisher(bootstrap_servers=None, topic='user-signals', auto_reconnect=True, reconnect_interval=1.0, status_log_interval=30.0, **kafka_config)
Kafka-based publisher for user signal data. Optimized for high-throughput signal publishing with low latency and automatic reconnection.
Parameters:
- bootstrap_servers (str, optional): Kafka broker address (e.g., 'localhost:9092')
  - If None, loads from the KAFKA_BOOTSTRAP_SERVERS environment variable
  - Default: 'localhost:9092'
- topic (str, optional): Kafka topic to publish to (default: 'user-signals')
- auto_reconnect (bool, optional): Enable automatic reconnection on connection failure (default: True)
- reconnect_interval (float, optional): Seconds between reconnection attempts (default: 1.0)
- status_log_interval (float, optional): Seconds between status log messages (default: 30.0)
- **kafka_config: Additional Kafka producer configuration
  - Examples: acks='all', compression_type='gzip', retries=3
  - See the Kafka producer documentation for the full list of options
Signal Format:
{
'data': {'signal': 0, 'weight': 0}, # Signal data dict
'user_id': 'Raj', # User identifier
'ts_event': '2025-11-05 19:00:02.267000', # Event timestamp (auto-generated)
'symbol': 'AAPL', # Symbol
'signal_version': 1, # Version (default: 1)
'signal_id': '39831f03-7022-4c0f-...' # UUID (auto-generated)
}
Methods:
- publish(data, symbol, user_id, ts_event=None, signal_version=1, signal_id=None, key=None): Publish a single signal to Kafka
  - data (dict, required): Signal data dict with 'signal' and 'weight' keys (e.g., {'signal': 1, 'weight': 0.85})
  - symbol (str, required): Symbol string (e.g., 'AAPL')
  - user_id (str, required): User identifier string
  - ts_event (datetime, optional): Event timestamp (auto-generated as the current time if None)
  - signal_version (int, optional): Signal version number (default: 1)
  - signal_id (str, optional): Signal UUID (auto-generated if None)
  - key (str, optional): Kafka partition key (defaults to the symbol if not provided)
  - Raises ValueError if the signal data is invalid
  - Raises KafkaError if publishing fails
  - Blocks until the message is confirmed by Kafka
- publish_batch(signals, key=None): Publish multiple signals to Kafka
  - signals (list, required): List of signal data dicts, each containing:
    - data (dict, required): Signal data with 'signal' and 'weight' keys
    - symbol (str, required): Symbol string
    - user_id (str, required): User identifier
    - ts_event (datetime, optional): Event timestamp (auto-generated if missing)
    - signal_version (int, optional): Version number (default: 1)
    - signal_id (str, optional): Signal UUID (auto-generated if missing)
  - key (str, optional): Kafka partition key (if None, uses the symbol from each signal)
  - Returns: Number of successfully published signals (int)
  - Flushes automatically after publishing all signals
- flush(): Flush pending messages to Kafka
  - Blocks until all buffered messages are sent
  - Useful when publishing is done but the connection needs to stay open
- close(): Close the Kafka producer connection
  - Flushes all pending messages and closes the connection gracefully
  - Should be called when done publishing, or use the context manager
- is_connected(): Check if the publisher is connected to Kafka
  - Returns: bool (True if connected, False otherwise)
  - Use this to check connection status before publishing
- get_connection_status(): Get detailed connection status information
  - Returns: dict with connection details including:
    - connected (bool): Whether currently connected
    - running (bool): Whether the publisher is running
    - reconnect_attempts (int): Number of reconnection attempts
    - auto_reconnect (bool): Whether auto-reconnect is enabled
    - bootstrap_servers (str): Kafka broker address
    - topic (str): Current topic
Context Manager Support:
with hd.SignalPublisher(bootstrap_servers='localhost:9092') as publisher:
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj'
)
# Automatically closed
Example:
import hiveq_data as hd
# Create publisher
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals'
)
# Publish single signal
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj'
)
# Publish batch
signals = [
{
'data': {'signal': 1, 'weight': 0.85},
'symbol': 'AAPL',
'user_id': 'Raj'
},
{
'data': {'signal': 0, 'weight': 0.0},
'symbol': 'MSFT',
'user_id': 'Raj'
}
]
count = publisher.publish_batch(signals)
print(f"Published {count} signals")
# Close
publisher.close()
Advanced Configuration:
# With advanced Kafka settings
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals',
acks='all', # Wait for all replicas
compression_type='gzip', # Compress messages
retries=3, # Retry failed sends
max_in_flight_requests_per_connection=1 # Ensure ordering
)
Features:
- High-throughput Kafka-based publishing
- Automatic reconnection on connection failure
- Connection status monitoring
- Key-based partitioning by symbol or custom key
- JSON serialization with UTF-8 encoding
- Batch publishing with automatic flush
- Environment variable configuration support
- Context manager for automatic cleanup
- Throttled reconnection logging
- Comprehensive error handling
- Configurable Kafka producer settings
InstrumentReference Client
hd.InstrumentReference(api_key=None, base_url=None)
Security/instrument metadata client that retrieves up-to-date reference information via API endpoints.
Important: This client uses API endpoints to fetch the latest instrument reference data and requires API credentials.
Methods:
- get_instruments(symbols=None, asset_class=None): Generic method for any asset class (no date filtering)
  - Returns: Dictionary with {"instruments": [...], "count": N}
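A minimal sketch of the generic method (the 'futures' asset_class value below is an assumption; the accepted values aren't enumerated here):
import hiveq_data as hd

instruments = hd.InstrumentReference()

# Generic lookup; 'futures' is an assumed asset_class value
result = instruments.get_instruments(symbols=['ESZ5'], asset_class='futures')
print(f"Found {result['count']} instruments")
for inst in result['instruments']:
    print(inst['symbol'], inst.get('exch_mic'))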
Asset-Specific Methods:
- get_equities(symbols=None): Get equity metadata
  - Symbols can be stock tickers (e.g., ['AAPL', 'MSFT'])
  - Returns: Exchange info, multiplier, trading hours, etc.
  - No date filtering needed for equities
- get_futures(symbols, start_date=None, end_date=None, exchange=None, currency=None): Get futures contracts
  - Specific contracts: Databento symbology (e.g., ESM1, ESZ25)
    - Single-digit years: Context-aware based on query dates (ESM1 = June 2021 or 2011)
    - Two-digit years: 2000+ century (ESZ25 = December 2025)
  - Continuous contracts: Format ROOT.RULE.POSITION (e.g., ES.v.1)
    - ES.v.0 = Front month by volume
    - ES.v.1 = Second month by volume
    - Returns all contracts at that position during the date range
  - Date validation: Rejects contracts more than 2 years from the query dates
  - Results sorted by start_date (ascending)
- get_options(symbols, start_date=None, end_date=None): Get options contracts
  - Symbols: Underlying roots (e.g., ['SPY'])
  - Returns contracts active during the date range
Return Format:
{
"count": 1,
"instruments": [
{
"symbol": "ESZ5",
"root": "ES",
"exch_mic": "XCME",
"currency": "USD",
"multiplier": 50,
"min_tick": 0.25,
"tick_value": 12.5,
"asset_type": "Fut",
"contract_month": "December",
"contract_year": 2025,
"expiry_date": "2025-12-01"
}
]
}
Metadata Client
hd.Metadata(api_key=None, base_url=None)
Client for accessing HiveQ metadata API to retrieve dataset and schema information.
Methods:
- get_datasets(): Get the list of available datasets
  - Returns: Dict with available datasets
- get_schemas(dataset): Get schemas for the specified datasets
  - dataset (str or list): Dataset name(s) or ["*"] for all datasets
  - Returns: Dict with schema information for the requested datasets
Examples:
import hiveq_data as hd
metadata = hd.Metadata()
# Get all datasets
datasets = metadata.get_datasets()
# Get schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])
# Get schemas for specific datasets
schemas = metadata.get_schemas(dataset=["HIVEQ_US_EQ", "HIVEQ_US_FUT"])
# Get schema for single dataset
schema = metadata.get_schemas(dataset="HIVEQ_US_EQ")
LiveStream Client
LiveStream(host='localhost', port=8765, auto_reconnect=True, reconnect_interval=1.0, status_log_interval=30.0)
WebSocket-based client for streaming live market data in real-time. Supports multiple subscriptions with a single connection.
Parameters:
- host (str, optional): WebSocket server host (default: 'localhost')
- port (int, optional): WebSocket server port (default: 8765)
- auto_reconnect (bool, optional): Auto-reconnect on disconnect (default: True)
  - Automatically resubscribes to all topics after a successful reconnection
- reconnect_interval (float, optional): Interval between reconnection attempts in seconds (default: 1.0)
  - Can be customized to your needs (e.g., 0.5 for 500 ms, 5.0 for 5 seconds); see the sketch after this list
- status_log_interval (float, optional): Interval for status monitoring logs in seconds (default: 30.0)
  - Controls how often connection status and reconnection attempts are logged
  - Prevents log spam while still providing visibility
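For instance, the reconnection and logging cadence can be tuned at construction time; a brief sketch:
from hiveq_data.live.client import LiveStream

stream = LiveStream(
    host='localhost',
    port=8765,
    auto_reconnect=True,
    reconnect_interval=0.5,    # retry every 500 ms
    status_log_interval=60.0   # log status once per minute
)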
Methods:
- async connect(): Connect to the WebSocket server
  - If auto_reconnect is enabled and the initial connection fails, starts reconnection attempts
  - Raises ConnectionError if the connection fails and auto_reconnect is disabled
- async disconnect(): Disconnect from the WebSocket server
  - Cancels all tasks and closes the connection
- async subscribe(topic, key, callback): Subscribe to a (topic, key) pair
  - topic (str, required): Topic name (e.g., 'market_data.equity.bars_1m')
  - key (str, required): Subscription key (e.g., a symbol like 'AAPL')
  - callback (callable, required): Function to call for each data record
  - Callback can be async or sync (see the sketch after this list)
  - Multiple callbacks can be registered for the same (topic, key)
  - Can be called even when not connected; the subscription is applied when the connection is established
  - Sends the subscription request to the server if currently connected
- async unsubscribe(topic, key, callback=None): Unsubscribe from a (topic, key) pair
  - topic (str, required): Topic name
  - key (str, required): Subscription key
  - callback (callable, optional): Specific callback to remove
  - If callback is None, removes all callbacks and unsubscribes from the server
- async wait_until_disconnected(): Wait until the client is fully stopped
  - Keeps the client running even during reconnection attempts
  - Only exits when disconnect() is called or the client stops running
  - Useful for keeping the client running in the main task
- is_connected(): Check if currently connected
  - Returns: True if connected, False otherwise
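Because callbacks may be plain functions as well as coroutines, a synchronous handler works too; a minimal sketch (server address assumed):
import asyncio
from hiveq_data.live.client import LiveStream

def handle_tick(data):  # a plain (non-async) callback is also accepted
    print(f"Tick: {data}")

async def main():
    async with LiveStream(host='localhost', port=8765) as stream:
        await stream.subscribe('market_data.equity.tbbo', 'AAPL', handle_tick)
        await stream.wait_until_disconnected()

asyncio.run(main())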
Context Manager Support:
async with LiveStream(host='localhost', port=8765) as stream:
await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_data)
await stream.wait_until_disconnected()
# Automatically disconnected
Example:
import asyncio
from hiveq_data.live.client import LiveStream
async def handle_market_data(data):
print(f"Market Data: {data}")
async def main():
stream = LiveStream(host='localhost', port=8765)
await stream.connect()
# Subscribe to multiple topics with single callback
await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)
await stream.subscribe('market_data.equity.bars_1m', 'MSFT', handle_market_data)
await stream.subscribe('market_data.equity.tbbo', 'AAPL', handle_market_data)
await stream.wait_until_disconnected()
await stream.disconnect()
asyncio.run(main())
Advanced Usage
Context Manager Support
All clients support context managers for automatic cleanup:
import hiveq_data as hd
# Credentials auto-loaded from .env
with hd.Historical() as historical:
data = historical.get_data(
dataset='HIVEQ_US_EQ',
schema='bars_1s',
symbols='AAPL',
start='2025-08-01',
end='2025-08-30'
)
print(data)
# Session automatically closed
Per-Client Credentials
Override auto-loaded credentials for specific clients:
import hiveq_data as hd
# Use .env credentials for most clients
prod_historical = hd.Historical()
# Use different credentials for specific client (e.g., local testing)
dev_historical = hd.Historical(
api_key='dev_api_key',
base_url='http://localhost:3000'
)
Override Base URL for Development
import hiveq_data as hd
# Override base URL while keeping API key from .env
hd.configure(base_url='http://localhost:3000')
# All clients now use localhost
client = hd.Historical()
License
MIT License