API Reference
HiveQ Data API Reference
Complete API reference for the HiveQ Data Python SDK.
Version: 0.2.1
Table of Contents
- Configuration
- Historical Client
- InstrumentReference Client
- Publisher Client
- Return Data Reference
- Error Handling
- Common Patterns
- Futures Rollover Reference
Configuration
hiveq_data.configure()
Configure global API credentials. This call is optional: credentials are loaded automatically from .env files or environment variables.
Parameters:
hiveq_data.configure(
api_key: Optional[str] = None, # Your HiveQ API key
base_url: Optional[str] = None, # API base URL (default: https://api.hiveq.com)
user_id: Optional[str] = None, # User ID for request headers (X-User-ID)
org_id: Optional[str] = None, # Organization ID for request headers (X-Org-ID)
user_name: Optional[str] = None # User name for request headers (X-User-Name)
)
Example:
import hiveq_data as hd
# Override auto-loaded credentials
hd.configure(
api_key="your_api_key_here",
base_url="https://api.hiveq.com",
user_id="your_user_id",
org_id="your_org_id",
user_name="your_username"
)
# Check SDK version
print(hd.__version__)  # Outputs: 0.2.1
Logging Configuration
The SDK uses Python's standard logging module and follows library best practices. By default, all logs are suppressed via NullHandler. Configure logging in your application to see SDK logs.
Basic Configuration:
import logging
import hiveq_data as hd
# Show all SDK logs at INFO level
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
client = hd.Historical()
Configure Only HiveQ Logs:
import logging
# Set level for entire SDK
logging.getLogger('hiveq_data').setLevel(logging.DEBUG)
# Or configure specific modules
logging.getLogger('hiveq_data.instrument_reference.futures_reference').setLevel(logging.DEBUG)
logging.getLogger('hiveq_data.internal.async_http_client').setLevel(logging.WARNING)
Log Levels:
| Level | Description | Use Case |
|---|---|---|
DEBUG | Detailed debugging info (API payloads, parsing details) | Development/troubleshooting |
INFO | General informational messages (operations completed) | Production monitoring |
WARNING | Warning messages (fallback behaviors, missing data) | Production (default) |
ERROR | Error messages (API failures, parsing errors) | Production alerts |
Logger Hierarchy:
- hiveq_data: Root logger
- hiveq_data.base_client: Base HTTP client
- hiveq_data.historical: Historical data client
- hiveq_data.instrument_reference: Instrument metadata
- hiveq_data.instrument_reference.futures_reference: Futures contracts
- hiveq_data.instrument_reference.options_reference: Options contracts
- hiveq_data.instrument_reference.equity_reference: Equity data
- hiveq_data.publisher: Data publishing
- hiveq_data.internal.async_http_client: Async HTTP client
- hiveq_data.live: Live streaming
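Because these loggers form a hierarchy, a level set on a parent propagates to every child until the child is overridden. A short stdlib-only sketch (no SDK import required) shows the effect:

```python
import logging

# Mirror the SDK's layout: a root library logger with module children.
root = logging.getLogger('hiveq_data')
root.addHandler(logging.NullHandler())  # library default: suppress output
child = logging.getLogger('hiveq_data.publisher')

# Children inherit the parent's effective level until overridden.
root.setLevel(logging.WARNING)
print(child.getEffectiveLevel() == logging.WARNING)  # True: inherits WARNING

# Overriding a child affects only that subtree.
child.setLevel(logging.DEBUG)
print(child.isEnabledFor(logging.DEBUG))  # True
print(root.isEnabledFor(logging.DEBUG))   # False: root stays at WARNING
```

This is why setting `hiveq_data` to WARNING while raising `hiveq_data.publisher` to DEBUG (as in the production example below) gives detailed logs for one module only.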
Example: Production Logging Setup:
import logging
# Configure file logging for production
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('hiveq_sdk.log'),
logging.StreamHandler() # Also log to console
]
)
# Set SDK to WARNING (only show warnings and errors)
logging.getLogger('hiveq_data').setLevel(logging.WARNING)
# But keep detailed logs for debugging specific issues
logging.getLogger('hiveq_data.publisher').setLevel(logging.DEBUG)
Historical Client
hd.Historical()
Client for querying historical market data with automatic parameter validation.
Initialization:
client = hd.Historical(
api_key: Optional[str] = None, # Override global API key
base_url: Optional[str] = None, # Override global base URL
user_id: Optional[str] = None, # User ID for request headers
org_id: Optional[str] = None, # Organization ID for request headers
user_name: Optional[str] = None # User name for request headers
)
client.get_data()
Fetch historical market data with flexible parameters based on asset class.
Parameters:
client.get_data(
dataset: str, # Required: Dataset name
schema: str, # Required: Schema name
symbols: Optional[Union[str, List[str]]] = None, # Equity symbols or specific contract symbols
root: Optional[Union[str, List[str]]] = None, # Futures root symbols
chains: Optional[Union[str, List[str]]] = None, # Options underlying symbols
start: Union[str, date, datetime], # Required: Start date/datetime
end: Union[str, date, datetime], # Required: End date/datetime
limit: Optional[int] = None, # Max records to return (for pagination)
offset: Optional[int] = None, # Number of records to skip (for pagination)
filter_mode: Optional[str] = None, # 'continuous' (default) or 'session'
columns: Optional[List[str]] = None, # Specific columns to return
**kwargs # Additional schema-specific parameters
)
Parameter Requirements by Asset Class:
| Asset Class | Required Parameters | Optional Parameters |
|---|---|---|
Equity (HIVEQ_US_EQ) | symbols, start, end | limit, offset, columns |
Futures (HIVEQ_US_FUT) | start, end, one of: symbols OR root | limit, offset, columns |
Options (HIVEQ_US_OPT) | start, end, one of: symbols OR chains | limit, offset, columns, expiration_date, strike, option_type |
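The SDK validates these combinations before issuing a request. The helper below is an illustrative re-statement of the table above, not SDK source code: it enforces that exactly one selector kind is supplied per dataset.

```python
# Illustrative restatement of the requirements table; not SDK source.
REQUIRED_SELECTORS = {
    'HIVEQ_US_EQ':  ('symbols',),           # equities: symbols only
    'HIVEQ_US_FUT': ('symbols', 'root'),    # futures: symbols OR root
    'HIVEQ_US_OPT': ('symbols', 'chains'),  # options: symbols OR chains
}

def validate_selectors(dataset, **params):
    """Return True if exactly one allowed selector is present for the dataset."""
    allowed = REQUIRED_SELECTORS[dataset]
    given = [name for name in allowed if params.get(name)]
    if not given:
        raise ValueError(f"{dataset} requires one of: {', '.join(allowed)}")
    if len(given) > 1:
        raise ValueError(f"{dataset}: pass only one of {', '.join(allowed)}")
    return True

validate_selectors('HIVEQ_US_FUT', root=['ES'])   # OK
# validate_selectors('HIVEQ_US_OPT') would raise ValueError
```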
Examples:
Equity Data:
import hiveq_data as hd
client = hd.Historical()
# Query equity bars
equity_data = client.get_data(
dataset='HIVEQ_US_EQ',
schema='bars_1s',
symbols=['AAPL', 'MSFT'],
start='2025-08-01',
end='2025-08-30',
limit=1000
)
Pagination Example:
# Get first 1000 records
page1 = client.get_data(
dataset='HIVEQ_US_EQ',
schema='bars_1s',
symbols=['AAPL'],
start='2025-08-01',
end='2025-08-30',
limit=1000,
offset=0
)
# Get next 1000 records (skip first 1000)
page2 = client.get_data(
dataset='HIVEQ_US_EQ',
schema='bars_1s',
symbols=['AAPL'],
start='2025-08-01',
end='2025-08-30',
limit=1000,
offset=1000
)
# Pagination loop example
def fetch_all_data(dataset, schema, symbols, start, end, page_size=1000):
all_data = []
offset = 0
while True:
page = client.get_data(
dataset=dataset,
schema=schema,
symbols=symbols,
start=start,
end=end,
limit=page_size,
offset=offset
)
if not page['data'] or len(page['data']) == 0:
break
all_data.extend(page['data'])
offset += page_size
# Break if we got fewer records than page_size (last page)
if len(page['data']) < page_size:
break
return all_data
Response Format:
{
"data": [
{
"date": "2025-08-01",
"time": "2025-08-01T09:30:00.000000000",
"symbol": "AAPL",
"open": 185.5,
"high": 186.2,
"low": 185.3,
"close": 186.0,
"volume": 125000.0,
"ts_event": "2025-08-01T09:30:00.000000000"
}
],
"schema": "bars_1s",
"dataset": "HIVEQ_US_EQ"
}
Futures Data by Root:
# Query all contracts for ES and NQ roots
futures_data = client.get_data(
dataset='HIVEQ_US_FUT',
schema='bars_1s',
root=['ES', 'NQ'], # All ES and NQ contracts
start='2025-08-01',
end='2025-08-30'
)
Futures Data by Specific Contracts:
# Query specific futures contracts
futures_data = client.get_data(
dataset='HIVEQ_US_FUT',
schema='bars_1s',
symbols=['ES.H25', 'NQ.M25'], # Specific contracts
start='2025-08-01',
end='2025-08-30'
)
Futures Response Format:
{
"data": [
{
"date": "2025-08-01",
"time": "2025-08-01T09:30:00.000000000",
"symbol": "ES.H25",
"root": "ES",
"open": 4500.25,
"high": 4505.5,
"low": 4498.75,
"close": 4502.0,
"volume": 50000.0,
"ts_event": "2025-08-01T09:30:00.000000000"
}
],
"schema": "bars_1s",
"dataset": "HIVEQ_US_FUT"
}
Options Data by Chains:
# Query all options for SPY underlying
options_data = client.get_data(
dataset='HIVEQ_US_OPT',
schema='snaps_1s',
chains=['SPY'], # All SPY options
start='2025-08-01',
end='2025-08-30',
expiration_date='2025-09-19', # Optional: filter by expiration
strike=450.0, # Optional: filter by strike
option_type='C' # Optional: C=Call, P=Put
)
Options Data by Specific Symbols:
# Query specific option contracts
options_data = client.get_data(
dataset='HIVEQ_US_OPT',
schema='snaps_1s',
symbols=['SPY250919C00450000'],
start='2025-08-01',
end='2025-08-30'
)
Options Response Format:
{
"data": [
{
"date": "2025-08-01",
"time": "2025-08-01T09:30:00.000000000",
"symbol": "SPY250919C00450000",
"underlying": "SPY",
"strike": 450.0,
"expiration": "2025-09-19",
"option_type": "C",
"bid_px": 12.5,
"ask_px": 12.55,
"bid_sz": 100,
"ask_sz": 150,
"implied_vol": 0.185,
"delta": 0.52,
"gamma": 0.015,
"theta": -0.08,
"vega": 0.35
}
],
"schema": "snaps_1s",
"dataset": "HIVEQ_US_OPT"
}
InstrumentReference Client
hd.InstrumentReference()
API-based client for accessing instrument metadata.
Initialization:
client = hd.InstrumentReference(
api_key: Optional[str] = None, # Override global API key
base_url: Optional[str] = None, # Override global base URL
user_id: Optional[str] = None, # User ID for request headers
org_id: Optional[str] = None, # Organization ID for request headers
user_name: Optional[str] = None # User name for request headers
)
client.get_instruments()
Generic method to fetch instrument metadata for any asset class.
Parameters:
client.get_instruments(
symbols: Optional[List[str]] = None, # Symbol(s) to filter
asset_class: Optional[str] = None, # 'equity', 'futures', 'options', or 'index'
start_date: Optional[str] = None, # Start date (for futures/options)
end_date: Optional[str] = None, # End date (for futures/options)
exchange: Optional[str] = None, # Filter by exchange
currency: Optional[str] = None, # Filter by currency
limit: Optional[int] = None, # Max records to return
offset: Optional[int] = None # Records to skip
)
Example:
import hiveq_data as hd
client = hd.InstrumentReference()
# Get all equity instruments
equities = client.get_instruments(asset_class='equity')
# Get specific instruments
instruments = client.get_instruments(
symbols=['AAPL', 'ES'],
asset_class='equity'
)
Response Format:
{
"instruments": [
{
"sym": "AAPL",
"root": "AAPL",
"ex": "NASDAQ",
"name": "Apple Inc",
"assetType": "Eq",
"multiplier": "1",
"openTime": "09:30:00",
"closeTime": "16:00:00"
}
]
}
client.get_equities()
Fetch equity instrument metadata.
Parameters:
client.get_equities(
symbols: Optional[List[str]] = None, # Equity symbols
date: Optional[str] = None, # Date for filtering (ISO format)
exchange: Optional[str] = None, # Filter by exchange
currency: Optional[str] = None, # Filter by currency
limit: Optional[int] = None, # Max records to return
offset: Optional[int] = None # Records to skip
)
Example:
# Get equity metadata
equities = client.get_equities(symbols=['AAPL', 'MSFT', 'GOOGL'])
# Get all equities
all_equities = client.get_equities()
Response Format:
{
"instruments": [
{
"date": "2025-08-26",
"sym": "AAPL",
"root": "AAPL",
"ex": "NASDAQ",
"name": "Apple Inc",
"openTime": "09:30:00",
"closeTime": "16:00:00",
"multiplier": "1",
"assetType": "Eq",
"tickSizePilot": "false",
"minOrderSize": "1",
"maxOrderSize": "10000",
"baseIncrement": "1",
"quoteIncrement": "0.01"
}
]
}
client.get_futures()
Query futures contracts including specific contracts and continuous contract series.
Parameters:
client.get_futures(
symbols: Optional[List[str]] = None, # Contract or continuous symbols
start_date: Optional[str] = None, # Start date (YYYY-MM-DD)
end_date: Optional[str] = None, # End date (YYYY-MM-DD)
exchange: Optional[str] = None, # Filter by exchange (e.g., 'XCME')
currency: Optional[str] = None, # Filter by currency (e.g., 'USD')
expiry_type: str = "volume", # Rollover type: 'volume', 'expiry', 'last_trade', 'open_interest'
limit: Optional[int] = None, # Max records to return
offset: Optional[int] = None # Records to skip
)
Symbol Formats:
- Specific Contracts (Databento symbology):
  - Single-digit years: ESM1 (June 2021 or June 2011, depending on query dates)
  - Two-digit years: ESZ25 (December 2025)
  - Context-aware: the same symbol is interpreted differently based on start_date
- Continuous Contracts (Databento format):
  - ES.v.0: front month by volume
  - ES.v.1: second month by volume (forward contract)
  - ES.c.0: front month by calendar
  - Format: ROOT.RULE.POSITION, where RULE = v (volume) or c (calendar)
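A continuous symbol splits mechanically into its three parts. The parser below is an illustrative sketch of the ROOT.RULE.POSITION convention, not SDK code:

```python
def parse_continuous(symbol):
    """Split 'ES.v.0' into (root, rule, position); rule 'v'=volume, 'c'=calendar."""
    root, rule, position = symbol.split('.')
    if rule not in ('v', 'c'):
        raise ValueError(f"unknown rollover rule: {rule!r}")
    return root, rule, int(position)

print(parse_continuous('ES.v.1'))  # ('ES', 'v', 1)
```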
Examples:
Specific contracts with context-aware parsing:
# ESM1 interpreted as June 2021 (decade from query dates)
futures_2021 = client.get_futures(
symbols=['ESM1'],
start_date='2021-05-01',
end_date='2021-06-30'
)
# Same symbol ESM1 interpreted as June 2011
futures_2011 = client.get_futures(
symbols=['ESM1'],
start_date='2011-05-01',
end_date='2011-06-30'
)
Continuous contracts:
# Get all contracts that were position 1 (second month) during 2020
continuous = client.get_futures(
symbols=['ES.v.1'],
start_date='2020-01-01',
end_date='2020-12-31'
)
# Returns: multiple contracts, sorted by start_date ascending
Response Format:
{
"count": 2,
"instruments": [
{
"symbol": "ESZ5",
"root": "ES",
"root_symbol": "ES",
"exch_mic": "XCME",
"currency": "USD",
"multiplier": 50,
"min_tick": 0.25,
"tick_value": 12.5,
"lot_units": "IPNT",
"asset_type": "Fut",
"contract_month": "December",
"contract_year": 2025,
"expiry_date": "2025-12-01"
}
]
}
Continuous contract responses include rollover fields:
{
"symbol": "ESU1",
"continuous_symbol": "ES.v.1",
"rollover_rule": "v",
"rollover_rank": 1,
"start_date": "2011-09-12",
"end_date": "2011-09-18"
}
Key Features:
- Context-aware decade detection: Single-digit years interpreted based on query dates
- Date validation: Rejects contracts >2 years from query date range
- Proper overlap filtering: Contracts must have meaningful overlap with query range
- Sorted results: Continuous contracts returned in ascending order by start_date
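One way to picture the decade detection described above: expand the single year digit into candidate years in the query decade and the adjacent decades, then keep the candidate closest to the query year. This is a sketch of the documented behavior, not the SDK's actual implementation; the month letters follow the standard CME codes.

```python
# Standard CME month codes (F=Jan ... Z=Dec).
MONTH_CODES = {'F': 1, 'G': 2, 'H': 3, 'J': 4, 'K': 5, 'M': 6,
               'N': 7, 'Q': 8, 'U': 9, 'V': 10, 'X': 11, 'Z': 12}

def resolve_contract_year(symbol, query_year):
    """Resolve a single-digit contract year (e.g. 'ESM1') against a query year.

    Candidates are the digit placed in the query decade and the two adjacent
    decades; the candidate nearest the query year wins.
    """
    digit = int(symbol[-1])
    month = MONTH_CODES[symbol[-2]]
    base = (query_year // 10) * 10
    candidates = [base - 10 + digit, base + digit, base + 10 + digit]
    year = min(candidates, key=lambda y: abs(y - query_year))
    return month, year

print(resolve_contract_year('ESM1', 2021))  # (6, 2021) -> June 2021
print(resolve_contract_year('ESM1', 2011))  # (6, 2011) -> June 2011
```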
client.get_options()
Fetch options contract information.
Parameters:
client.get_options(
symbols: Optional[List[str]] = None, # OCC-format option symbols
chains: Optional[Union[str, List[str]]] = None, # Underlying symbol(s) for option chains
start_date: Optional[str] = None, # Start date for filtering
end_date: Optional[str] = None, # End date for filtering
expiration_date: Optional[str] = None, # Filter by expiration date
strike: Optional[Union[float, str]] = None, # Filter by strike price
option_type: Optional[str] = None, # Filter by type ('C' or 'P')
exchange: Optional[str] = None, # Filter by exchange
currency: Optional[str] = None, # Filter by currency
limit: Optional[int] = None, # Max records to return
offset: Optional[int] = None, # Records to skip
root: Optional[str] = None # Specify root
)
Examples:
Get option chain by underlying:
# Returns all options for SPY underlying
all_options = client.get_options(chains=['SPY'])
Get options with filters:
# Returns options active during period with filters
options = client.get_options(
chains=['SPY'],
start_date='2025-08-01',
end_date='2025-09-30',
expiration_date='2025-09-19',
strike=450.0,
option_type='C'
)
Get specific option symbols:
options = client.get_options(
symbols=['SPY250919C00450000'],
start_date='2025-08-01',
end_date='2025-09-30'
)
Response Format:
{
"instruments": [
{
"date": "2025-09-19",
"sym": "SPY250919C00450000",
"root": "SPY",
"ex": "CBOE",
"name": "SPDR S&P 500 ETF",
"underlying": "SPY",
"strike": "450.0",
"expiration": "2025-09-19",
"option_type": "C",
"multiplier": "100",
"assetType": "Opt"
}
]
}
client.get_indices()
Fetch index instrument metadata.
Parameters:
client.get_indices(
symbols: Optional[List[str]] = None, # Index symbols (e.g., 'SPX', 'NDX')
date: Optional[str] = None, # Date for filtering (ISO format)
exchange: Optional[str] = None, # Filter by exchange
currency: Optional[str] = None, # Filter by currency
limit: Optional[int] = None, # Max records to return
offset: Optional[int] = None # Records to skip
)
Example:
indices = client.get_indices(symbols=['SPX', 'NDX'])
Publisher Client
hd.Publisher()
Client for publishing data to HiveQ destinations. Supports adding new records and modifying existing records. Provides both synchronous and asynchronous publishing modes.
Initialization:
client = hd.Publisher(
api_key: Optional[str] = None, # Override global API key
base_url: Optional[str] = None, # Override global base URL
async_mode: bool = True, # Async HTTP for 10-100x performance (default)
user_id: Optional[str] = None, # User ID for request headers
org_id: Optional[str] = None, # Organization ID for request headers
user_name: Optional[str] = None # User name for request headers
)
Parameters:
- api_key: Override global API key
- base_url: Override global base URL
- async_mode: If True (default), uses async HTTP for fire-and-forget publishes
- user_id: User ID for request headers (X-User-ID)
- org_id: Organization ID for request headers (X-Org-ID)
- user_name: User name for request headers (X-User-Name)
client.publish()
Publish data to a schema in a HiveQ destination.
Parameters:
client.publish(
schema: str, # Required: Schema name (e.g., "event_logs")
data: List[Dict[str, Any]], # Required: List of records to publish
key: str, # Required: Unique key identifier for tracking
operation: str = "add" # Optional: "add" (insert) or "modify" (update)
)
Operation Types:
| Operation | Description | Use Case |
|---|---|---|
add | Insert new records | Publishing new events, logs, or data |
modify | Update existing records | Correcting or updating previously published data |
client.flush()
Wait for all in-flight async publishes to complete. Only relevant when async_mode=True.
Usage:
publisher = hd.Publisher(async_mode=True)
# Publish many records...
for i in range(1000):
publisher.publish(...)
# Wait for all publishes to complete
publisher.flush()
Behavior:
- Blocks until all pending async publish operations finish
- Should be called before shutdown to ensure data integrity
- No-op in synchronous mode (async_mode=False)
client.close()
Close the publisher and cleanup resources.
Usage:
publisher = hd.Publisher(async_mode=True)
# ... publish data ...
publisher.close()  # Automatically flushes and closes connections
Behavior:
- Flushes pending requests (if async mode)
- Closes HTTP connections
- Should be called when done using the publisher
Examples
Synchronous Mode
Add New Records:
import hiveq_data as hd
publisher = hd.Publisher(async_mode=False)
# Publish strategy event logs
result = publisher.publish(
schema='event_logs',
key='run_20240101_001',
operation='add',
data=[
{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'trader_id': 'trader_001',
'nav': 100000.0,
'realized_pnl': 1500.0,
'total_pnl': 1500.0,
'event_log_type': 'TRADE',
'sub_event_type': 'FILLED',
'symbol': 'AAPL',
'message': 'Trade executed'
},
{
'ts_event': '2024-01-01T10:05:00.000Z',
'strategy_id': 'my_strategy',
'trader_id': 'trader_001',
'nav': 101200.0,
'realized_pnl': 2700.0,
'total_pnl': 2700.0,
'event_log_type': 'TRADE',
'sub_event_type': 'FILLED',
'symbol': 'MSFT',
'message': 'Trade executed'
}
]
)
print(f"Published {result['record_count']} records")
publisher.close()
Response Format:
{
"status": "success",
"record_count": 2,
"schema": "event_logs",
"key": "run_20240101_001",
"operation": "add"
}
Modify Existing Records:
# Update previously published records
result = publisher.publish(
schema='event_logs',
key='run_20240101_001',
operation='modify',
data=[
{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'trader_id': 'trader_001',
'nav': 102000.0, # Updated value
'realized_pnl': 3500.0, # Updated value
'total_pnl': 3500.0 # Updated value
}
]
)
Async Mode (Default, High Performance)
Async mode is the default and provides 10-100x performance improvement for high-frequency publishing:
import hiveq_data as hd
# Async mode is the default
publisher = hd.Publisher()
# Publish many records quickly (fire-and-forget)
for i in range(1000):
result = publisher.publish(
schema='event_logs',
key=f'run_{i}',
operation='add',
data=[{
'ts_event': '2024-01-01T10:00:00.000Z',
'strategy_id': 'my_strategy',
'nav': 100000.0,
'realized_pnl': 1500.0
}]
)
# Returns immediately without waiting for HTTP response
# Always flush before shutdown to ensure all data is published
publisher.flush()
publisher.close()
Async Mode Benefits:
- 10-100x faster for high-frequency publishing
- Fire-and-forget: publish() returns immediately
- Automatic retry with exponential backoff (3 attempts)
- Perfect for backtesting and batch publishing
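The retry behavior listed above (three attempts with exponential backoff) can be pictured with the standalone sketch below. The delays, helper names, and the exception type are illustrative assumptions, not the SDK's internals:

```python
import time

def publish_with_retry(send, attempts=3, base_delay=0.5):
    """Call send(); on failure wait base_delay * 2**n, then retry (up to `attempts`)."""
    for n in range(attempts):
        try:
            return send()
        except ConnectionError:
            if n == attempts - 1:
                raise  # out of attempts: surface the error
            time.sleep(base_delay * (2 ** n))  # 0.5s, 1s, 2s, ...

# Toy transport that fails twice, then succeeds.
calls = {'n': 0}
def flaky_send():
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError('transient')
    return {'success': True, 'async': True, 'records': 1}

result = publish_with_retry(flaky_send, base_delay=0.0)  # zero delay for the demo
print(result['success'], calls['n'])  # True 3
```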
Async Mode Response Format:
{
"success": true,
"async": true,
"records": 1
}
Error Handling:
try:
result = publisher.publish(
schema='event_logs',
key='run_20240101_001',
operation='invalid_op', # Invalid operation
data=[...]
)
except ValueError as e:
print(f"Validation error: {e}")
# Output: Invalid operation 'invalid_op'. Must be one of: add, modify
Common Schema Examples:
Event Logs Schema:
{
'ts_event': '2024-01-01T10:00:00.000Z', # Event timestamp (ISO format)
'strategy_id': 'my_strategy', # Strategy identifier
'trader_id': 'trader_001', # Trader identifier
'nav': 100000.0, # Net asset value
'realized_pnl': 1500.0, # Realized P&L
'total_pnl': 1500.0, # Total P&L
'event_log_type': 'TRADE', # Event type
'sub_event_type': 'FILLED', # Sub-event type
'symbol': 'AAPL', # Trading symbol
'message': 'Trade executed' # Event message
}
SignalPublisher Client
hd.SignalPublisher()
Kafka-based publisher for user signal data. Optimized for high-throughput signal publishing with low latency and automatic reconnection.
Initialization:
publisher = hd.SignalPublisher(
bootstrap_servers: Optional[str] = None, # Kafka broker address (e.g., 'localhost:9092')
topic: str = 'user-signals', # Kafka topic to publish to
auto_reconnect: bool = True, # Enable automatic reconnection
reconnect_interval: float = 1.0, # Seconds between reconnection attempts
status_log_interval: float = 30.0, # Seconds between status log messages
**kafka_config # Additional Kafka producer configuration
)
Parameters:
- bootstrap_servers (str, optional): Kafka broker address. If None, loads from the KAFKA_BOOTSTRAP_SERVERS env variable. Default: 'localhost:9092'
- topic (str, optional): Kafka topic to publish to (default: 'user-signals')
- auto_reconnect (bool, optional): Enable automatic reconnection on connection failure (default: True)
- reconnect_interval (float, optional): Seconds between reconnection attempts (default: 1.0)
- status_log_interval (float, optional): Seconds between status log messages (default: 30.0)
- **kafka_config: Additional Kafka producer configuration (e.g., acks='all', compression_type='gzip', retries=3)
Signal Format:
{
'data': {'signal': 0, 'weight': 0}, # Signal data dict
'user_id': 'Raj', # User identifier
'ts_event': '2025-11-05 19:00:02.267000', # Event timestamp (auto-generated)
'symbol': 'AAPL', # Symbol
'signal_version': 1, # Version (default: 1)
'signal_id': '39831f03-7022-4c0f-...' # UUID (auto-generated)
}
Methods:
- publish(data, symbol, user_id, ts_event=None, signal_version=1, signal_id=None, key=None): Publish a single signal
  - data (dict, required): Signal data dict with 'signal' and 'weight' keys
  - symbol (str, required): Symbol string (e.g., 'AAPL')
  - user_id (str, required): User identifier string
  - ts_event (datetime, optional): Event timestamp (auto-generated if None)
  - signal_version (int, optional): Signal version number (default: 1)
  - signal_id (str, optional): Signal UUID (auto-generated if None)
  - key (str, optional): Kafka partition key (defaults to symbol)
- publish_batch(signals, key=None): Publish multiple signals
  - signals (list, required): List of signal dicts, each containing data, symbol, user_id
  - key (str, optional): Kafka partition key (if None, uses the symbol from each signal)
  - Returns: the number of successfully published signals (int)
- flush(): Flush pending messages to Kafka
- close(): Close the Kafka producer connection
- is_connected(): Check if connected to Kafka (returns bool)
- get_connection_status(): Get a detailed connection status dict
Context Manager Support:
with hd.SignalPublisher(bootstrap_servers='localhost:9092') as publisher:
publisher.publish(
data={'signal': 1, 'weight': 0.85},
symbol='AAPL',
user_id='Raj'
)
# Automatically closed
Batch Example:
import hiveq_data as hd
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals'
)
# Publish batch
signals = [
{
'data': {'signal': 1, 'weight': 0.85},
'symbol': 'AAPL',
'user_id': 'Raj'
},
{
'data': {'signal': 0, 'weight': 0.0},
'symbol': 'MSFT',
'user_id': 'Raj'
}
]
count = publisher.publish_batch(signals)
print(f"Published {count} signals")
publisher.close()
Advanced Configuration:
publisher = hd.SignalPublisher(
bootstrap_servers='localhost:9092',
topic='user-signals',
acks='all', # Wait for all replicas
compression_type='gzip', # Compress messages
retries=3, # Retry failed sends
max_in_flight_requests_per_connection=1 # Ensure ordering
)
Return Data Reference
Complete field-by-field documentation of all return data structures with data types and descriptions.
Historical Data Response Structure
All Historical.get_data() calls return a consistent structure:
{
"data": [...],
"schema": "string",
"dataset": "string"
}
Top-Level Fields:
| Field | Type | Description |
|---|---|---|
data | List[Dict] | Array of data records matching the query |
schema | str | Schema name used for the query (e.g., "bars_1s") |
dataset | str | Dataset name queried (e.g., "HIVEQ_US_EQ") |
Equity OHLC Data Fields (bars_1s, bars_1m)
{
"date": "2025-08-01",
"time": "2025-08-01T09:30:01.000000000",
"symbol": "AAPL",
"rtype": 32,
"instrument_id": 12345,
"source_id": "DATABENTO",
"src_dataset": "GLBX.MDP3",
"publisher_id": 1,
"open": 185.5,
"high": 186.2,
"low": 185.3,
"close": 186.0,
"volume": 125000.0,
"ts_event": "2025-08-01T09:30:01.000000000",
"recv_event": "2025-08-01T09:30:01.100000000",
"version": 1,
"db_event": "2025-08-01T09:30:01.200000000"
}
Field Descriptions:
| Field | Type | Nullable | Description |
|---|---|---|---|
date | Date | No | Trading date (YYYY-MM-DD) |
time | DateTime64(9) | No | Timestamp with nanosecond precision |
symbol | String | No | Stock ticker symbol |
rtype | UInt16 | No | Record type identifier |
instrument_id | UInt32 | No | Unique instrument identifier |
source_id | String | No | Data source identifier |
src_dataset | String | No | Source dataset name |
publisher_id | UInt32 | Yes | Publisher identifier |
open | Float64 | Yes | Opening price for the period |
high | Float64 | Yes | Highest price during the period |
low | Float64 | Yes | Lowest price during the period |
close | Float64 | Yes | Closing price for the period |
volume | Float64 | Yes | Trading volume (number of shares) |
ts_event | DateTime64(9) | No | Event timestamp from exchange |
recv_event | DateTime64(9) | No | Timestamp when received by system |
version | UInt8 | Yes | Data version number |
db_event | DateTime64(9) | No | Database insertion timestamp |
Equity Daily Bars (bars_daily)
{
"date": "2025-08-01",
"trade_date": "2025-08-01",
"symbol": "AAPL",
"open": 185.5,
"high": 186.2,
"low": 185.3,
"close": 186.0,
"volume": 45000000.0,
"adj_open": 185.5,
"adj_high": 186.2,
"adj_low": 185.3,
"adj_close": 186.0,
"adj_volume": 45000000.0,
"factor": 1.0,
"ts_created": "2025-08-01T16:00:00.000",
"recv_event": "2025-08-01T16:00:01",
"source_id": "DATABENTO",
"version": "2025-08-01T16:00:01"
}
Field Descriptions:
| Field | Type | Nullable | Description |
|---|---|---|---|
date | Date | No | Calendar date |
trade_date | Date | No | Actual trading date |
symbol | String | No | Stock ticker symbol |
open | Float64 | Yes | Unadjusted opening price |
high | Float64 | Yes | Unadjusted high price |
low | Float64 | Yes | Unadjusted low price |
close | Float64 | Yes | Unadjusted closing price |
volume | Float64 | Yes | Unadjusted volume |
adj_open | Float64 | Yes | Split & dividend adjusted opening price |
adj_high | Float64 | Yes | Split & dividend adjusted high price |
adj_low | Float64 | Yes | Split & dividend adjusted low price |
adj_close | Float64 | Yes | Split & dividend adjusted closing price |
adj_volume | Float64 | Yes | Split adjusted volume |
factor | Float64 | No | Cumulative adjustment factor |
ts_created | DateTime64(3) | No | Record creation timestamp |
recv_event | DateTime | No | Receive timestamp |
source_id | String | No | Data source identifier |
version | DateTime | No | Version timestamp |
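As a worked example of how the adjusted columns relate to the raw ones, assuming the common convention that the adjusted price is the raw price times the cumulative factor (and volume scales inversely) — verify this convention against your data before relying on it:

```python
# A 4-for-1 split means pre-split prices are scaled by a factor of 0.25.
raw_close = 500.0
factor = 0.25                       # cumulative adjustment factor on a pre-split date
adj_close = raw_close * factor      # assumed convention: adj = raw * factor
adj_volume = 45_000_000 / factor    # volume scales inversely to price
print(adj_close)    # 125.0
print(adj_volume)   # 180000000.0
```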
Equity Trades Data
{
"date": "2025-08-01",
"time": "2025-08-01T09:30:01.123456789",
"symbol": "AAPL",
"rtype": 160,
"instrument_id": 12345,
"source_id": "DATABENTO",
"src_dataset": "GLBX.MDP3",
"publisher_id": 1,
"price": 185.75,
"size": 100,
"action": "T",
"side": "B",
"flags": 0,
"depth": 0,
"bid_px": 185.74,
"ask_px": 185.76,
"bid_sz": 500,
"ask_sz": 300,
"bid_ct": 5,
"ask_ct": 3,
"sequence": 12345678,
"ts_event": "2025-08-01T09:30:01.123456789",
"ts_recv": "2025-08-01T09:30:01.124000000",
"ts_in_delta": 544211,
"recv_event": "2025-08-01T09:30:01.125000000",
"version": 1,
"db_event": "2025-08-01T09:30:01.126000000"
}
Field Descriptions:
| Field | Type | Nullable | Description |
|---|---|---|---|
date | Date | No | Trading date |
time | DateTime64(9) | No | Timestamp with nanosecond precision |
symbol | String | No | Stock ticker symbol |
rtype | UInt16 | No | Record type (160 for trades) |
instrument_id | UInt32 | No | Unique instrument identifier |
source_id | String | No | Data source identifier |
src_dataset | String | No | Source dataset name |
publisher_id | UInt32 | Yes | Publisher identifier |
price | Float64 | Yes | Trade price |
size | UInt32 | Yes | Trade size (shares) |
action | String | No | Trade action (T=Trade, C=Cancel, etc.) |
side | String | No | Trade side (B=Buy, S=Sell, N=None) |
flags | UInt8 | Yes | Trade condition flags |
depth | UInt8 | Yes | Market depth level |
bid_px | Float64 | Yes | Best bid price at time of trade |
ask_px | Float64 | Yes | Best ask price at time of trade |
bid_sz | UInt32 | Yes | Best bid size |
ask_sz | UInt32 | Yes | Best ask size |
bid_ct | UInt32 | Yes | Number of orders at best bid |
ask_ct | UInt32 | Yes | Number of orders at best ask |
sequence | UInt32 | Yes | Sequence number |
ts_event | DateTime64(9) | No | Event timestamp from exchange |
ts_recv | DateTime64(9) | No | Receive timestamp |
ts_in_delta | UInt32 | Yes | Time delta (nanoseconds) |
recv_event | DateTime64(9) | No | System receive timestamp |
version | UInt8 | Yes | Data version |
db_event | DateTime64(9) | No | Database insertion timestamp |
Futures OHLC Data Fields (bars_1s, bars_1m)
{
"date": "2025-08-01",
"time": "2025-08-01T09:30:01.000000000",
"symbol": "ES.H25",
"root": "ES",
"rtype": 32,
"instrument_id": 54321,
"source_id": "DATABENTO",
"src_dataset": "GLBX.MDP3",
"publisher_id": 1,
"open": 4500.25,
"high": 4505.5,
"low": 4498.75,
"close": 4502.0,
"volume": 50000.0,
"ts_event": "2025-08-01T09:30:01.000000000",
"recv_event": "2025-08-01T09:30:01.100000000",
"version": 1,
"db_event": "2025-08-01T09:30:01.200000000"
}
Field Descriptions:
| Field | Type | Nullable | Description |
|---|---|---|---|
date | Date | No | Trading date |
time | DateTime64(9) | No | Timestamp with nanosecond precision |
symbol | String | No | Full contract symbol (e.g., ES.H25) |
root | String | No | Futures root symbol (e.g., ES) |
rtype | UInt16 | No | Record type identifier |
instrument_id | UInt32 | No | Unique instrument identifier |
source_id | String | No | Data source identifier |
src_dataset | String | No | Source dataset name |
publisher_id | UInt32 | Yes | Publisher identifier |
open | Float64 | Yes | Opening price for the period |
high | Float64 | Yes | Highest price during the period |
low | Float64 | Yes | Lowest price during the period |
close | Float64 | Yes | Closing price for the period |
volume | Float64 | Yes | Trading volume (number of contracts) |
ts_event | DateTime64(9) | No | Event timestamp from exchange |
recv_event | DateTime64(9) | No | Timestamp when received by system |
version | UInt8 | Yes | Data version number |
db_event | DateTime64(9) | No | Database insertion timestamp |
Options Snapshot Data Fields (snaps_1s)
{
"date": "2025-08-01",
"time": "2025-08-01T09:30:01.000000000",
"symbol": "SPY250919C00450000",
"underlying": "SPY",
"strike": 450.0,
"expiration": "2025-09-19",
"option_type": "C",
"rtype": 96,
"instrument_id": 98765,
"source_id": "OPRA",
"src_dataset": "OPRA",
"publisher_id": 1,
"bid_px": 12.5,
"ask_px": 12.55,
"bid_sz": 100,
"ask_sz": 150,
"bid_ct": 5,
"ask_ct": 7,
"last_px": 12.52,
"last_sz": 10,
"open_interest": 25000,
"implied_vol": 0.185,
"delta": 0.52,
"gamma": 0.015,
"theta": -0.08,
"vega": 0.35,
"rho": 0.12,
"sequence": 123456,
"ts_event": "2025-08-01T09:30:01.000000000",
"ts_recv": "2025-08-01T09:30:01.100000000",
"recv_event": "2025-08-01T09:30:01.110000000",
"version": 1,
"db_event": "2025-08-01T09:30:01.120000000"
}
Field Descriptions:
| Field | Type | Nullable | Description |
|---|---|---|---|
date | Date | No | Trading date |
time | DateTime64(9) | No | Timestamp with nanosecond precision |
symbol | String | No | Full option symbol (OSI format) |
underlying | String | No | Underlying security symbol |
strike | Float64 | No | Strike price |
expiration | Date | No | Expiration date |
option_type | String | No | Option type (C=Call, P=Put) |
rtype | UInt16 | No | Record type identifier |
instrument_id | UInt32 | No | Unique instrument identifier |
source_id | String | No | Data source identifier |
src_dataset | String | No | Source dataset name |
publisher_id | UInt32 | Yes | Publisher identifier |
bid_px | Float64 | Yes | Best bid price |
ask_px | Float64 | Yes | Best ask price |
bid_sz | UInt32 | Yes | Best bid size (contracts) |
ask_sz | UInt32 | Yes | Best ask size (contracts) |
bid_ct | UInt32 | Yes | Number of orders at best bid |
ask_ct | UInt32 | Yes | Number of orders at best ask |
last_px | Float64 | Yes | Last trade price |
last_sz | UInt32 | Yes | Last trade size |
open_interest | UInt32 | Yes | Total open interest |
implied_vol | Float64 | Yes | Implied volatility (decimal) |
delta | Float64 | Yes | Option delta (Greek) |
gamma | Float64 | Yes | Option gamma (Greek) |
theta | Float64 | Yes | Option theta (Greek) |
vega | Float64 | Yes | Option vega (Greek) |
rho | Float64 | Yes | Option rho (Greek) |
sequence | UInt32 | Yes | Sequence number |
ts_event | DateTime64(9) | No | Event timestamp from exchange |
ts_recv | DateTime64(9) | No | Receive timestamp |
recv_event | DateTime64(9) | No | System receive timestamp |
version | UInt8 | Yes | Data version |
db_event | DateTime64(9) | No | Database insertion timestamp |
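The quote fields in a snapshot are nullable as well; a common derived value is the quote midpoint. A hedged sketch on a trimmed record (field names from the table above, not an SDK call):

```python
# Trimmed snaps_1s record (illustrative values from the example above).
snap = {"bid_px": 12.5, "ask_px": 12.55, "bid_sz": 100, "ask_sz": 150}

def mid_price(record):
    """Quote midpoint; None if either nullable side of the book is missing."""
    bid, ask = record.get("bid_px"), record.get("ask_px")
    if bid is None or ask is None:
        return None
    return (bid + ask) / 2

mid = mid_price(snap)  # approximately 12.525
```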
InstrumentReference Response Structure
All InstrumentReference methods return:
{
"instruments": [...]
}
Top-Level Fields:
| Field | Type | Description |
|---|---|---|
instruments | List[Dict] | Array of instrument metadata records |
Equity Instrument Fields
{
"date": "2025-08-26",
"sym": "AAPL",
"root": "AAPL",
"ex": "NASDAQ",
"name": "Apple Inc",
"openTime": "09:30:00",
"closeTime": "16:00:00",
"rootMinTick": "0.01",
"curContract": "AAPL",
"multiplier": "1",
"assetType": "Eq",
"tickSizePilot": "false",
"rolloverSymbol": "",
"rolloverDate": "",
"securityID": "AAPL",
"minOrderSize": "1",
"maxOrderSize": "10000",
"baseIncrement": "1",
"quoteIncrement": "0.01",
"underlying": "",
"Version": "V4"
}
Field Descriptions:
| Field | Type | Description |
|---|---|---|
date | String | Reference data timestamp |
sym | String | Trading symbol |
root | String | Root symbol (same as sym for equities) |
ex | String | Primary exchange |
name | String | Full company name |
openTime | String | Market open time (HH:MM:SS) |
closeTime | String | Market close time (HH:MM:SS) |
rootMinTick | String | Minimum price increment |
curContract | String | Current contract symbol |
multiplier | String | Contract multiplier (1 for equities) |
assetType | String | Asset type (Eq=Equity, Fut=Futures, Opt=Options) |
tickSizePilot | String | Tick size pilot program participant ("true"/"false") |
rolloverSymbol | String | Next rollover symbol (empty for equities) |
rolloverDate | String | Rollover date (empty for equities) |
securityID | String | Security identifier |
minOrderSize | String | Minimum order size |
maxOrderSize | String | Maximum order size |
baseIncrement | String | Base order size increment |
quoteIncrement | String | Quote price increment |
underlying | String | Underlying security (empty for equities) |
Version | String | Data version |
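Every value in the reference payload is string-typed, including numeric and boolean attributes, so callers should convert explicitly. A small sketch against a trimmed version of the example record:

```python
# Trimmed equity reference record; every value is a string, per the table above.
inst = {
    "sym": "AAPL",
    "rootMinTick": "0.01",
    "multiplier": "1",
    "maxOrderSize": "10000",
    "tickSizePilot": "false",
}

min_tick = float(inst["rootMinTick"])    # 0.01
multiplier = int(inst["multiplier"])     # 1
max_order = int(inst["maxOrderSize"])    # 10000
pilot = inst["tickSizePilot"] == "true"  # False; the field is "true"/"false"
```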
Futures Instrument Fields
{
"date": "2025-03-13",
"sym": "ES.H25",
"root": "ES",
"ex": "CME",
"name": "E-MINI S&P 500 FUTURES",
"openTime": "18:00:00",
"closeTime": "17:00:00",
"rootMinTick": "0.25",
"curContract": "ES.H25",
"multiplier": "50",
"assetType": "Fut",
"tickSizePilot": "false",
"rolloverSymbol": "",
"rolloverDate": "",
"securityID": "ES.H25",
"minOrderSize": "1",
"maxOrderSize": "100",
"baseIncrement": "1",
"quoteIncrement": "0.25",
"underlying": "",
"Version": "V4"
}
Futures-Specific Fields:
| Field | Type | Description |
|---|---|---|
symbol | String | Contract symbol (Databento format, e.g., ESZ5, ESM1) |
root | String | Futures root symbol (e.g., ES) |
root_symbol | String | Futures root symbol (e.g., ES) |
multiplier | Number | Contract multiplier (e.g., 50 for ES) |
min_tick | Number | Minimum tick size (e.g., 0.25 for ES) |
tick_value | Number | Value per tick (e.g., 12.5 for ES) |
contract_month | String | Contract month name (e.g., "December") |
contract_year | Number | Contract year (e.g., 2025) |
expiry_date | String | Contract expiry date (YYYY-MM-DD) |
Continuous Contract Additional Fields:
| Field | Type | Description |
|---|---|---|
continuous_symbol | String | Continuous symbol (e.g., ES.v.1) |
rollover_rule | String | Rollover methodology (v=volume, c=calendar) |
rollover_rank | Number | Position (0=front, 1=second, etc.) |
start_date | String | Date contract became active at this position |
end_date | String | Date contract rolled off this position |
Options Instrument Fields
{
"date": "2025-09-19",
"sym": "SPY250919C00450000",
"root": "SPY",
"ex": "CBOE",
"name": "SPDR S&P 500 ETF",
"openTime": "09:30:00",
"closeTime": "16:00:00",
"rootMinTick": "0.01",
"curContract": "SPY250919C00450000",
"multiplier": "100",
"assetType": "Opt",
"tickSizePilot": "false",
"rolloverSymbol": "",
"rolloverDate": "",
"securityID": "SPY250919C00450000",
"minOrderSize": "1",
"maxOrderSize": "500",
"baseIncrement": "1",
"quoteIncrement": "0.01",
"underlying": "SPY",
"Version": "V4"
}
Additional Options-Specific Fields:
| Field | Type | Description |
|---|---|---|
sym | String | Full option symbol in OSI format |
root | String | Underlying security symbol |
underlying | String | Underlying security (same as root) |
multiplier | String | Contract multiplier (typically 100) |
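The OSI-style symbol packs the underlying, expiry, type, and strike into one string, and the components can be recovered by slicing from the right: the last eight digits are the strike times 1000, preceded by the C/P flag and the YYMMDD expiry. A sketch (assumes 20xx expiries, which holds for the symbols shown here; not an SDK helper):

```python
def parse_osi(symbol):
    """Split an OSI-style option symbol into (root, expiration, type, strike).

    Layout, as in the example above: underlying root, YYMMDD expiry,
    C/P flag, then strike * 1000 as eight digits.
    """
    strike = int(symbol[-8:]) / 1000.0          # "00450000" -> 450.0
    opt_type = symbol[-9]                       # "C" or "P"
    yy, mm, dd = symbol[-15:-13], symbol[-13:-11], symbol[-11:-9]
    expiration = f"20{yy}-{mm}-{dd}"            # assumes a 20xx expiry
    root = symbol[:-15]
    return root, expiration, opt_type, strike

print(parse_osi("SPY250919C00450000"))
# ('SPY', '2025-09-19', 'C', 450.0)
```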
MetaReference Response Structures
get_databases() Response
{
"databases": ["prod_equities_001", "prod_options_001", "prod_futures_001"]
}
| Field | Type | Description |
|---|---|---|
databases | List[String] | Array of database names |
get_tables() Response
{
"tables": [
{
"db_name": "prod_equities_001",
"table_name": "eq_ohlc_1s",
"schema_name": "bars_1s",
"description": "Equity OHLC data at 1-second intervals with volume information",
"schema": [],
"col_descriptions": [],
"default_query_cols": [],
"primary_keys": [],
"order_by": [],
"tags": [],
"data_sets": [],
"src_ids": [],
"src_data_sets": [],
"is_active": true,
"last_updated": "2025-08-26T12:00:00",
"created": "2024-01-01T00:00:00"
}
]
}
Table Object Fields:
| Field | Type | Description |
|---|---|---|
db_name | String | Database name |
table_name | String | Table name |
schema_name | String | Associated schema name |
description | String | Table description |
schema | List[Object] | Column schema definitions |
col_descriptions | List[Object] | Column descriptions |
default_query_cols | List[String] | Recommended query columns |
primary_keys | List[String] | Primary key columns |
order_by | List[String] | Default ordering columns |
tags | List[String] | Associated tags |
data_sets | List[String] | Associated dataset names |
src_ids | List[String] | Source identifiers |
src_data_sets | List[String] | Source dataset names |
is_active | Boolean | Whether table is active |
last_updated | String | Last update timestamp |
created | String | Creation timestamp |
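A typical use of the table metadata is filtering on `tags` and `is_active`. A minimal sketch over a trimmed response; the second table name is hypothetical, included only to exercise the filter:

```python
# Trimmed get_tables() response; "eq_trades_old" is a made-up name for illustration.
response = {
    "tables": [
        {"table_name": "eq_ohlc_1s", "tags": ["equity", "ohlc"], "is_active": True},
        {"table_name": "eq_trades_old", "tags": ["equity", "trades"], "is_active": False},
    ]
}

def active_tables_with_tag(resp, tag):
    """Names of active tables carrying the given tag."""
    return [
        t["table_name"]
        for t in resp["tables"]
        if t["is_active"] and tag in t["tags"]
    ]

print(active_tables_with_tag(response, "equity"))  # ['eq_ohlc_1s']
```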
Schema Column Object:
{
"name": "date",
"type": "Date",
"nullable": false
}
| Field | Type | Description |
|---|---|---|
name | String | Column name |
type | String | ClickHouse data type |
nullable | Boolean | Whether column allows NULL values |
Column Description Object:
{
"name": "date",
"description": "Trading date"
}
| Field | Type | Description |
|---|---|---|
name | String | Column name |
description | String | Human-readable description |
get_tags() Response
{
"tags": [
"1-minute",
"1-second",
"adjusted",
"best-bid-offer",
"daily",
"equity",
"futures",
"market-data",
"ohlc",
"options",
"timeseries",
"trades"
]
}
| Field | Type | Description |
|---|---|---|
tags | List[String] | Array of available tags |
Error Handling
All API methods raise ValueError for parameter validation errors; network and server failures propagate as other exceptions.
Example:
import hiveq_data as hd
try:
client = hd.Historical()
data = client.get_data(
dataset='HIVEQ_US_FUT',
schema='bars_1s',
start='2025-08-01',
end='2025-08-30'
# Missing required parameter: neither 'symbols' nor 'root' provided
)
except ValueError as e:
print(f"Validation error: {e}")
# Output: "At least one of these parameters is required for schema 'bars_1s' (asset_class: futures): symbols, root"
Common Patterns
Context Manager Support
All clients support context managers for automatic cleanup:
import hiveq_data as hd
with hd.Historical() as client:
data = client.get_data(
dataset='HIVEQ_US_EQ',
schema='bars_1s',
symbols='AAPL',
start='2025-08-01',
end='2025-08-30'
)
# Session automatically closed
Per-Client Credentials
Override global credentials for specific clients:
import hiveq_data as hd
# Production client using .env credentials
prod_client = hd.Historical()
# Development client with different credentials
dev_client = hd.Historical(
api_key='dev_key',
base_url='http://localhost:3000'
)
Flexible Date Formats
All date parameters accept multiple formats:
from datetime import date, datetime
import hiveq_data as hd
client = hd.Historical()
# String format
data = client.get_data(..., start='2025-08-01', end='2025-08-30')
# Date object
data = client.get_data(..., start=date(2025, 8, 1), end=date(2025, 8, 30))
# Datetime object
data = client.get_data(..., start=datetime(2025, 8, 1, 9, 30), end=datetime(2025, 8, 30, 16, 0))
Futures Symbology Reference
Databento Symbology (Context-Aware):
- Single-digit years: decade inferred from the query date range
  - `ESM1` with 2021 dates -> June 2021
  - `ESM1` with 2011 dates -> June 2011
- Two-digit years: interpreted as 2000+
  - `ESZ25` -> December 2025
  - `ESM26` -> June 2026
Continuous Contracts:
Format: ROOT.RULE.POSITION
- ROOT: Futures symbol (ES, NQ, etc.)
- RULE:
  - `v` = Volume-based rollover
  - `c` = Calendar-based rollover
- POSITION:
  - `0` = Front month (currently active)
  - `1` = Second month (forward contract)
  - `2` = Third month, etc.
Examples:
- `ES.v.0` - ES front month by volume
- `ES.v.1` - ES second month by volume
- `NQ.c.0` - NQ front month by calendar
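The three-part continuous symbol splits cleanly on dots. A small helper (a sketch, not part of the SDK):

```python
def parse_continuous(symbol):
    """Split a ROOT.RULE.POSITION continuous symbol, e.g. 'ES.v.0'."""
    root, rule, position = symbol.split(".")
    if rule not in ("v", "c"):  # v = volume-based, c = calendar-based
        raise ValueError(f"unknown rollover rule: {rule}")
    return root, rule, int(position)

print(parse_continuous("ES.v.1"))  # ('ES', 'v', 1)
```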
Month Codes:
- F=Jan, G=Feb, H=Mar, J=Apr, K=May, M=Jun
- N=Jul, Q=Aug, U=Sep, V=Oct, X=Nov, Z=Dec
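The month codes map directly onto calendar months, which makes decoding an outright symbol with a two-digit year straightforward. A sketch (single-digit years need query-date context for decade inference, as noted above, and are deliberately not handled here):

```python
# Month-code table from the reference above.
MONTH_CODES = {
    "F": 1, "G": 2, "H": 3, "J": 4, "K": 5, "M": 6,
    "N": 7, "Q": 8, "U": 9, "V": 10, "X": 11, "Z": 12,
}

def parse_outright(symbol):
    """Decode a Databento-style outright with a two-digit year (2000+ century)."""
    root = symbol[:-3]                 # "ESZ25" -> "ES"
    month = MONTH_CODES[symbol[-3]]    # "Z" -> 12
    year = 2000 + int(symbol[-2:])     # "25" -> 2025
    return root, month, year

print(parse_outright("ESZ25"))  # ('ES', 12, 2025)
```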