HiveQ Data API

A Python client for accessing HiveQ's historical and live market data with ease.

Version: 0.2.0

Features

Six Powerful Clients

  1. Historical - Query historical market data with flexible date/time ranges (API-based)
  2. InstrumentReference - Access security metadata such as multipliers, roots, and symbols (API-based)
  3. Publisher - Publish strategy events, logs, and custom data to HiveQ destinations (API-based, supports async mode)
  4. Metadata - Get dataset and schema information from the HiveQ platform (API-based)
  5. LiveStream - Stream live market data in real time (WebSocket-based)
  6. SignalPublisher - Publish trading and strategy signals to Kafka topics (Kafka-based)

Additional Features

  • Centralized Configuration: Load credentials once from .env files, environment variables, or direct parameters
  • Fast JSON Parsing: Built on orjson for high-performance data processing
  • Async Publishing: Fire-and-forget publishing with 10-100x performance improvement
  • Automatic Rollover Detection: Futures contract rollover information with multiple methodologies
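Rollover handling surfaces through the continuous-contract symbology described later in this document (ROOT.RULE.POSITION, e.g. ES.v.0). As a rough illustration of that symbol format only (not SDK code), a continuous symbol can be split into its parts:

```python
def parse_continuous_symbol(symbol: str) -> dict:
    """Split a continuous-contract symbol like 'ES.v.1' into its parts.

    Format: ROOT.RULE.POSITION, where RULE is e.g. 'v' (volume-based)
    or 'c' (calendar-based) and POSITION is 0 for the front month.
    """
    root, rule, position = symbol.split('.')
    return {'root': root, 'rule': rule, 'position': int(position)}

print(parse_continuous_symbol('ES.v.1'))
# {'root': 'ES', 'rule': 'v', 'position': 1}
```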

Installation

Install from PyPI (coming soon)

pip install hiveq-data

Development Installation

# Clone the repository
cd python-sdk

# Install in development mode
pip install -e .

# Or install with development dependencies
pip install -e ".[dev]"

Check Version

import hiveq_data as hd

print(hd.__version__)  # Outputs: 0.2.0

Configuration

No configuration needed! The SDK automatically loads credentials when you import it:

  1. Searches for .env file in your current directory and parent directories
  2. Falls back to environment variables (HIVEQ_API_KEY, HIVEQ_BASE_URL)
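The search in step 1 walks upward from the current directory through its parents. A minimal sketch of that lookup (the SDK's actual implementation may differ):

```python
from pathlib import Path
from typing import Optional

def find_env_file(start: str = '.') -> Optional[Path]:
    """Return the first .env found in `start` or any of its parent directories."""
    start_dir = Path(start).resolve()
    for directory in [start_dir, *start_dir.parents]:
        candidate = directory / '.env'
        if candidate.is_file():
            return candidate
    return None  # step 2: fall back to environment variables
```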

Just create a .env file (see .env.example):

HIVEQ_API_KEY=your_api_key_here
HIVEQ_BASE_URL=https://api.hiveq.com

That's it! No configure() call needed:

import hiveq_data as hd

# Credentials automatically loaded from .env!
client = hd.Historical()
data = client.get_data(...)

Manual Configuration (Optional)

You only need to call configure() if you want to:

  • Override auto-loaded values
  • Use a specific .env file location
  • Set credentials programmatically

Override base URL for local development

import hiveq_data as hd

# Auto-loads credentials from .env, but overrides base_url
hd.configure(base_url="http://localhost:3000")

Use a specific .env file

import hiveq_data as hd

hd.configure(env_file=".env.production")

Set credentials programmatically

import hiveq_data as hd

hd.configure(
    api_key='your_api_key_here',
    base_url='https://api.hiveq.com'
)

Use environment variables only

# In your shell:
export HIVEQ_API_KEY=your_api_key_here
export HIVEQ_BASE_URL=https://api.hiveq.com

# Then in Python:
import hiveq_data as hd

# Auto-loads from environment variables
client = hd.Historical()

Logging Configuration (Optional)

The SDK uses Python's standard logging module. By default, logs are suppressed. Configure logging in your application to see SDK logs:

import logging
import hiveq_data as hd

# Basic configuration - show all SDK logs at INFO level
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Or configure only HiveQ SDK logs
logging.getLogger('hiveq_data').setLevel(logging.DEBUG)

# Create client - will now show debug logs
client = hd.Historical()

Log Levels:

  • DEBUG - Detailed information for debugging (API payloads, parsing details)
  • INFO - General informational messages (operations completed)
  • WARNING - Warning messages (fallback behaviors, missing data)
  • ERROR - Error messages (API failures, parsing errors)

Filter specific modules:

# Only show futures reference logs
logging.getLogger('hiveq_data.instrument_reference.futures_reference').setLevel(logging.DEBUG)

# Suppress async client logs
logging.getLogger('hiveq_data.internal.async_http_client').setLevel(logging.ERROR)

Quick Start

Make sure you have a .env file with your credentials (credentials are automatically loaded on import).

Historical Data

import hiveq_data as hd

# Create client (automatically uses .env credentials)
historical = hd.Historical()

# Equity data - use 'symbols' parameter
equity_data = historical.get_data(
    dataset='HIVEQ_US_EQ',
    schema='bars_1s',
    symbols=['AAPL', 'MSFT'],
    start='2025-08-01',
    end='2025-08-30'
)
print(equity_data)

# Futures data - Option 1: Query by root (all contracts for that root)
futures_by_root = historical.get_data(
    dataset='HIVEQ_US_FUT',
    schema='bars_1s',
    root=['ES', 'NQ'],  # E-mini S&P 500 and Nasdaq futures (all contracts)
    start='2025-08-01',
    end='2025-08-30'
)
print(futures_by_root)

# Futures data - Option 2: Query by specific contract symbols
futures_by_symbol = historical.get_data(
    dataset='HIVEQ_US_FUT',
    schema='bars_1s',
    symbols=['ESH25', 'NQM25'],  # Specific futures contracts
    start='2025-08-01',
    end='2025-08-30'
)
print(futures_by_symbol)

# Options data - Option 1: Query by chains (all options for underlying)
options_by_chain = historical.get_data(
    dataset='HIVEQ_US_OPT',
    schema='snaps_1s',
    chains=['SPY'],  # All SPY options
    start='2025-08-01',
    end='2025-08-30',
    expiration_date='2025-09-19',  # Optional: filter by expiration
    strike=450.0,  # Optional: filter by strike
    option_type='C'  # Optional: filter by option type (C=Call, P=Put)
)
print(options_by_chain)

# Options data - Option 2: Query by specific option symbols
options_by_symbol = historical.get_data(
    dataset='HIVEQ_US_OPT',
    schema='snaps_1s',
    symbols=['SPY250919C00450000'],  # Specific option contract
    start='2025-08-01',
    end='2025-08-30'
)
print(options_by_symbol)

# Parameters are validated based on asset class
try:
    # This will fail - neither 'symbols' nor 'root' provided for futures
    data = historical.get_data(
        dataset='HIVEQ_US_FUT',
        schema='bars_1s',
        start='2025-08-01',
        end='2025-08-30'
    )
except ValueError as e:
    print(f"Validation error: {e}")
    # Error: At least one of these parameters is required for schema 'bars_1s' (asset_class: futures): symbols, root

Instrument Reference

Query instrument metadata including contract specifications, multipliers, exchanges, and more via API endpoints.

Note: InstrumentReference requires API credentials and provides up-to-date reference information.

import hiveq_data as hd

# Create client (automatically uses .env credentials)
instruments = hd.InstrumentReference()

# Get equity metadata
equities = instruments.get_equities(symbols=['AAPL', 'MSFT'])
print(f"Found {equities['count']} equities")

# Futures: Specific contracts
# Supports Databento symbology (ESM1 = June 2021 or 2011 based on query dates)
futures = instruments.get_futures(
    symbols=['ESM1', 'ESZ5'],
    start_date='2021-05-01',
    end_date='2021-06-30'
)
# Returns: ESM1 parsed as June 2021 (context-aware)

# Futures: Continuous contracts
# Format: ROOT.RULE.POSITION (e.g., ES.v.1 = volume-based, position 1)
continuous = instruments.get_futures(
    symbols=['ES.v.1'],      # Second month by volume
    start_date='2020-01-01',
    end_date='2020-12-31'
)
# Returns: All contracts that were position 1 during 2020

# Other continuous contract examples:
# ES.v.0 - Front month by volume
# ES.c.0 - Front month by calendar
# ES.v.2 - Third month by volume

# Exchange and currency filters
futures_filtered = instruments.get_futures(
    symbols=['ES.v.0'],
    start_date='2024-01-01',
    end_date='2024-01-31',
    exchange='XCME',
    currency='USD'
)

# Options contracts
options = instruments.get_options(
    symbols=['SPY'],
    start_date='2025-08-01',
    end_date='2025-09-30'
)

Publisher

Note: Publisher requires API credentials and is used for writing data to HiveQ destinations. Supports both synchronous and asynchronous publishing modes.

Synchronous Mode (Default)

import hiveq_data as hd

# Create publisher client (uses .env credentials, synchronous by default)
publisher = hd.Publisher()

# Publish strategy event logs
result = publisher.publish(
    schema='event_logs',
    key='run_20240101_001',
    operation='add',  # 'add' for new records, 'modify' for updates
    data=[
        {
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'user_id': 'trader_001',
            'nav': 100000.0,
            'realized_pnl': 1500.0,
            'total_pnl': 1500.0,
            'event_log_type': 'TRADE',
            'sub_event_type': 'FILLED',
            'symbol': 'AAPL',
            'message': 'Trade executed'
        }
    ]
)

print(f"Published {result['record_count']} records")

# Modify existing records (update operation)
result = publisher.publish(
    schema='event_logs',
    key='run_20240101_001',
    operation='modify',
    data=[
        {
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'user_id': 'trader_001',
            'nav': 102000.0,  # Updated value
            'realized_pnl': 3500.0,  # Updated value
            'total_pnl': 3500.0
        }
    ]
)

# Close when done (good practice)
publisher.close()

Async Mode (Fire-and-Forget, High Performance)

For high-frequency publishing scenarios (e.g., backtesting), use async mode for 10-100x performance improvement:

import hiveq_data as hd

# Create publisher with async mode enabled
publisher = hd.Publisher(async_mode=True)

# Publish data (returns immediately, non-blocking)
for i in range(1000):
    publisher.publish(
        schema='event_logs',
        key=f'run_{i}',
        operation='add',
        data=[{
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'nav': 100000.0,
            'realized_pnl': 1500.0
        }]
    )
    # Returns immediately without waiting for HTTP response

# Flush before shutdown to ensure all data is published
publisher.flush()
publisher.close()

Async Mode Benefits:

  • 10-100x faster for high-frequency publishing
  • Fire-and-forget: publish() returns immediately
  • Concurrent HTTP requests with connection pooling
  • Automatic retry with exponential backoff
  • Perfect for backtesting and batch publishing

Important: Always call flush() before shutdown when using async mode to ensure all data is published.
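Why flush() matters: a fire-and-forget publisher queues requests and sends them in the background, so anything still queued at shutdown is lost unless the queue is drained first. A toy model of that behavior (illustrative only, not the SDK's internals):

```python
import queue
import threading

class FireAndForgetQueue:
    """Toy model: publish() enqueues and returns immediately; a background
    worker drains the queue; flush() blocks until everything is processed."""

    def __init__(self):
        self._queue = queue.Queue()
        self.sent = []
        threading.Thread(target=self._worker, daemon=True).start()

    def publish(self, record):
        self._queue.put(record)  # returns immediately, like async_mode=True

    def _worker(self):
        while True:
            record = self._queue.get()
            self.sent.append(record)  # stand-in for the actual HTTP request
            self._queue.task_done()

    def flush(self):
        self._queue.join()  # block until every queued record is processed

q = FireAndForgetQueue()
for i in range(100):
    q.publish({'nav': 100000.0 + i})
q.flush()
print(len(q.sent))  # 100 - nothing lost, because we flushed before exiting
```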

SignalPublisher

Note: SignalPublisher uses Kafka for high-throughput signal publishing. Requires Kafka broker access.

SignalPublisher is optimized for publishing user signals (trading signals, strategy signals, etc.) to Kafka topics with low latency and high throughput.

Signal Format:

{
    'data': {'signal': 0, 'weight': 0},        # Signal data dict
    'user_id': 'Raj',                          # User identifier
    'ts_event': '2025-11-05 19:00:02.267000',  # Event timestamp (auto-generated)
    'symbol': 'AAPL',                          # Symbol
    'signal_version': 1,                       # Version (default: 1)
    'signal_id': '39831f03-7022-4c0f-...'      # UUID (auto-generated)
}
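An envelope like the one above can be assembled client-side as follows. This is a sketch of the documented format, not the SDK's own code; the exact ts_event string format is an assumption:

```python
import uuid
from datetime import datetime

def build_signal(data, symbol, user_id, signal_version=1):
    """Assemble a signal record matching the documented format.
    ts_event and signal_id are auto-generated when not supplied."""
    return {
        'data': data,                 # e.g. {'signal': 1, 'weight': 0.85}
        'user_id': user_id,
        'ts_event': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'),
        'symbol': symbol,
        'signal_version': signal_version,
        'signal_id': str(uuid.uuid4()),
    }

signal = build_signal({'signal': 1, 'weight': 0.85}, symbol='AAPL', user_id='Raj')
```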

Basic Usage

import hiveq_data as hd

# Create signal publisher
# Bootstrap servers can be set via KAFKA_BOOTSTRAP_SERVERS env variable
publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals'
)

# Publish signal
publisher.publish(
    data={'signal': 1, 'weight': 0.85},
    symbol='AAPL',
    user_id='Raj'
)

# Close when done
publisher.close()

Context Manager Usage

import hiveq_data as hd

# Automatic cleanup with context manager
with hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals'
) as publisher:
    publisher.publish(
        data={'signal': 1, 'weight': 0.85},
        symbol='AAPL',
        user_id='Raj'
    )
# Publisher automatically closed

Batch Publishing

import hiveq_data as hd

with hd.SignalPublisher() as publisher:
    signals = [
        {
            'data': {'signal': 1, 'weight': 0.85},
            'symbol': 'AAPL',
            'user_id': 'Raj'
        },
        {
            'data': {'signal': 1, 'weight': 0.72},
            'symbol': 'MSFT',
            'user_id': 'Raj'
        },
        {
            'data': {'signal': 0, 'weight': 0.0},
            'symbol': 'GOOG',
            'user_id': 'Raj'
        },
    ]

    count = publisher.publish_batch(signals)
    print(f"Published {count} signals")

Custom Partition Keys

import hiveq_data as hd

with hd.SignalPublisher() as publisher:
    # Use custom key for partitioning (e.g., trader ID)
    publisher.publish(
        data={'signal': 1, 'weight': 0.85},
        symbol='AAPL',
        user_id='Raj',
        key='trader-001'
    )

Auto-Reconnect

import hiveq_data as hd

# Enable automatic reconnection on connection failure
publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals',
    auto_reconnect=True,          # Enable auto-reconnect (default: True)
    reconnect_interval=1.0,        # Retry every 1 second (default: 1.0)
    status_log_interval=30.0       # Log status every 30 seconds (default: 30.0)
)

# Check connection status
if publisher.is_connected():
    publisher.publish(
        data={'signal': 1, 'weight': 0.85},
        symbol='AAPL',
        user_id='Raj'
    )
    print("Signal published successfully")
else:
    print("Publisher not connected, waiting for reconnection...")

# Get detailed status
status = publisher.get_connection_status()
print(f"Status: {status}")

publisher.close()

Advanced Kafka Configuration

import hiveq_data as hd

# Pass additional Kafka producer config
publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals',
    acks='all',  # Wait for all replicas
    compression_type='gzip',  # Compress messages
    retries=3,  # Retry failed sends
    max_in_flight_requests_per_connection=1  # Ensure ordering
)

Features:

  • Kafka-based high-throughput publishing
  • Automatic reconnection on connection failure
  • Key-based partitioning by symbol or custom key
  • JSON serialization with UTF-8 encoding
  • Batch publishing support
  • Context manager support for automatic cleanup
  • Configurable Kafka producer settings
  • Connection status monitoring
  • Throttled reconnection logging
  • Error handling with retry logic

Metadata

Note: Metadata client requires API credentials and provides access to dataset and schema information.

import hiveq_data as hd

# Create metadata client
metadata = hd.Metadata()

# Get all available datasets
datasets = metadata.get_datasets()
print(f"Available datasets: {datasets}")

# Get schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])

# Get schemas for specific datasets
schemas = metadata.get_schemas(dataset=["HIVEQ_US_EQ", "HIVEQ_US_FUT"])

# Get schema for single dataset
schema = metadata.get_schemas(dataset="HIVEQ_US_EQ")
print(f"HIVEQ_US_EQ schemas: {schema}")

LiveStream

Note: LiveStream client connects to a WebSocket server for real-time market data streaming. It supports multiple subscriptions with a single connection, automatic reconnection every 1 second (configurable), and connection status monitoring every 30 seconds (configurable).

Basic Usage

import asyncio
from hiveq_data.live.client import LiveStream

# Define single callback for handling all market data
async def handle_market_data(data):
    print(f"Market Data: {data}")

async def main():
    # Create LiveStream client
    # - Reconnects every 1 second
    # - Logs status every 30 seconds
    stream = LiveStream(
        host='localhost',
        port=8765,
        auto_reconnect=True
    )

    # Connect to WebSocket server
    await stream.connect()

    # Subscribe to market data using single callback
    await stream.subscribe('market_data.equity.bars_1m', ['AAPL', 'MSFT'], handle_market_data)
    await stream.subscribe('market_data.equity.tbbo', 'AAPL', handle_market_data)

    # Keep running
    await stream.wait_until_disconnected()

    # Cleanup
    await stream.disconnect()

# Run the stream
asyncio.run(main())

Using Context Manager

import asyncio
from hiveq_data.live.client import LiveStream

async def handle_market_data(data):
    print(f"Market Data: {data}")

async def main():
    # Automatic connection management with context manager
    async with LiveStream(host='localhost', port=8765) as stream:
        await stream.subscribe('market_data.equity.bars_1m', ['AAPL'], handle_market_data)
        await stream.subscribe('market_data.equity.tbbo', ['MSFT'], handle_market_data)

        # Run for 60 seconds
        await asyncio.sleep(60)
    # Stream automatically disconnected

asyncio.run(main())

Multiple Subscriptions

# One client can handle multiple subscriptions with single callback
async def handle_market_data(data):
    print(f"Market Data: {data}")

async with LiveStream(host='localhost', port=8765) as stream:
    # Subscribe to different topics with same callback
    await stream.subscribe('market_data.equity.bars_1m', ['AAPL', 'MSFT'], handle_market_data)
    await stream.subscribe('market_data.equity.tbbo', ['AAPL', 'MSFT'], handle_market_data)

    await stream.wait_until_disconnected()

Managing Subscriptions

async def handle_market_data(data):
    print(f"Market Data: {data}")

async with LiveStream(host='localhost', port=8765) as stream:
    # Subscribe
    await stream.subscribe('market_data.equity.bars_1m', ['AAPL'], handle_market_data)

    # Run for a while
    await asyncio.sleep(30)

    # Unsubscribe specific callback
    await stream.unsubscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)

    # Or unsubscribe all callbacks for a topic/key
    await stream.unsubscribe('market_data.equity.bars_1m', 'AAPL')

Features:

  • WebSocket-based real-time data streaming
  • Multiple subscriptions with single connection and single callback
  • Fast automatic reconnection every 1 second (configurable)
  • Auto-resubscription to all topics after reconnection
  • Connection status monitoring every 30 seconds with throttled logging
  • Can add subscriptions even when not connected
  • Async/await support
  • Context manager support for automatic cleanup
  • Thread-safe subscription management
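The subscribe/unsubscribe semantics above can be modeled as a registry keyed by (topic, symbol), where removing one specific callback and removing all callbacks for a topic/key are distinct operations (an illustrative model, not the client's internals):

```python
from collections import defaultdict

class SubscriptionRegistry:
    """Toy model of LiveStream subscription bookkeeping."""

    def __init__(self):
        self._subs = defaultdict(list)  # (topic, symbol) -> [callbacks]

    def subscribe(self, topic, symbols, callback):
        # Accept a single symbol string or a list of symbols
        for symbol in symbols if isinstance(symbols, list) else [symbols]:
            self._subs[(topic, symbol)].append(callback)

    def unsubscribe(self, topic, symbol, callback=None):
        key = (topic, symbol)
        if callback is None:
            self._subs.pop(key, None)         # drop all callbacks for topic/key
        elif callback in self._subs[key]:
            self._subs[key].remove(callback)  # drop one specific callback

    def callbacks(self, topic, symbol):
        return list(self._subs.get((topic, symbol), []))
```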

Discovering Available Datasets and Schemas

To get the latest information about available datasets and schemas, use the Metadata client:

import hiveq_data as hd

# Create metadata client
metadata = hd.Metadata()

# Get all available datasets
datasets = metadata.get_datasets()
print("Available datasets:", datasets)

# Get all schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])

# Get schemas for specific dataset
eq_schemas = metadata.get_schemas(dataset="HIVEQ_US_EQ")
print("HIVEQ_US_EQ schemas:", eq_schemas)

Why use Metadata client?

  • Always returns current, up-to-date information
  • Includes detailed schema descriptions and parameters
  • Programmatically discover what's available
  • No hardcoded lists that can become stale

Parameter Requirements by Asset Class

Equity (HIVEQ_US_EQ):

  • Required: symbols, start, end
  • Optional: limit, offset, columns

Futures (HIVEQ_US_FUT):

  • Required: start, end, AND one of: symbols OR root
  • Optional: limit, offset, columns

Options (HIVEQ_US_OPT):

  • Required: start, end, AND one of: symbols OR chains
  • Optional: limit, offset, columns, expiration_date, strike, option_type
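These rules can be summarized in a small table-driven check that mirrors the SDK's documented validation (a sketch for illustration, not the SDK's own validator):

```python
# Identifier parameters accepted per dataset; at least one is required.
REQUIRED_ONE_OF = {
    'HIVEQ_US_EQ': ['symbols'],
    'HIVEQ_US_FUT': ['symbols', 'root'],
    'HIVEQ_US_OPT': ['symbols', 'chains'],
}

def check_identifiers(dataset: str, **params) -> None:
    """Raise ValueError unless at least one accepted identifier is provided."""
    accepted = REQUIRED_ONE_OF[dataset]
    if not any(params.get(name) for name in accepted):
        raise ValueError(
            f"At least one of these parameters is required for dataset "
            f"'{dataset}': {', '.join(accepted)}"
        )

check_identifiers('HIVEQ_US_FUT', root=['ES'])  # OK
# check_identifiers('HIVEQ_US_FUT')             # would raise ValueError
```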

API Reference

Configuration

hd.configure(api_key=None, base_url=None, env_file=None) (Optional)

Configure API credentials globally. Usually not needed as credentials are automatically loaded from .env files or environment variables.

Only call this function if you need to:

  • Override auto-loaded values
  • Specify a non-standard .env file location
  • Set credentials programmatically

Parameters:

  • api_key (str, optional): Your HiveQ API key (overrides auto-loaded value)
  • base_url (str, optional): API base URL (overrides auto-loaded value)
  • env_file (str, optional): Path to specific .env file to load

Historical Client

hd.Historical(api_key=None, base_url=None)

Historical market data client with automatic parameter validation.

Methods:

  • get_data(dataset, schema, symbols=None, root=None, chains=None, start=None, end=None, limit=None, **kwargs): Fetch historical data
    • dataset (str, required): Dataset name (e.g., 'HIVEQ_US_EQ', 'HIVEQ_US_FUT', 'HIVEQ_US_OPT')
    • schema (str, required): Schema name (e.g., 'bars_1s', 'trades')
    • symbols (str|list, optional): Symbol(s) for equity data
    • root (str|list, optional): Root symbol(s) for futures data
    • chains (str|list, optional): Underlying symbol(s) for options data
    • start (str|date|datetime, required): Start date/datetime
    • end (str|date|datetime, required): End date/datetime
    • limit (int, optional): Maximum records to return
    • **kwargs: Additional optional parameters (e.g., expiration_date, strike, option_type for options)
    • Automatically validates parameters based on dataset's asset class
    • Raises ValueError if required parameters are missing or invalid parameters are provided

Parameter Requirements by Asset Class:

  • Equity (HIVEQ_US_EQ):

    • Required: symbols - Stock ticker symbols (e.g., ['AAPL', 'MSFT'])
  • Futures (HIVEQ_US_FUT):

    • Required (choose one):
      • root - Futures root symbols (e.g., ['ES', 'NQ']) - Returns all contracts for that root
      • symbols - Specific contract symbols (e.g., ['ESH25', 'NQM25'])
      • Both can be provided together
  • Options (HIVEQ_US_OPT):

    • Required (choose one):
      • chains - Underlying symbols (e.g., ['SPY']) - Returns all options for that underlying
      • symbols - Specific option symbols (e.g., ['SPY250919C00450000'])
      • Both can be provided together

Publisher Client

hd.Publisher(api_key=None, base_url=None, async_mode=False)

Data publisher client for writing data to HiveQ destinations. Supports both synchronous and asynchronous publishing modes.

Parameters:

  • api_key (str, optional): API key (overrides global config)
  • base_url (str, optional): Base URL (overrides global config)
  • async_mode (bool, optional): If True, uses async HTTP for fire-and-forget publishes (10-100x faster). Default: False for backward compatibility

Methods:

  • publish(schema, data, key, operation='add'): Publish data to a schema

    • schema (str, required): Schema name (e.g., 'event_logs')
    • data (list, required): List of records to publish (each record is a dict)
    • key (str|list, required): Unique key identifier(s) for tracking and mapping all published data. Can be a single string or a list of strings
    • operation (str, optional): Operation type - 'add' (insert new records) or 'modify' (update existing). Default: 'add'
    • Returns: Response with status and record count (immediate in async mode)
    • Raises ValueError if validation fails or invalid operation type
  • flush(): Wait for all in-flight async publishes to complete

    • Only relevant when async_mode=True
    • Blocks until all pending async publish operations finish
    • Should be called before shutdown to ensure data integrity
    • No-op in synchronous mode
  • close(): Close the publisher and cleanup resources

    • Flushes pending requests (if async mode) and closes connections
    • Should be called when done using the publisher

Examples:

Synchronous Mode (Default):

import hiveq_data as hd

publisher = hd.Publisher()

# Add new records
result = publisher.publish(
    schema='event_logs',
    key='run_20240101_001',
    operation='add',
    data=[
        {
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'user_id': 'trader_001',
            'nav': 100000.0,
            'realized_pnl': 1500.0,
            'total_pnl': 1500.0,
            'event_log_type': 'TRADE',
            'sub_event_type': 'FILLED',
            'symbol': 'AAPL',
            'message': 'Trade executed'
        }
    ]
)

# Modify existing records
result = publisher.publish(
    schema='event_logs',
    key='run_20240101_001',
    operation='modify',
    data=[
        {
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'nav': 102000.0,  # Updated
            'realized_pnl': 3500.0  # Updated
        }
    ]
)

publisher.close()

Async Mode (High Performance):

import hiveq_data as hd

# Enable async mode for 10-100x performance
publisher = hd.Publisher(async_mode=True)

# Publish many records quickly (fire-and-forget)
for i in range(1000):
    publisher.publish(
        schema='event_logs',
        key=f'run_{i}',
        operation='add',
        data=[{
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'nav': 100000.0,
            'realized_pnl': 1500.0
        }]
    )
    # Returns immediately without waiting

# Always flush before shutdown in async mode
publisher.flush()
publisher.close()

SignalPublisher Client

hd.SignalPublisher(bootstrap_servers=None, topic='user-signals', auto_reconnect=True, reconnect_interval=1.0, status_log_interval=30.0, **kafka_config)

Kafka-based publisher for user signal data. Optimized for high-throughput signal publishing with low latency and automatic reconnection.

Parameters:

  • bootstrap_servers (str, optional): Kafka broker address (e.g., 'localhost:9092')
    • If None, loads from KAFKA_BOOTSTRAP_SERVERS environment variable
    • Default: 'localhost:9092'
  • topic (str, optional): Kafka topic to publish to (default: 'user-signals')
  • auto_reconnect (bool, optional): Enable automatic reconnection on connection failure (default: True)
  • reconnect_interval (float, optional): Seconds between reconnection attempts (default: 1.0)
  • status_log_interval (float, optional): Seconds between status log messages (default: 30.0)
  • **kafka_config: Additional Kafka producer configuration
    • Examples: acks='all', compression_type='gzip', retries=3
    • See Kafka producer documentation for full list of options

Signal Format:

{
    'data': {'signal': 0, 'weight': 0},        # Signal data dict
    'user_id': 'Raj',                          # User identifier
    'ts_event': '2025-11-05 19:00:02.267000',  # Event timestamp (auto-generated)
    'symbol': 'AAPL',                          # Symbol
    'signal_version': 1,                       # Version (default: 1)
    'signal_id': '39831f03-7022-4c0f-...'      # UUID (auto-generated)
}

Methods:

  • publish(data, symbol, user_id, ts_event=None, signal_version=1, signal_id=None, key=None): Publish a single signal to Kafka

    • data (dict, required): Signal data dict with 'signal' and 'weight' keys (e.g., {'signal': 1, 'weight': 0.85})
    • symbol (str, required): Symbol string (e.g., 'AAPL')
    • user_id (str, required): User identifier string
    • ts_event (datetime, optional): Event timestamp (auto-generated as current time if None)
    • signal_version (int, optional): Signal version number (default: 1)
    • signal_id (str, optional): Signal UUID (auto-generated if None)
    • key (str, optional): Kafka partition key (defaults to symbol if not provided)
    • Raises ValueError if signal data is invalid
    • Raises KafkaError if publishing fails
    • Blocks until message is confirmed by Kafka
  • publish_batch(signals, key=None): Publish multiple signals to Kafka

    • signals (list, required): List of signal data dicts, each containing:
      • data (dict, required): Signal data with 'signal' and 'weight' keys
      • symbol (str, required): Symbol string
      • user_id (str, required): User identifier
      • ts_event (datetime, optional): Event timestamp (auto-generated if missing)
      • signal_version (int, optional): Version number (default: 1)
      • signal_id (str, optional): Signal UUID (auto-generated if missing)
    • key (str, optional): Kafka partition key (if None, uses symbol from each signal)
    • Returns: Number of successfully published signals (int)
    • Flushes automatically after publishing all signals
  • flush(): Flush pending messages to Kafka

    • Blocks until all buffered messages are sent
    • Useful when publishing is done but connection needs to stay open
  • close(): Close the Kafka producer connection

    • Flushes all pending messages and closes the connection gracefully
    • Should be called when done publishing or use context manager
  • is_connected(): Check if the publisher is connected to Kafka

    • Returns: bool (True if connected, False otherwise)
    • Use this to check connection status before publishing
  • get_connection_status(): Get detailed connection status information

    • Returns: dict with connection details including:
      • connected (bool): Whether currently connected
      • running (bool): Whether publisher is running
      • reconnect_attempts (int): Number of reconnection attempts
      • auto_reconnect (bool): Whether auto-reconnect is enabled
      • bootstrap_servers (str): Kafka broker address
      • topic (str): Current topic

Context Manager Support:

with hd.SignalPublisher(bootstrap_servers='localhost:9092') as publisher:
    publisher.publish(
        data={'signal': 1, 'weight': 0.85},
        symbol='AAPL',
        user_id='Raj'
    )
# Automatically closed

Example:

import hiveq_data as hd

# Create publisher
publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals'
)

# Publish single signal
publisher.publish(
    data={'signal': 1, 'weight': 0.85},
    symbol='AAPL',
    user_id='Raj'
)

# Publish batch
signals = [
    {
        'data': {'signal': 1, 'weight': 0.85},
        'symbol': 'AAPL',
        'user_id': 'Raj'
    },
    {
        'data': {'signal': 0, 'weight': 0.0},
        'symbol': 'MSFT',
        'user_id': 'Raj'
    }
]
count = publisher.publish_batch(signals)
print(f"Published {count} signals")

# Close
publisher.close()

Advanced Configuration:

# With advanced Kafka settings
publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals',
    acks='all',  # Wait for all replicas
    compression_type='gzip',  # Compress messages
    retries=3,  # Retry failed sends
    max_in_flight_requests_per_connection=1  # Ensure ordering
)

Features:

  • High-throughput Kafka-based publishing
  • Automatic reconnection on connection failure
  • Connection status monitoring
  • Key-based partitioning by symbol or custom key
  • JSON serialization with UTF-8 encoding
  • Batch publishing with automatic flush
  • Environment variable configuration support
  • Context manager for automatic cleanup
  • Throttled reconnection logging
  • Comprehensive error handling
  • Configurable Kafka producer settings

InstrumentReference Client

hd.InstrumentReference(api_key=None, base_url=None)

Security/instrument metadata client that retrieves up-to-date reference information via API endpoints.

Important: This client uses API endpoints to fetch the latest instrument reference data and requires API credentials.

Methods:

  • get_instruments(symbols=None, asset_class=None): Generic method for any asset class (no date filtering)
    • Returns: Dictionary with {"instruments": [...], "count": N}

Asset-Specific Methods:

  • get_equities(symbols=None): Get equity metadata

    • Symbols can be stock tickers (e.g., ['AAPL', 'MSFT'])
    • Returns: Exchange info, multiplier, trading hours, etc.
    • No date filtering needed for equities
  • get_futures(symbols, start_date=None, end_date=None, exchange=None, currency=None): Get futures contracts

    • Specific contracts: Databento symbology (e.g., ESM1, ESZ25)
      • Single digit years: Context-aware based on query dates (ESM1 = June 2021 or 2011)
      • Two digit years: 2000+ century (ESZ25 = December 2025)
    • Continuous contracts: Format ROOT.RULE.POSITION (e.g., ES.v.1)
      • ES.v.0 = Front month by volume
      • ES.v.1 = Second month by volume
      • Returns all contracts at that position during date range
    • Date validation: Rejects contracts >2 years from query dates
    • Results sorted by start_date (ascending)
  • get_options(symbols, start_date=None, end_date=None): Get options contracts

    • Symbols: Underlying roots (e.g., ['SPY'])
    • Returns contracts active during date range
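The symbology rules above can be made concrete with a small self-contained sketch. This is illustrative logic only, not the SDK's internal parser: single-digit years resolve to the decade closest to the query date, two-digit years are always 2000+, and continuous symbols split as ROOT.RULE.POSITION.

```python
import re

def resolve_contract_year(symbol, query_year):
    """Resolve the contract year of a Databento-style futures symbol.
    Single-digit years are context-aware (nearest decade to the query
    year); two-digit years are always 2000+."""
    m = re.fullmatch(r"([A-Z]+)([FGHJKMNQUVXZ])(\d{1,2})", symbol)
    if not m:
        raise ValueError(f"not a specific contract: {symbol}")
    root, month_code, year_digits = m.groups()
    if len(year_digits) == 2:
        return 2000 + int(year_digits)        # ESZ25 -> 2025
    # Single digit: pick the candidate decade closest to the query year
    digit = int(year_digits)
    candidates = [decade + digit for decade in (2010, 2020, 2030)]
    return min(candidates, key=lambda y: abs(y - query_year))

def parse_continuous(symbol):
    """Split a continuous symbol like 'ES.v.1' into (root, rule, position)."""
    root, rule, position = symbol.split(".")
    return root, rule, int(position)

print(resolve_contract_year("ESZ25", 2025))  # 2025
print(resolve_contract_year("ESM1", 2021))   # 2021
print(parse_continuous("ES.v.1"))            # ('ES', 'v', 1)
```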

Return Format:

{
    "count": 1,
    "instruments": [
        {
            "symbol": "ESZ5",
            "root": "ES",
            "exch_mic": "XCME",
            "currency": "USD",
            "multiplier": 50,
            "min_tick": 0.25,
            "tick_value": 12.5,
            "asset_type": "Fut",
            "contract_month": "December",
            "contract_year": 2025,
            "expiry_date": "2025-12-01"
        }
    ]
}
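A quick sketch of consuming this return format, using the sample response above. Note that `tick_value` is consistent with `multiplier * min_tick` (50 × 0.25 = 12.5):

```python
# Sample response in the format documented above
response = {
    "count": 1,
    "instruments": [
        {
            "symbol": "ESZ5", "root": "ES", "exch_mic": "XCME",
            "currency": "USD", "multiplier": 50, "min_tick": 0.25,
            "tick_value": 12.5, "asset_type": "Fut",
            "contract_month": "December", "contract_year": 2025,
            "expiry_date": "2025-12-01",
        }
    ],
}

# Index instruments by symbol for convenient lookups
by_symbol = {inst["symbol"]: inst for inst in response["instruments"]}
es = by_symbol["ESZ5"]

# tick_value should equal multiplier * min_tick
assert es["tick_value"] == es["multiplier"] * es["min_tick"]
print(es["expiry_date"])  # 2025-12-01
```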

Metadata Client

hd.Metadata(api_key=None, base_url=None)

Client for accessing HiveQ metadata API to retrieve dataset and schema information.

Methods:

  • get_datasets(): Get list of available datasets

    • Returns: Dict with available datasets
  • get_schemas(dataset): Get schemas for the specified dataset(s)

    • dataset (str or list): Dataset name(s) or ["*"] for all datasets
    • Returns: Dict with schema information for requested datasets

Examples:

import hiveq_data as hd

metadata = hd.Metadata()

# Get all datasets
datasets = metadata.get_datasets()

# Get schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])

# Get schemas for specific datasets
schemas = metadata.get_schemas(dataset=["HIVEQ_US_EQ", "HIVEQ_US_FUT"])

# Get schema for single dataset
schema = metadata.get_schemas(dataset="HIVEQ_US_EQ")
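Because get_schemas() accepts either a string or a list, a caller (or wrapper) typically normalizes the argument before building the request. A trivial sketch of that normalization; this is illustrative, not the SDK's internal code:

```python
def normalize_datasets(dataset):
    """Accept a single dataset name or a list of names and
    return a list, matching get_schemas()'s flexible signature."""
    if isinstance(dataset, str):
        return [dataset]
    return list(dataset)

print(normalize_datasets("HIVEQ_US_EQ"))                    # ['HIVEQ_US_EQ']
print(normalize_datasets(["HIVEQ_US_EQ", "HIVEQ_US_FUT"]))  # unchanged
print(normalize_datasets(["*"]))                            # ['*'] -> all datasets
```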

LiveStream Client

LiveStream(host='localhost', port=8765, auto_reconnect=True, reconnect_interval=1.0, status_log_interval=30.0)

WebSocket-based client for streaming live market data in real-time. Supports multiple subscriptions with a single connection.

Parameters:

  • host (str, optional): WebSocket server host (default: 'localhost')
  • port (int, optional): WebSocket server port (default: 8765)
  • auto_reconnect (bool, optional): Auto-reconnect on disconnect (default: True)
    • Automatically resubscribes to all topics after successful reconnection
  • reconnect_interval (float, optional): Interval between reconnection attempts in seconds (default: 1.0)
    • Reconnection attempts happen every 1 second by default
    • Can be customized based on your needs (e.g., 0.5 for 500ms, 5.0 for 5 seconds)
  • status_log_interval (float, optional): Interval for status monitoring logs in seconds (default: 30.0)
    • Controls how often connection status and reconnection attempts are logged
    • Prevents log spam while still providing visibility
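The status_log_interval throttling described above can be sketched with a small helper that emits a message at most once per interval. This is illustrative logic under stated assumptions, not the client's actual implementation:

```python
import time

class ThrottledLogger:
    """Emit a status message at most once per `interval` seconds,
    mirroring the status_log_interval behavior described above."""
    def __init__(self, interval=30.0, clock=time.monotonic):
        self.interval = interval
        self.clock = clock
        self._last = float("-inf")

    def log(self, message):
        now = self.clock()
        if now - self._last >= self.interval:
            self._last = now
            print(message)
            return True   # logged
        return False      # suppressed to avoid log spam

# With a fake clock: only calls far enough apart actually log
ticks = iter([0.0, 10.0, 40.0])
logger = ThrottledLogger(interval=30.0, clock=lambda: next(ticks))
print(logger.log("reconnecting..."))  # True
print(logger.log("reconnecting..."))  # False (only 10s elapsed)
print(logger.log("reconnecting..."))  # True (40s since last log)
```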

Methods:

  • async connect(): Connect to the WebSocket server

    • If auto_reconnect is enabled and initial connection fails, starts reconnection attempts
    • Raises ConnectionError if connection fails and auto_reconnect is disabled
  • async disconnect(): Disconnect from the WebSocket server

    • Cancels all tasks and closes the connection
  • async subscribe(topic, key, callback): Subscribe to a (topic, key) pair

    • topic (str, required): Topic name (e.g., 'market_data.equity.bars_1m')
    • key (str, required): Subscription key (e.g., symbol like 'AAPL')
    • callback (callable, required): Function to call for each data record
    • Callback can be async or sync
    • Multiple callbacks can be registered for same (topic, key)
    • Can be called even when not connected; the subscription is applied once the connection is established
    • Sends subscription request to server if currently connected
  • async unsubscribe(topic, key, callback=None): Unsubscribe from a (topic, key) pair

    • topic (str, required): Topic name
    • key (str, required): Subscription key
    • callback (callable, optional): Specific callback to remove
    • If callback is None, removes all callbacks and unsubscribes from server
  • async wait_until_disconnected(): Wait until the client is fully stopped

    • Keeps the client running even during reconnection attempts
    • Will only exit when disconnect() is called or client stops running
    • Useful for keeping the client running in the main task
  • is_connected(): Check if currently connected

    • Returns: True if connected, False otherwise
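The subscription semantics above (multiple callbacks per (topic, key); removing a specific callback vs. all of them) can be sketched with a simple registry. This is illustrative, not the client's actual internals:

```python
from collections import defaultdict

class SubscriptionRegistry:
    """Track callbacks per (topic, key), mirroring the
    subscribe/unsubscribe semantics described above."""
    def __init__(self):
        self._callbacks = defaultdict(list)

    def subscribe(self, topic, key, callback):
        # Multiple callbacks may be registered for the same (topic, key)
        self._callbacks[(topic, key)].append(callback)

    def unsubscribe(self, topic, key, callback=None):
        if callback is None:
            # Remove all callbacks and drop the subscription entirely
            self._callbacks.pop((topic, key), None)
        else:
            self._callbacks[(topic, key)].remove(callback)
            if not self._callbacks[(topic, key)]:
                del self._callbacks[(topic, key)]

    def callbacks_for(self, topic, key):
        return list(self._callbacks.get((topic, key), []))

reg = SubscriptionRegistry()
on_bar = lambda rec: rec
on_tick = lambda rec: rec
reg.subscribe("market_data.equity.bars_1m", "AAPL", on_bar)
reg.subscribe("market_data.equity.bars_1m", "AAPL", on_tick)
print(len(reg.callbacks_for("market_data.equity.bars_1m", "AAPL")))  # 2
reg.unsubscribe("market_data.equity.bars_1m", "AAPL", on_bar)
print(len(reg.callbacks_for("market_data.equity.bars_1m", "AAPL")))  # 1
```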

Context Manager Support:

async with LiveStream(host='localhost', port=8765) as stream:
    await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_data)
    await stream.wait_until_disconnected()
# Automatically disconnected

Example:

import asyncio
from hiveq_data.live.client import LiveStream

async def handle_market_data(data):
    print(f"Market Data: {data}")

async def main():
    stream = LiveStream(host='localhost', port=8765)
    await stream.connect()

    # Subscribe to multiple topics with single callback
    await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)
    await stream.subscribe('market_data.equity.bars_1m', 'MSFT', handle_market_data)
    await stream.subscribe('market_data.equity.tbbo', 'AAPL', handle_market_data)

    try:
        # Blocks until disconnect() is called or the client stops running
        await stream.wait_until_disconnected()
    finally:
        await stream.disconnect()

asyncio.run(main())

Advanced Usage

Context Manager Support

All clients support context managers for automatic cleanup:

import hiveq_data as hd

# Credentials auto-loaded from .env
with hd.Historical() as historical:
    data = historical.get_data(
        dataset='HIVEQ_US_EQ',
        schema='bars_1s',
        symbols='AAPL',
        start='2025-08-01',
        end='2025-08-30'
    )
    print(data)
# Session automatically closed

Per-Client Credentials

Override auto-loaded credentials for specific clients:

import hiveq_data as hd

# Use .env credentials for most clients
prod_historical = hd.Historical()

# Use different credentials for specific client (e.g., local testing)
dev_historical = hd.Historical(
    api_key='dev_api_key',
    base_url='http://localhost:3000'
)
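Per-client overrides follow the usual precedence: explicit constructor arguments win over configure()/auto-loaded values, which in turn fall back to the HIVEQ_API_KEY environment variable. A sketch of that resolution order; the `resolve_api_key` helper is hypothetical and the precedence is an assumption based on the behavior described above:

```python
import os

def resolve_api_key(explicit=None, configured=None, env=os.environ):
    """Pick the API key by precedence: explicit constructor argument,
    then a configure() override, then the HIVEQ_API_KEY environment
    variable. Helper name is hypothetical, not part of the SDK."""
    if explicit is not None:
        return explicit
    if configured is not None:
        return configured
    return env.get("HIVEQ_API_KEY")

print(resolve_api_key(explicit="dev_api_key", configured="prod_key"))  # 'dev_api_key'
print(resolve_api_key(configured="prod_key", env={}))                  # 'prod_key'
print(resolve_api_key(env={"HIVEQ_API_KEY": "from_env"}))              # 'from_env'
```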

Override Base URL for Development

import hiveq_data as hd

# Override base URL while keeping API key from .env
hd.configure(base_url='http://localhost:3000')

# All clients now use localhost
client = hd.Historical()

License

MIT License