HiveQ Data API Reference

Complete API reference for the HiveQ Data Python SDK.

Version: 0.2.1

Configuration

hiveq_data.configure()

Configure global API credentials. Calling it is optional: credentials are loaded automatically from .env files or environment variables, and configure() overrides them.

Parameters:

hiveq_data.configure(
  api_key: Optional[str] = None,      # Your HiveQ API key
  base_url: Optional[str] = None,     # API base URL (default: https://api.hiveq.com)
  user_id: Optional[str] = None,      # User ID for request headers (X-User-ID)
  org_id: Optional[str] = None,       # Organization ID for request headers (X-Org-ID)
  user_name: Optional[str] = None     # User name for request headers (X-User-Name)
)

Example:

import hiveq_data as hd

# Override auto-loaded credentials
hd.configure(
  api_key="your_api_key_here",
  base_url="https://api.hiveq.com",
  user_id="your_user_id",
  org_id="your_org_id",
  user_name="your_username"
)

# Check SDK version
print(hd.__version__)  # Outputs: 0.2.1

Logging Configuration

The SDK uses Python's standard logging module and follows library best practices. By default, all logs are suppressed via NullHandler. Configure logging in your application to see SDK logs.

Basic Configuration:

import logging
import hiveq_data as hd

# Show all SDK logs at INFO level
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

client = hd.Historical()

Configure Only HiveQ Logs:

import logging

# Set level for entire SDK
logging.getLogger('hiveq_data').setLevel(logging.DEBUG)

# Or configure specific modules
logging.getLogger('hiveq_data.instrument_reference.futures_reference').setLevel(logging.DEBUG)
logging.getLogger('hiveq_data.internal.async_http_client').setLevel(logging.WARNING)

Log Levels:

Level   | Description                                             | Use Case
--------|---------------------------------------------------------|----------------------------
DEBUG   | Detailed debugging info (API payloads, parsing details) | Development/troubleshooting
INFO    | General informational messages (operations completed)   | Production monitoring
WARNING | Warning messages (fallback behaviors, missing data)     | Production (default)
ERROR   | Error messages (API failures, parsing errors)           | Production alerts

Logger Hierarchy:

  • hiveq_data - Root logger
    • hiveq_data.base_client - Base HTTP client
    • hiveq_data.historical - Historical data client
    • hiveq_data.instrument_reference - Instrument metadata
      • hiveq_data.instrument_reference.futures_reference - Futures contracts
      • hiveq_data.instrument_reference.options_reference - Options contracts
      • hiveq_data.instrument_reference.equity_reference - Equity data
    • hiveq_data.publisher - Data publishing
    • hiveq_data.internal.async_http_client - Async HTTP client
    • hiveq_data.live - Live streaming

Example: Production Logging Setup:

import logging

# Configure file logging for production
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('hiveq_sdk.log'),
        logging.StreamHandler()  # Also log to console
    ]
)

# Set SDK to WARNING (only show warnings and errors)
logging.getLogger('hiveq_data').setLevel(logging.WARNING)

# But keep detailed logs for debugging specific issues
logging.getLogger('hiveq_data.publisher').setLevel(logging.DEBUG)

Historical Client

hd.Historical()

Client for querying historical market data with automatic parameter validation.

Initialization:

client = hd.Historical(
  api_key: Optional[str] = None,      # Override global API key
  base_url: Optional[str] = None,     # Override global base URL
  user_id: Optional[str] = None,      # User ID for request headers
  org_id: Optional[str] = None,       # Organization ID for request headers
  user_name: Optional[str] = None     # User name for request headers
)

client.get_data()

Fetch historical market data with flexible parameters based on asset class.

Parameters:

client.get_data(
  dataset: str,                                    # Required: Dataset name
  schema: str,                                     # Required: Schema name
  start: Union[str, date, datetime],               # Required: Start date/datetime
  end: Union[str, date, datetime],                 # Required: End date/datetime
  symbols: Optional[Union[str, List[str]]] = None, # Equity symbols or specific contract symbols
  root: Optional[Union[str, List[str]]] = None,    # Futures root symbols
  chains: Optional[Union[str, List[str]]] = None,  # Options underlying symbols
  limit: Optional[int] = None,                     # Max records to return (for pagination)
  offset: Optional[int] = None,                    # Number of records to skip (for pagination)
  filter_mode: Optional[str] = None,               # 'continuous' (default) or 'session'
  columns: Optional[List[str]] = None,             # Specific columns to return
  **kwargs                                         # Additional schema-specific parameters
)

Parameter Requirements by Asset Class:

Asset Class            | Required Parameters                   | Optional Parameters
-----------------------|---------------------------------------|--------------------
Equity (HIVEQ_US_EQ)   | symbols, start, end                   | limit, offset, columns
Futures (HIVEQ_US_FUT) | start, end, one of: symbols OR root   | limit, offset, columns
Options (HIVEQ_US_OPT) | start, end, one of: symbols OR chains | limit, offset, columns, expiration_date, strike, option_type

Examples:

Equity Data:

import hiveq_data as hd

client = hd.Historical()

# Query equity bars
equity_data = client.get_data(
  dataset='HIVEQ_US_EQ',
  schema='bars_1s',
  symbols=['AAPL', 'MSFT'],
  start='2025-08-01',
  end='2025-08-30',
  limit=1000
)

Pagination Example:

# Get first 1000 records
page1 = client.get_data(
  dataset='HIVEQ_US_EQ',
  schema='bars_1s',
  symbols=['AAPL'],
  start='2025-08-01',
  end='2025-08-30',
  limit=1000,
  offset=0
)

# Get next 1000 records (skip first 1000)
page2 = client.get_data(
  dataset='HIVEQ_US_EQ',
  schema='bars_1s',
  symbols=['AAPL'],
  start='2025-08-01',
  end='2025-08-30',
  limit=1000,
  offset=1000
)

# Pagination loop example
def fetch_all_data(dataset, schema, symbols, start, end, page_size=1000):
  all_data = []
  offset = 0

  while True:
    page = client.get_data(
      dataset=dataset,
      schema=schema,
      symbols=symbols,
      start=start,
      end=end,
      limit=page_size,
      offset=offset
    )

    if not page.get('data'):
      break

    all_data.extend(page['data'])
    offset += page_size

    # Break if we got fewer records than page_size (last page)
    if len(page['data']) < page_size:
      break

  return all_data

Response Format:

{
  "data": [
    {
      "date": "2025-08-01",
      "time": "2025-08-01T09:30:00.000000000",
      "symbol": "AAPL",
      "open": 185.5,
      "high": 186.2,
      "low": 185.3,
      "close": 186.0,
      "volume": 125000.0,
      "ts_event": "2025-08-01T09:30:00.000000000"
    }
  ],
  "schema": "bars_1s",
  "dataset": "HIVEQ_US_EQ"
}

Futures Data by Root:

# Query all contracts for ES and NQ roots
futures_data = client.get_data(
  dataset='HIVEQ_US_FUT',
  schema='bars_1s',
  root=['ES', 'NQ'],  # All ES and NQ contracts
  start='2025-08-01',
  end='2025-08-30'
)

Futures Data by Specific Contracts:

# Query specific futures contracts
futures_data = client.get_data(
  dataset='HIVEQ_US_FUT',
  schema='bars_1s',
  symbols=['ES.H25', 'NQ.M25'],  # Specific contracts
  start='2025-08-01',
  end='2025-08-30'
)

Futures Response Format:

{
  "data": [
    {
      "date": "2025-08-01",
      "time": "2025-08-01T09:30:00.000000000",
      "symbol": "ES.H25",
      "root": "ES",
      "open": 4500.25,
      "high": 4505.5,
      "low": 4498.75,
      "close": 4502.0,
      "volume": 50000.0,
      "ts_event": "2025-08-01T09:30:00.000000000"
    }
  ],
  "schema": "bars_1s",
  "dataset": "HIVEQ_US_FUT"
}

Options Data by Chains:

# Query all options for SPY underlying
options_data = client.get_data(
  dataset='HIVEQ_US_OPT',
  schema='snaps_1s',
  chains=['SPY'],  # All SPY options
  start='2025-08-01',
  end='2025-08-30',
  expiration_date='2025-09-19',  # Optional: filter by expiration
  strike=450.0,                  # Optional: filter by strike
  option_type='C'                # Optional: C=Call, P=Put
)

Options Data by Specific Symbols:

# Query specific option contracts
options_data = client.get_data(
  dataset='HIVEQ_US_OPT',
  schema='snaps_1s',
  symbols=['SPY250919C00450000'],
  start='2025-08-01',
  end='2025-08-30'
)

Options Response Format:

{
  "data": [
    {
      "date": "2025-08-01",
      "time": "2025-08-01T09:30:00.000000000",
      "symbol": "SPY250919C00450000",
      "underlying": "SPY",
      "strike": 450.0,
      "expiration": "2025-09-19",
      "option_type": "C",
      "bid_px": 12.5,
      "ask_px": 12.55,
      "bid_sz": 100,
      "ask_sz": 150,
      "implied_vol": 0.185,
      "delta": 0.52,
      "gamma": 0.015,
      "theta": -0.08,
      "vega": 0.35
    }
  ],
  "schema": "snaps_1s",
  "dataset": "HIVEQ_US_OPT"
}

InstrumentReference Client

hd.InstrumentReference()

API-based client for accessing instrument metadata.

Initialization:

client = hd.InstrumentReference(
  api_key: Optional[str] = None,      # Override global API key
  base_url: Optional[str] = None,     # Override global base URL
  user_id: Optional[str] = None,      # User ID for request headers
  org_id: Optional[str] = None,       # Organization ID for request headers
  user_name: Optional[str] = None     # User name for request headers
)

client.get_instruments()

Generic method to fetch instrument metadata for any asset class.

Parameters:

client.get_instruments(
  symbols: Optional[List[str]] = None,              # Symbol(s) to filter
  asset_class: Optional[str] = None,                # 'equity', 'futures', 'options', or 'index'
  start_date: Optional[str] = None,                 # Start date (for futures/options)
  end_date: Optional[str] = None,                   # End date (for futures/options)
  exchange: Optional[str] = None,                   # Filter by exchange
  currency: Optional[str] = None,                   # Filter by currency
  limit: Optional[int] = None,                      # Max records to return
  offset: Optional[int] = None                      # Records to skip
)

Example:

import hiveq_data as hd

client = hd.InstrumentReference()

# Get all equity instruments
equities = client.get_instruments(asset_class='equity')

# Get specific instruments
instruments = client.get_instruments(
  symbols=['AAPL', 'ES'],
  asset_class='equity'
)

Response Format:

{
  "instruments": [
    {
      "sym": "AAPL",
      "root": "AAPL",
      "ex": "NASDAQ",
      "name": "Apple Inc",
      "assetType": "Eq",
      "multiplier": "1",
      "openTime": "09:30:00",
      "closeTime": "16:00:00"
    }
  ]
}

client.get_equities()

Fetch equity instrument metadata.

Parameters:

client.get_equities(
  symbols: Optional[List[str]] = None,              # Equity symbols
  date: Optional[str] = None,                       # Date for filtering (ISO format)
  exchange: Optional[str] = None,                   # Filter by exchange
  currency: Optional[str] = None,                   # Filter by currency
  limit: Optional[int] = None,                      # Max records to return
  offset: Optional[int] = None                      # Records to skip
)

Example:

# Get equity metadata
equities = client.get_equities(symbols=['AAPL', 'MSFT', 'GOOGL'])

# Get all equities
all_equities = client.get_equities()

Response Format:

{
  "instruments": [
    {
      "date": "2025-08-26",
      "sym": "AAPL",
      "root": "AAPL",
      "ex": "NASDAQ",
      "name": "Apple Inc",
      "openTime": "09:30:00",
      "closeTime": "16:00:00",
      "multiplier": "1",
      "assetType": "Eq",
      "tickSizePilot": "false",
      "minOrderSize": "1",
      "maxOrderSize": "10000",
      "baseIncrement": "1",
      "quoteIncrement": "0.01"
    }
  ]
}

client.get_futures()

Query futures contracts including specific contracts and continuous contract series.

Parameters:

client.get_futures(
  symbols: Optional[List[str]] = None,            # Contract or continuous symbols
  start_date: Optional[str] = None,               # Start date (YYYY-MM-DD)
  end_date: Optional[str] = None,                 # End date (YYYY-MM-DD)
  exchange: Optional[str] = None,                 # Filter by exchange (e.g., 'XCME')
  currency: Optional[str] = None,                 # Filter by currency (e.g., 'USD')
  expiry_type: str = "volume",                    # Rollover type: 'volume', 'expiry', 'last_trade', 'open_interest'
  limit: Optional[int] = None,                    # Max records to return
  offset: Optional[int] = None                    # Records to skip
)

Symbol Formats:

  1. Specific Contracts (Databento symbology):

    • Single digit years: ESM1 (June 2021 or 2011 based on query dates)
    • Two digit years: ESZ25 (December 2025)
    • Context-aware: Same symbol interpreted differently based on start_date
  2. Continuous Contracts (Databento format):

    • ES.v.0 - Front month by volume
    • ES.v.1 - Second month by volume (forward contract)
    • ES.c.0 - Front month by calendar
    • Format: ROOT.RULE.POSITION where RULE = v (volume) or c (calendar)

Examples:

Specific contracts with context-aware parsing:

# ESM1 interpreted as June 2021 (decade from query dates)
futures_2021 = client.get_futures(
    symbols=['ESM1'],
    start_date='2021-05-01',
    end_date='2021-06-30'
)

# Same symbol ESM1 interpreted as June 2011
futures_2011 = client.get_futures(
    symbols=['ESM1'],
    start_date='2011-05-01',
    end_date='2011-06-30'
)

Continuous contracts:

# Get all contracts that were position 1 (second month) during 2020
continuous = client.get_futures(
    symbols=['ES.v.1'],
    start_date='2020-01-01',
    end_date='2020-12-31'
)
# Returns: Multiple contracts, sorted by start_date ascending

Response Format:

{
  "count": 2,
  "instruments": [
    {
      "symbol": "ESZ5",
      "root": "ES",
      "root_symbol": "ES",
      "exch_mic": "XCME",
      "currency": "USD",
      "multiplier": 50,
      "min_tick": 0.25,
      "tick_value": 12.5,
      "lot_units": "IPNT",
      "asset_type": "Fut",
      "contract_month": "December",
      "contract_year": 2025,
      "expiry_date": "2025-12-01"
    }
  ]
}

Continuous contract response includes rollover fields:

{
  "symbol": "ESU1",
  "continuous_symbol": "ES.v.1",
  "rollover_rule": "v",
  "rollover_rank": 1,
  "start_date": "2011-09-12",
  "end_date": "2011-09-18"
}
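
The rollover fields above can be turned into a roll calendar: consecutive start_date boundaries mark when one contract hands off to the next. A minimal sketch, using an illustrative (not real) response shaped like the one documented above:

```python
# Sketch: derive a roll calendar from a continuous-contract response.
# The sample response below is illustrative; field names follow the docs above.
sample = {
    "instruments": [
        {"symbol": "ESM1", "continuous_symbol": "ES.v.1",
         "start_date": "2011-06-13", "end_date": "2011-09-11"},
        {"symbol": "ESU1", "continuous_symbol": "ES.v.1",
         "start_date": "2011-09-12", "end_date": "2011-09-18"},
    ]
}

def roll_calendar(instruments):
    """Return (roll_date, from_symbol, to_symbol) tuples, sorted by start_date."""
    ordered = sorted(instruments, key=lambda c: c["start_date"])
    return [
        (nxt["start_date"], cur["symbol"], nxt["symbol"])
        for cur, nxt in zip(ordered, ordered[1:])
    ]

print(roll_calendar(sample["instruments"]))
# [('2011-09-12', 'ESM1', 'ESU1')]
```

Because the SDK already returns continuous contracts sorted ascending by start_date, the sort here is a defensive no-op on real responses.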

Key Features:

  • Context-aware decade detection: Single-digit years interpreted based on query dates
  • Date validation: Rejects contracts >2 years from query date range
  • Proper overlap filtering: Contracts must have meaningful overlap with query range
  • Sorted results: Continuous contracts returned in ascending order by start_date

client.get_options()

Fetch options contract information.

Parameters:

client.get_options(
  symbols: Optional[List[str]] = None,             # OCC-format option symbols
  chains: Optional[Union[str, List[str]]] = None,  # Underlying symbol(s) for option chains
  start_date: Optional[str] = None,                # Start date for filtering
  end_date: Optional[str] = None,                  # End date for filtering
  expiration_date: Optional[str] = None,           # Filter by expiration date
  strike: Optional[Union[float, str]] = None,      # Filter by strike price
  option_type: Optional[str] = None,               # Filter by type ('C' or 'P')
  exchange: Optional[str] = None,                  # Filter by exchange
  currency: Optional[str] = None,                  # Filter by currency
  limit: Optional[int] = None,                     # Max records to return
  offset: Optional[int] = None,                    # Records to skip
  root: Optional[str] = None                       # Filter by option root symbol
)

Examples:

Get option chain by underlying:

# Returns all options for SPY underlying
all_options = client.get_options(chains=['SPY'])

Get options with filters:

# Returns options active during period with filters
options = client.get_options(
  chains=['SPY'],
  start_date='2025-08-01',
  end_date='2025-09-30',
  expiration_date='2025-09-19',
  strike=450.0,
  option_type='C'
)

Get specific option symbols:

options = client.get_options(
  symbols=['SPY250919C00450000'],
  start_date='2025-08-01',
  end_date='2025-09-30'
)

Response Format:

{
  "instruments": [
    {
      "date": "2025-09-19",
      "sym": "SPY250919C00450000",
      "root": "SPY",
      "ex": "CBOE",
      "name": "SPDR S&P 500 ETF",
      "underlying": "SPY",
      "strike": "450.0",
      "expiration": "2025-09-19",
      "option_type": "C",
      "multiplier": "100",
      "assetType": "Opt"
    }
  ]
}

client.get_indices()

Fetch index instrument metadata.

Parameters:

client.get_indices(
  symbols: Optional[List[str]] = None,             # Index symbols (e.g., 'SPX', 'NDX')
  date: Optional[str] = None,                      # Date for filtering (ISO format)
  exchange: Optional[str] = None,                  # Filter by exchange
  currency: Optional[str] = None,                  # Filter by currency
  limit: Optional[int] = None,                     # Max records to return
  offset: Optional[int] = None                     # Records to skip
)

Example:

indices = client.get_indices(symbols=['SPX', 'NDX'])

Publisher Client

hd.Publisher()

Client for publishing data to HiveQ destinations. Supports adding new records and modifying existing records. Provides both synchronous and asynchronous publishing modes.

Initialization:

client = hd.Publisher(
  api_key: Optional[str] = None,       # Override global API key
  base_url: Optional[str] = None,      # Override global base URL
  async_mode: bool = True,             # Async HTTP for 10-100x performance (default)
  user_id: Optional[str] = None,       # User ID for request headers
  org_id: Optional[str] = None,        # Organization ID for request headers
  user_name: Optional[str] = None      # User name for request headers
)

Parameters:

  • api_key: Override global API key
  • base_url: Override global base URL
  • async_mode: If True (default), uses async HTTP for fire-and-forget publishes
  • user_id: User ID for request headers (X-User-ID)
  • org_id: Organization ID for request headers (X-Org-ID)
  • user_name: User name for request headers (X-User-Name)

client.publish()

Publish data to a schema in a HiveQ destination.

Parameters:

client.publish(
  schema: str,                         # Required: Schema name (e.g., "event_logs")
  data: List[Dict[str, Any]],          # Required: List of records to publish
  key: str,                            # Required: Unique key identifier for tracking
  operation: str = "add"               # Optional: "add" (insert) or "modify" (update)
)

Operation Types:

Operation | Description             | Use Case
----------|-------------------------|-------------------------------------------------
add       | Insert new records      | Publishing new events, logs, or data
modify    | Update existing records | Correcting or updating previously published data

client.flush()

Wait for all in-flight async publishes to complete. Only relevant when async_mode=True.

Usage:

publisher = hd.Publisher(async_mode=True)

# Publish many records...
for i in range(1000):
    publisher.publish(...)

# Wait for all publishes to complete
publisher.flush()

Behavior:

  • Blocks until all pending async publish operations finish
  • Should be called before shutdown to ensure data integrity
  • No-op in synchronous mode (async_mode=False)

client.close()

Close the publisher and cleanup resources.

Usage:

publisher = hd.Publisher(async_mode=True)
# ... publish data ...
publisher.close()  # Automatically flushes and closes connections

Behavior:

  • Flushes pending requests (if async mode)
  • Closes HTTP connections
  • Should be called when done using the publisher

Examples

Synchronous Mode

Add New Records:

import hiveq_data as hd

publisher = hd.Publisher(async_mode=False)

# Publish strategy event logs
result = publisher.publish(
  schema='event_logs',
  key='run_20240101_001',
  operation='add',
  data=[
    {
      'ts_event': '2024-01-01T10:00:00.000Z',
      'strategy_id': 'my_strategy',
      'trader_id': 'trader_001',
      'nav': 100000.0,
      'realized_pnl': 1500.0,
      'total_pnl': 1500.0,
      'event_log_type': 'TRADE',
      'sub_event_type': 'FILLED',
      'symbol': 'AAPL',
      'message': 'Trade executed'
    },
    {
      'ts_event': '2024-01-01T10:05:00.000Z',
      'strategy_id': 'my_strategy',
      'trader_id': 'trader_001',
      'nav': 101200.0,
      'realized_pnl': 2700.0,
      'total_pnl': 2700.0,
      'event_log_type': 'TRADE',
      'sub_event_type': 'FILLED',
      'symbol': 'MSFT',
      'message': 'Trade executed'
    }
  ]
)

print(f"Published {result['record_count']} records")
publisher.close()

Response Format:

{
  "status": "success",
  "record_count": 2,
  "schema": "event_logs",
  "key": "run_20240101_001",
  "operation": "add"
}

Modify Existing Records:

# Update previously published records
result = publisher.publish(
  schema='event_logs',
  key='run_20240101_001',
  operation='modify',
  data=[
    {
      'ts_event': '2024-01-01T10:00:00.000Z',
      'strategy_id': 'my_strategy',
      'trader_id': 'trader_001',
      'nav': 102000.0,        # Updated value
      'realized_pnl': 3500.0,  # Updated value
      'total_pnl': 3500.0      # Updated value
    }
  ]
)

Async Mode (Default, High Performance)

Async mode is the default and provides 10-100x performance improvement for high-frequency publishing:

import hiveq_data as hd

# Async mode is the default
publisher = hd.Publisher()

# Publish many records quickly (fire-and-forget)
for i in range(1000):
    result = publisher.publish(
        schema='event_logs',
        key=f'run_{i}',
        operation='add',
        data=[{
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'nav': 100000.0,
            'realized_pnl': 1500.0
        }]
    )
    # Returns immediately without waiting for HTTP response

# Always flush before shutdown to ensure all data is published
publisher.flush()
publisher.close()

Async Mode Benefits:

  • 10-100x faster for high-frequency publishing
  • Fire-and-forget: publish() returns immediately
  • Concurrent HTTP requests with connection pooling (100 connections)
  • Automatic retry with exponential backoff (3 attempts)
  • Perfect for backtesting and batch publishing

Async Mode Response Format:

{
  "success": true,
  "async": true,
  "records": 1
}

Error Handling:

try:
  result = publisher.publish(
    schema='event_logs',
    key='run_20240101_001',
    operation='invalid_op',  # Invalid operation
    data=[...]
  )
except ValueError as e:
  print(f"Validation error: {e}")
  # Output: Invalid operation 'invalid_op'. Must be one of: add, modify

Common Schema Examples:

Event Logs Schema:

{
  'ts_event': '2024-01-01T10:00:00.000Z',        # Event timestamp (ISO format)
  'strategy_id': 'my_strategy',                   # Strategy identifier
  'trader_id': 'trader_001',                      # Trader identifier
  'nav': 100000.0,                                # Net asset value
  'realized_pnl': 1500.0,                         # Realized P&L
  'total_pnl': 1500.0,                            # Total P&L
  'event_log_type': 'TRADE',                      # Event type
  'sub_event_type': 'FILLED',                     # Sub-event type
  'symbol': 'AAPL',                               # Trading symbol
  'message': 'Trade executed'                     # Event message
}
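
A lightweight client-side check before publishing can catch malformed records early. The required-field set below is an assumption for illustration only; consult your schema definition for the authoritative list:

```python
# Hypothetical pre-publish validation: verify each record carries the fields
# your schema expects. REQUIRED here is an illustrative subset, not the
# official event_logs schema.
REQUIRED = {"ts_event", "strategy_id", "nav"}

def missing_fields(record):
    """Return a sorted list of required fields absent from a record."""
    return sorted(REQUIRED - record.keys())

rec = {"ts_event": "2024-01-01T10:00:00.000Z", "strategy_id": "my_strategy"}
print(missing_fields(rec))  # ['nav']
```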

SignalPublisher Client

hd.SignalPublisher()

Kafka-based publisher for user signal data. Optimized for high-throughput signal publishing with low latency and automatic reconnection.

Initialization:

publisher = hd.SignalPublisher(
  bootstrap_servers: Optional[str] = None,     # Kafka broker address (e.g., 'localhost:9092')
  topic: str = 'user-signals',                 # Kafka topic to publish to
  auto_reconnect: bool = True,                 # Enable automatic reconnection
  reconnect_interval: float = 1.0,             # Seconds between reconnection attempts
  status_log_interval: float = 30.0,           # Seconds between status log messages
  **kafka_config                               # Additional Kafka producer configuration
)

Parameters:

  • bootstrap_servers (str, optional): Kafka broker address. If None, loads from KAFKA_BOOTSTRAP_SERVERS env variable. Default: 'localhost:9092'
  • topic (str, optional): Kafka topic to publish to (default: 'user-signals')
  • auto_reconnect (bool, optional): Enable automatic reconnection on connection failure (default: True)
  • reconnect_interval (float, optional): Seconds between reconnection attempts (default: 1.0)
  • status_log_interval (float, optional): Seconds between status log messages (default: 30.0)
  • **kafka_config: Additional Kafka producer configuration (e.g., acks='all', compression_type='gzip', retries=3)

Signal Format:

{
    'data': {'signal': 0, 'weight': 0},           # Signal data dict
    'user_id': 'Raj',                           # User identifier
    'ts_event': '2025-11-05 19:00:02.267000',     # Event timestamp (auto-generated)
    'symbol': 'AAPL',                             # Symbol
    'signal_version': 1,                          # Version (default: 1)
    'signal_id': '39831f03-7022-4c0f-...'        # UUID (auto-generated)
}

Methods:

  • publish(data, symbol, user_id, ts_event=None, signal_version=1, signal_id=None, key=None): Publish a single signal

    • data (dict, required): Signal data dict with 'signal' and 'weight' keys
    • symbol (str, required): Symbol string (e.g., 'AAPL')
    • user_id (str, required): User identifier string
    • ts_event (datetime, optional): Event timestamp (auto-generated if None)
    • signal_version (int, optional): Signal version number (default: 1)
    • signal_id (str, optional): Signal UUID (auto-generated if None)
    • key (str, optional): Kafka partition key (defaults to symbol)
  • publish_batch(signals, key=None): Publish multiple signals

    • signals (list, required): List of signal dicts, each containing data, symbol, user_id
    • key (str, optional): Kafka partition key (if None, uses symbol from each signal)
    • Returns: Number of successfully published signals (int)
  • flush(): Flush pending messages to Kafka

  • close(): Close the Kafka producer connection

  • is_connected(): Check if connected to Kafka (returns bool)

  • get_connection_status(): Get detailed connection status dict

Context Manager Support:

with hd.SignalPublisher(bootstrap_servers='localhost:9092') as publisher:
    publisher.publish(
        data={'signal': 1, 'weight': 0.85},
        symbol='AAPL',
        user_id='Raj'
    )
# Automatically closed

Batch Example:

import hiveq_data as hd

publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals'
)

# Publish batch
signals = [
    {
        'data': {'signal': 1, 'weight': 0.85},
        'symbol': 'AAPL',
        'user_id': 'Raj'
    },
    {
        'data': {'signal': 0, 'weight': 0.0},
        'symbol': 'MSFT',
        'user_id': 'Raj'
    }
]
count = publisher.publish_batch(signals)
print(f"Published {count} signals")

publisher.close()

Advanced Configuration:

publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals',
    acks='all',  # Wait for all replicas
    compression_type='gzip',  # Compress messages
    retries=3,  # Retry failed sends
    max_in_flight_requests_per_connection=1  # Ensure ordering
)

Return Data Reference

Complete field-by-field documentation of all return data structures with data types and descriptions.

Historical Data Response Structure

All Historical.get_data() calls return a consistent structure:

{
  "data": [...],
  "schema": "string",
  "dataset": "string"
}

Top-Level Fields:

Field   | Type       | Description
--------|------------|------------------------------------------------
data    | List[Dict] | Array of data records matching the query
schema  | str        | Schema name used for the query (e.g., "bars_1s")
dataset | str        | Dataset name queried (e.g., "HIVEQ_US_EQ")
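
Since "data" is a list of plain dicts, it can be consumed directly with the standard library. A sketch grouping close prices by symbol; the response dict below mirrors the documented structure with illustrative values:

```python
# Group per-symbol close prices out of a get_data() response.
# The response here is a hand-built sample in the documented shape.
from collections import defaultdict

resp = {
    "data": [
        {"time": "2025-08-01T09:30:00", "symbol": "AAPL", "close": 186.0},
        {"time": "2025-08-01T09:30:01", "symbol": "AAPL", "close": 186.1},
        {"time": "2025-08-01T09:30:00", "symbol": "MSFT", "close": 412.3},
    ],
    "schema": "bars_1s",
    "dataset": "HIVEQ_US_EQ",
}

closes = defaultdict(list)
for rec in resp["data"]:
    closes[rec["symbol"]].append(rec["close"])

print(dict(closes))  # {'AAPL': [186.0, 186.1], 'MSFT': [412.3]}
```

The same list-of-dicts shape also loads directly into a pandas DataFrame via pd.DataFrame(resp["data"]).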

Equity OHLC Data Fields (bars_1s, bars_1m)

{
  "date": "2025-08-01",
  "time": "2025-08-01T09:30:01.000000000",
  "symbol": "AAPL",
  "rtype": 32,
  "instrument_id": 12345,
  "source_id": "DATABENTO",
  "src_dataset": "GLBX.MDP3",
  "publisher_id": 1,
  "open": 185.5,
  "high": 186.2,
  "low": 185.3,
  "close": 186.0,
  "volume": 125000.0,
  "ts_event": "2025-08-01T09:30:01.000000000",
  "recv_event": "2025-08-01T09:30:01.100000000",
  "version": 1,
  "db_event": "2025-08-01T09:30:01.200000000"
}

Field Descriptions:

Field         | Type          | Nullable | Description
--------------|---------------|----------|------------------------------------
date          | Date          | No       | Trading date (YYYY-MM-DD)
time          | DateTime64(9) | No       | Timestamp with nanosecond precision
symbol        | String        | No       | Stock ticker symbol
rtype         | UInt16        | No       | Record type identifier
instrument_id | UInt32        | No       | Unique instrument identifier
source_id     | String        | No       | Data source identifier
src_dataset   | String        | No       | Source dataset name
publisher_id  | UInt32        | Yes      | Publisher identifier
open          | Float64       | Yes      | Opening price for the period
high          | Float64       | Yes      | Highest price during the period
low           | Float64       | Yes      | Lowest price during the period
close         | Float64       | Yes      | Closing price for the period
volume        | Float64       | Yes      | Trading volume (number of shares)
ts_event      | DateTime64(9) | No       | Event timestamp from exchange
recv_event    | DateTime64(9) | No       | Timestamp when received by system
version       | UInt8         | Yes      | Data version number
db_event      | DateTime64(9) | No       | Database insertion timestamp

Equity Daily Bars (bars_daily)

{
  "date": "2025-08-01",
  "trade_date": "2025-08-01",
  "symbol": "AAPL",
  "open": 185.5,
  "high": 186.2,
  "low": 185.3,
  "close": 186.0,
  "volume": 45000000.0,
  "adj_open": 185.5,
  "adj_high": 186.2,
  "adj_low": 185.3,
  "adj_close": 186.0,
  "adj_volume": 45000000.0,
  "factor": 1.0,
  "ts_created": "2025-08-01T16:00:00.000",
  "recv_event": "2025-08-01T16:00:01",
  "source_id": "DATABENTO",
  "version": "2025-08-01T16:00:01"
}

Field Descriptions:

Field      | Type          | Nullable | Description
-----------|---------------|----------|-----------------------------------------
date       | Date          | No       | Calendar date
trade_date | Date          | No       | Actual trading date
symbol     | String        | No       | Stock ticker symbol
open       | Float64       | Yes      | Unadjusted opening price
high       | Float64       | Yes      | Unadjusted high price
low        | Float64       | Yes      | Unadjusted low price
close      | Float64       | Yes      | Unadjusted closing price
volume     | Float64       | Yes      | Unadjusted volume
adj_open   | Float64       | Yes      | Split & dividend adjusted opening price
adj_high   | Float64       | Yes      | Split & dividend adjusted high price
adj_low    | Float64       | Yes      | Split & dividend adjusted low price
adj_close  | Float64       | Yes      | Split & dividend adjusted closing price
adj_volume | Float64       | Yes      | Split adjusted volume
factor     | Float64       | No       | Cumulative adjustment factor
ts_created | DateTime64(3) | No       | Record creation timestamp
recv_event | DateTime      | No       | Receive timestamp
source_id  | String        | No       | Data source identifier
version    | DateTime      | No       | Version timestamp
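
The adjusted close ("adj_close") is the appropriate series for return calculations, since it accounts for splits and dividends. A sketch with illustrative sample records and a hypothetical helper:

```python
# Compute simple daily returns from adjusted closes.
# The records below are illustrative samples in the documented field layout.
records = [
    {"trade_date": "2025-08-01", "adj_close": 186.0},
    {"trade_date": "2025-08-04", "adj_close": 189.72},
    {"trade_date": "2025-08-05", "adj_close": 188.0},
]

def daily_returns(bars):
    """Simple returns between consecutive adjusted closes, date-ordered."""
    ordered = sorted(bars, key=lambda b: b["trade_date"])
    closes = [b["adj_close"] for b in ordered]
    return [round(b / a - 1, 6) for a, b in zip(closes, closes[1:])]

print(daily_returns(records))  # [0.02, -0.009066]
```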

Equity Trades Data

{
  "date": "2025-08-01",
  "time": "2025-08-01T09:30:01.123456789",
  "symbol": "AAPL",
  "rtype": 160,
  "instrument_id": 12345,
  "source_id": "DATABENTO",
  "src_dataset": "GLBX.MDP3",
  "publisher_id": 1,
  "price": 185.75,
  "size": 100,
  "action": "T",
  "side": "B",
  "flags": 0,
  "depth": 0,
  "bid_px": 185.74,
  "ask_px": 185.76,
  "bid_sz": 500,
  "ask_sz": 300,
  "bid_ct": 5,
  "ask_ct": 3,
  "sequence": 12345678,
  "ts_event": "2025-08-01T09:30:01.123456789",
  "ts_recv": "2025-08-01T09:30:01.124000000",
  "ts_in_delta": 544211,
  "recv_event": "2025-08-01T09:30:01.125000000",
  "version": 1,
  "db_event": "2025-08-01T09:30:01.126000000"
}

Field Descriptions:

| Field | Type | Nullable | Description |
|---|---|---|---|
| date | Date | No | Trading date |
| time | DateTime64(9) | No | Timestamp with nanosecond precision |
| symbol | String | No | Stock ticker symbol |
| rtype | UInt16 | No | Record type (160 for trades) |
| instrument_id | UInt32 | No | Unique instrument identifier |
| source_id | String | No | Data source identifier |
| src_dataset | String | No | Source dataset name |
| publisher_id | UInt32 | Yes | Publisher identifier |
| price | Float64 | Yes | Trade price |
| size | UInt32 | Yes | Trade size (shares) |
| action | String | No | Trade action (T=Trade, C=Cancel, etc.) |
| side | String | No | Trade side (B=Buy, S=Sell, N=None) |
| flags | UInt8 | Yes | Trade condition flags |
| depth | UInt8 | Yes | Market depth level |
| bid_px | Float64 | Yes | Best bid price at time of trade |
| ask_px | Float64 | Yes | Best ask price at time of trade |
| bid_sz | UInt32 | Yes | Best bid size |
| ask_sz | UInt32 | Yes | Best ask size |
| bid_ct | UInt32 | Yes | Number of orders at best bid |
| ask_ct | UInt32 | Yes | Number of orders at best ask |
| sequence | UInt32 | Yes | Sequence number |
| ts_event | DateTime64(9) | No | Event timestamp from exchange |
| ts_recv | DateTime64(9) | No | Receive timestamp |
| ts_in_delta | UInt32 | Yes | Time delta (nanoseconds) |
| recv_event | DateTime64(9) | No | System receive timestamp |
| version | UInt8 | Yes | Data version |
| db_event | DateTime64(9) | No | Database insertion timestamp |
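Because each trade record carries the prevailing best bid and offer, quote-relative quantities can be derived record by record. A minimal sketch using values copied from the sample above:

```python
# Sample trade record (values from the example above).
trade = {"price": 185.75, "bid_px": 185.74, "ask_px": 185.76}

# Quote midpoint and spread at the moment of the trade.
mid = (trade["bid_px"] + trade["ask_px"]) / 2
spread = trade["ask_px"] - trade["bid_px"]

print(f"mid={round(mid, 2)}, spread={round(spread, 2)}")
```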

Futures OHLC Data Fields (bars_1s, bars_1m)

{
  "date": "2025-08-01",
  "time": "2025-08-01T09:30:01.000000000",
  "symbol": "ES.H25",
  "root": "ES",
  "rtype": 32,
  "instrument_id": 54321,
  "source_id": "DATABENTO",
  "src_dataset": "GLBX.MDP3",
  "publisher_id": 1,
  "open": 4500.25,
  "high": 4505.5,
  "low": 4498.75,
  "close": 4502.0,
  "volume": 50000.0,
  "ts_event": "2025-08-01T09:30:01.000000000",
  "recv_event": "2025-08-01T09:30:01.100000000",
  "version": 1,
  "db_event": "2025-08-01T09:30:01.200000000"
}

Field Descriptions:

| Field | Type | Nullable | Description |
|---|---|---|---|
| date | Date | No | Trading date |
| time | DateTime64(9) | No | Timestamp with nanosecond precision |
| symbol | String | No | Full contract symbol (e.g., ES.H25) |
| root | String | No | Futures root symbol (e.g., ES) |
| rtype | UInt16 | No | Record type identifier |
| instrument_id | UInt32 | No | Unique instrument identifier |
| source_id | String | No | Data source identifier |
| src_dataset | String | No | Source dataset name |
| publisher_id | UInt32 | Yes | Publisher identifier |
| open | Float64 | Yes | Opening price for the period |
| high | Float64 | Yes | Highest price during the period |
| low | Float64 | Yes | Lowest price during the period |
| close | Float64 | Yes | Closing price for the period |
| volume | Float64 | Yes | Trading volume (number of contracts) |
| ts_event | DateTime64(9) | No | Event timestamp from exchange |
| recv_event | DateTime64(9) | No | Timestamp when received by system |
| version | UInt8 | Yes | Data version number |
| db_event | DateTime64(9) | No | Database insertion timestamp |
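The 1-second and 1-minute schemas share the same shape, so coarser bars can be rebuilt client-side by the usual OHLC aggregation rule: first open, max high, min low, last close, summed volume. A sketch with hypothetical values:

```python
# Aggregate a run of 1-second bars (shaped like the fields above)
# into a single 1-minute bar. Input values are hypothetical.
bars_1s = [
    {"open": 4500.25, "high": 4501.00, "low": 4499.50, "close": 4500.75, "volume": 120.0},
    {"open": 4500.75, "high": 4505.50, "low": 4500.50, "close": 4505.00, "volume": 340.0},
    {"open": 4505.00, "high": 4505.25, "low": 4498.75, "close": 4502.00, "volume": 210.0},
]

bar_1m = {
    "open": bars_1s[0]["open"],                   # first open
    "high": max(b["high"] for b in bars_1s),      # highest high
    "low": min(b["low"] for b in bars_1s),        # lowest low
    "close": bars_1s[-1]["close"],                # last close
    "volume": sum(b["volume"] for b in bars_1s),  # total contracts
}
print(bar_1m)
```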

Options Snapshot Data Fields (snaps_1s)

{
  "date": "2025-08-01",
  "time": "2025-08-01T09:30:01.000000000",
  "symbol": "SPY250919C00450000",
  "underlying": "SPY",
  "strike": 450.0,
  "expiration": "2025-09-19",
  "option_type": "C",
  "rtype": 96,
  "instrument_id": 98765,
  "source_id": "OPRA",
  "src_dataset": "OPRA",
  "publisher_id": 1,
  "bid_px": 12.5,
  "ask_px": 12.55,
  "bid_sz": 100,
  "ask_sz": 150,
  "bid_ct": 5,
  "ask_ct": 7,
  "last_px": 12.52,
  "last_sz": 10,
  "open_interest": 25000,
  "implied_vol": 0.185,
  "delta": 0.52,
  "gamma": 0.015,
  "theta": -0.08,
  "vega": 0.35,
  "rho": 0.12,
  "sequence": 123456,
  "ts_event": "2025-08-01T09:30:01.000000000",
  "ts_recv": "2025-08-01T09:30:01.100000000",
  "recv_event": "2025-08-01T09:30:01.110000000",
  "version": 1,
  "db_event": "2025-08-01T09:30:01.120000000"
}

Field Descriptions:

| Field | Type | Nullable | Description |
|---|---|---|---|
| date | Date | No | Trading date |
| time | DateTime64(9) | No | Timestamp with nanosecond precision |
| symbol | String | No | Full option symbol (OSI format) |
| underlying | String | No | Underlying security symbol |
| strike | Float64 | No | Strike price |
| expiration | Date | No | Expiration date |
| option_type | String | No | Option type (C=Call, P=Put) |
| rtype | UInt16 | No | Record type identifier |
| instrument_id | UInt32 | No | Unique instrument identifier |
| source_id | String | No | Data source identifier |
| src_dataset | String | No | Source dataset name |
| publisher_id | UInt32 | Yes | Publisher identifier |
| bid_px | Float64 | Yes | Best bid price |
| ask_px | Float64 | Yes | Best ask price |
| bid_sz | UInt32 | Yes | Best bid size (contracts) |
| ask_sz | UInt32 | Yes | Best ask size (contracts) |
| bid_ct | UInt32 | Yes | Number of orders at best bid |
| ask_ct | UInt32 | Yes | Number of orders at best ask |
| last_px | Float64 | Yes | Last trade price |
| last_sz | UInt32 | Yes | Last trade size |
| open_interest | UInt32 | Yes | Total open interest |
| implied_vol | Float64 | Yes | Implied volatility (decimal) |
| delta | Float64 | Yes | Option delta (Greek) |
| gamma | Float64 | Yes | Option gamma (Greek) |
| theta | Float64 | Yes | Option theta (Greek) |
| vega | Float64 | Yes | Option vega (Greek) |
| rho | Float64 | Yes | Option rho (Greek) |
| sequence | UInt32 | Yes | Sequence number |
| ts_event | DateTime64(9) | No | Event timestamp from exchange |
| ts_recv | DateTime64(9) | No | Receive timestamp |
| recv_event | DateTime64(9) | No | System receive timestamp |
| version | UInt8 | Yes | Data version |
| db_event | DateTime64(9) | No | Database insertion timestamp |
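The per-snapshot Greeks make position-level calculations straightforward. A back-of-the-envelope delta-hedge sketch, assuming the standard 100-share US equity-option multiplier (the multiplier is not a field in this schema) and a hypothetical position size:

```python
# Delta values come from the sample snapshot above; the 100-share
# multiplier and 10-contract position are assumptions for illustration.
snap = {"delta": 0.52, "bid_px": 12.5, "ask_px": 12.55}
contracts = 10     # hypothetical long call position
multiplier = 100   # standard US equity-option contract size

mid = (snap["bid_px"] + snap["ask_px"]) / 2
hedge_shares = -snap["delta"] * contracts * multiplier  # negative = sell shares

print(f"mid={mid:.3f}, hedge={hedge_shares:.0f} shares")
```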

InstrumentReference Response Structure

All InstrumentReference methods return:

{
  "instruments": [...]
}

Top-Level Fields:

| Field | Type | Description |
|---|---|---|
| instruments | List[Dict] | Array of instrument metadata records |

Equity Instrument Fields

{
  "date": "2025-08-26",
  "sym": "AAPL",
  "root": "AAPL",
  "ex": "NASDAQ",
  "name": "Apple Inc",
  "openTime": "09:30:00",
  "closeTime": "16:00:00",
  "rootMinTick": "0.01",
  "curContract": "AAPL",
  "multiplier": "1",
  "assetType": "Eq",
  "tickSizePilot": "false",
  "rolloverSymbol": "",
  "rolloverDate": "",
  "securityID": "AAPL",
  "minOrderSize": "1",
  "maxOrderSize": "10000",
  "baseIncrement": "1",
  "quoteIncrement": "0.01",
  "underlying": "",
  "Version": "V4"
}

Field Descriptions:

| Field | Type | Description |
|---|---|---|
| date | String | Reference data timestamp |
| sym | String | Trading symbol |
| root | String | Root symbol (same as sym for equities) |
| ex | String | Primary exchange |
| name | String | Full company name |
| openTime | String | Market open time (HH:MM:SS) |
| closeTime | String | Market close time (HH:MM:SS) |
| rootMinTick | String | Minimum price increment |
| curContract | String | Current contract symbol |
| multiplier | String | Contract multiplier (1 for equities) |
| assetType | String | Asset type (Eq=Equity, Fut=Futures, Opt=Options) |
| tickSizePilot | String | Tick size pilot program participant ("true"/"false") |
| rolloverSymbol | String | Next rollover symbol (empty for equities) |
| rolloverDate | String | Rollover date (empty for equities) |
| securityID | String | Security identifier |
| minOrderSize | String | Minimum order size |
| maxOrderSize | String | Maximum order size |
| baseIncrement | String | Base order size increment |
| quoteIncrement | String | Quote price increment |
| underlying | String | Underlying security (empty for equities) |
| Version | String | Data version |
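Note that every field in this payload is typed as String, including the numeric ones, so convert before computing with them. A small sketch with values taken from the equity example above:

```python
# InstrumentReference fields arrive as strings; cast the numeric ones
# before use. Record values are from the equity example above.
inst = {"sym": "AAPL", "rootMinTick": "0.01", "multiplier": "1", "maxOrderSize": "10000"}

min_tick = float(inst["rootMinTick"])
multiplier = int(inst["multiplier"])
max_order = int(inst["maxOrderSize"])

# Example use: snap a theoretical price onto the instrument's tick grid.
price = 187.2345
rounded = round(price / min_tick) * min_tick
print(rounded)
```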

Futures Instrument Fields

{
  "date": "2025-03-13",
  "sym": "ES.H25",
  "root": "ES",
  "ex": "CME",
  "name": "E-MINI S&P 500 FUTURES",
  "openTime": "18:00:00",
  "closeTime": "17:00:00",
  "rootMinTick": "0.25",
  "curContract": "ES.H25",
  "multiplier": "50",
  "assetType": "Fut",
  "tickSizePilot": "false",
  "rolloverSymbol": "",
  "rolloverDate": "",
  "securityID": "ES.H25",
  "minOrderSize": "1",
  "maxOrderSize": "100",
  "baseIncrement": "1",
  "quoteIncrement": "0.25",
  "underlying": "",
  "Version": "V4"
}

Futures-Specific Fields:

| Field | Type | Description |
|---|---|---|
| symbol | String | Contract symbol (Databento format, e.g., ESZ5, ESM1) |
| root | String | Futures root symbol (e.g., ES) |
| root_symbol | String | Futures root symbol (e.g., ES) |
| multiplier | Number | Contract multiplier (e.g., 50 for ES) |
| min_tick | Number | Minimum tick size (e.g., 0.25 for ES) |
| tick_value | Number | Value per tick (e.g., 12.5 for ES) |
| contract_month | String | Contract month name (e.g., "December") |
| contract_year | Number | Contract year (e.g., 2025) |
| expiry_date | String | Contract expiry date (YYYY-MM-DD) |

Continuous Contract Additional Fields:

| Field | Type | Description |
|---|---|---|
| continuous_symbol | String | Continuous symbol (e.g., ES.v.1) |
| rollover_rule | String | Rollover methodology (v=volume, c=calendar) |
| rollover_rank | Number | Position (0=front, 1=second, etc.) |
| start_date | String | Date contract became active at this position |
| end_date | String | Date contract rolled off this position |
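Continuous symbols follow the ROOT.RULE.POSITION convention described in the symbology reference below. A minimal parser (a hypothetical helper, not part of the SDK) that splits one into its components:

```python
# Decode a continuous-contract symbol like "ES.v.1" into root,
# rollover rule, and position rank. Hypothetical helper for illustration.
def parse_continuous(symbol: str) -> dict:
    root, rule, position = symbol.split(".")
    if rule not in ("v", "c"):
        raise ValueError(f"unknown rollover rule: {rule}")
    return {
        "root": root,
        "rule": "volume" if rule == "v" else "calendar",
        "rank": int(position),  # 0 = front month
    }

print(parse_continuous("ES.v.1"))  # second month by volume
print(parse_continuous("NQ.c.0"))  # front month by calendar
```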

Options Instrument Fields

{
  "date": "2025-09-19",
  "sym": "SPY250919C00450000",
  "root": "SPY",
  "ex": "CBOE",
  "name": "SPDR S&P 500 ETF",
  "openTime": "09:30:00",
  "closeTime": "16:00:00",
  "rootMinTick": "0.01",
  "curContract": "SPY250919C00450000",
  "multiplier": "100",
  "assetType": "Opt",
  "tickSizePilot": "false",
  "rolloverSymbol": "",
  "rolloverDate": "",
  "securityID": "SPY250919C00450000",
  "minOrderSize": "1",
  "maxOrderSize": "500",
  "baseIncrement": "1",
  "quoteIncrement": "0.01",
  "underlying": "SPY",
  "Version": "V4"
}

Additional Options-Specific Fields:

| Field | Type | Description |
|---|---|---|
| sym | String | Full option symbol in OSI format |
| root | String | Underlying security symbol |
| underlying | String | Underlying security (same as root) |
| multiplier | String | Contract multiplier (typically 100) |
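OSI option symbols are fixed-width in their trailing 15 characters: a YYMMDD expiration, a C/P flag, and the strike price times 1000, zero-padded to 8 digits. A small decoder (a hypothetical helper, not an SDK function):

```python
# Decode an OSI-format option symbol into its components.
def parse_osi(symbol: str) -> dict:
    body = symbol[-15:]  # YYMMDD + C/P + 8-digit strike
    return {
        "underlying": symbol[:-15].strip(),
        "expiration": f"20{body[0:2]}-{body[2:4]}-{body[4:6]}",
        "option_type": body[6],
        "strike": int(body[7:]) / 1000,
    }

print(parse_osi("SPY250919C00450000"))
# SPY call expiring 2025-09-19 with a 450.0 strike
```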

MetaReference Response Structures

get_databases() Response

{
  "databases": ["prod_equities_001", "prod_options_001", "prod_futures_001"]
}

| Field | Type | Description |
|---|---|---|
| databases | List[String] | Array of database names |

get_tables() Response

{
  "tables": [
    {
      "db_name": "prod_equities_001",
      "table_name": "eq_ohlc_1s",
      "schema_name": "bars_1s",
      "description": "Equity OHLC data at 1-second intervals with volume information",
      "schema": [],
      "col_descriptions": [],
      "default_query_cols": [],
      "primary_keys": [],
      "order_by": [],
      "tags": [],
      "data_sets": [],
      "src_ids": [],
      "src_data_sets": [],
      "is_active": true,
      "last_updated": "2025-08-26T12:00:00",
      "created": "2024-01-01T00:00:00"
    }
  ]
}

Table Object Fields:

| Field | Type | Description |
|---|---|---|
| db_name | String | Database name |
| table_name | String | Table name |
| schema_name | String | Associated schema name |
| description | String | Table description |
| schema | List[Object] | Column schema definitions |
| col_descriptions | List[Object] | Column descriptions |
| default_query_cols | List[String] | Recommended query columns |
| primary_keys | List[String] | Primary key columns |
| order_by | List[String] | Default ordering columns |
| tags | List[String] | Associated tags |
| data_sets | List[String] | Associated dataset names |
| src_ids | List[String] | Source identifiers |
| src_data_sets | List[String] | Source dataset names |
| is_active | Boolean | Whether table is active |
| last_updated | String | Last update timestamp |
| created | String | Creation timestamp |
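A typical use of this payload is discovering which tables are currently queryable. A sketch filtering on `is_active`, using an abbreviated, hypothetical response of the shape shown above:

```python
# Abbreviated, hypothetical get_tables() response for illustration.
response = {
    "tables": [
        {"table_name": "eq_ohlc_1s", "schema_name": "bars_1s", "is_active": True},
        {"table_name": "eq_ohlc_legacy", "schema_name": "bars_1s", "is_active": False},
    ]
}

# Keep only the tables marked active.
active = [t["table_name"] for t in response["tables"] if t["is_active"]]
print(active)  # ['eq_ohlc_1s']
```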

Schema Column Object:

{
  "name": "date",
  "type": "Date",
  "nullable": false
}

| Field | Type | Description |
|---|---|---|
| name | String | Column name |
| type | String | ClickHouse data type |
| nullable | Boolean | Whether column allows NULL values |

Column Description Object:

{
  "name": "date",
  "description": "Trading date"
}

| Field | Type | Description |
|---|---|---|
| name | String | Column name |
| description | String | Human-readable description |

get_tags() Response

{
  "tags": [
    "1-minute",
    "1-second",
    "adjusted",
    "best-bid-offer",
    "daily",
    "equity",
    "futures",
    "market-data",
    "ohlc",
    "options",
    "timeseries",
    "trades"
  ]
}

| Field | Type | Description |
|---|---|---|
| tags | List[String] | Array of available tags |

Error Handling

All API methods raise ValueError for invalid or missing parameters; network and server failures propagate as other exceptions.

Example:

import hiveq_data as hd

try:
  client = hd.Historical()
  data = client.get_data(
    dataset='HIVEQ_US_FUT',
    schema='bars_1s',
    start='2025-08-01',
    end='2025-08-30'
    # Missing required parameter: neither 'symbols' nor 'root' provided
  )
except ValueError as e:
  print(f"Validation error: {e}")
  # Output: "At least one of these parameters is required for schema 'bars_1s' (asset_class: futures): symbols, root"

Common Patterns

Context Manager Support

All clients support context managers for automatic cleanup:

import hiveq_data as hd

with hd.Historical() as client:
  data = client.get_data(
    dataset='HIVEQ_US_EQ',
    schema='bars_1s',
    symbols='AAPL',
    start='2025-08-01',
    end='2025-08-30'
  )
# Session automatically closed

Per-Client Credentials

Override global credentials for specific clients:

import hiveq_data as hd

# Production client using .env credentials
prod_client = hd.Historical()

# Development client with different credentials
dev_client = hd.Historical(
  api_key='dev_key',
  base_url='http://localhost:3000'
)

Flexible Date Formats

All date parameters accept multiple formats:

from datetime import date, datetime
import hiveq_data as hd

client = hd.Historical()

# String format
data = client.get_data(..., start='2025-08-01', end='2025-08-30')

# Date object
data = client.get_data(..., start=date(2025, 8, 1), end=date(2025, 8, 30))

# Datetime object
data = client.get_data(..., start=datetime(2025, 8, 1, 9, 30), end=datetime(2025, 8, 30, 16, 0))

Futures Symbology Reference

Databento Symbology (Context-Aware):

  • Single digit years: Decade inferred from query dates
    • ESM1 with 2021 dates -> June 2021
    • ESM1 with 2011 dates -> June 2011
  • Two digit years: 2000+ century
    • ESZ25 -> December 2025
    • ESM26 -> June 2026

Continuous Contracts:

Format: ROOT.RULE.POSITION

  • ROOT: Futures symbol (ES, NQ, etc.)
  • RULE:
    • v = Volume-based rollover
    • c = Calendar-based rollover
  • POSITION:
    • 0 = Front month (currently active)
    • 1 = Second month (forward contract)
    • 2 = Third month, etc.

Examples:

  • ES.v.0 - ES front month by volume
  • ES.v.1 - ES second month by volume
  • NQ.c.0 - NQ front month by calendar

Month Codes:

  • F=Jan, G=Feb, H=Mar, J=Apr, K=May, M=Jun
  • N=Jul, Q=Aug, U=Sep, V=Oct, X=Nov, Z=Dec
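The month codes and two-digit-year convention above can be combined into a contract symbol programmatically. A hypothetical helper (not an SDK function) sketching the construction:

```python
# Standard futures month codes, as listed above.
MONTH_CODES = {1: "F", 2: "G", 3: "H", 4: "J", 5: "K", 6: "M",
               7: "N", 8: "Q", 9: "U", 10: "V", 11: "X", 12: "Z"}

def contract_symbol(root: str, month: int, year: int) -> str:
    """Build a two-digit-year contract code, e.g. ESZ25 for Dec 2025."""
    return f"{root}{MONTH_CODES[month]}{year % 100:02d}"

print(contract_symbol("ES", 12, 2025))  # ESZ25
print(contract_symbol("NQ", 6, 2026))   # NQM26
```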
