HiveQ Docs
HiveQ Data

Python Data API

HiveQ Data API

A Python client for accessing HiveQ's historical and live market data with ease.

Version: 0.2.1

Features

Five Powerful Clients

  1. Historical - Query historical market data with flexible date/time ranges (API-based)
  2. InstrumentReference - Access instrument metadata like multipliers, roots, symbols, etc. (API-based)
  3. Publisher - Publish strategy events, logs, and custom data to HiveQ destinations (API-based, async mode default)
  4. Metadata - Get dataset and schema information from HiveQ platform (API-based)
  5. LiveStream - Stream live market data in real-time (WebSocket-based)

Additional Features

  • Centralized Configuration: Load credentials from environment variables or direct parameters
  • Fast JSON Parsing: Built on orjson for high-performance data processing
  • Async Publishing: Fire-and-forget publishing with 10-100x performance improvement (default mode)
  • Automatic Rollover Detection: Futures contract rollover information with multiple methodologies

Installation

Install from PyPI (coming soon)

pip install hiveq-data

Development Installation

# Clone the repository
cd python-sdk

# Install in development mode
pip install -e .

# Or install with development dependencies
pip install -e ".[dev]"

Check Version

import hiveq_data as hd

print(hd.__version__)  # Outputs: 0.2.1

Configuration

No configuration needed! The SDK automatically loads credentials from environment variables:

  1. Searches for .env file in your current directory and parent directories
  2. Falls back to environment variables (HIVEQ_API_KEY, HIVEQ_BASE_URL, HIVEQ_USER_ID, HIVEQ_ORG_ID, HIVEQ_USER_NAME)

Just create a .env file (see .env.example):

HIVEQ_API_KEY=your_api_key_here
HIVEQ_BASE_URL=https://api.hiveq.com
HIVEQ_USER_ID=your_user_id
HIVEQ_ORG_ID=your_org_id
HIVEQ_USER_NAME=your_username

That's it! No configure() call needed:

import hiveq_data as hd

# Credentials automatically loaded from .env!
client = hd.Historical()
data = client.get_data(...)

Manual Configuration (Optional)

You only need to call configure() if you want to:

  • Override auto-loaded values
  • Set credentials programmatically

Override base URL for local development

import hiveq_data as hd

# Auto-loads credentials from .env, but overrides base_url
hd.configure(base_url="http://localhost:3000")

Set credentials programmatically

import hiveq_data as hd

hd.configure(
    api_key='your_api_key_here',
    base_url='https://api.hiveq.com',
    user_id='your_user_id',
    org_id='your_org_id',
    user_name='your_username'
)

Use environment variables only

export HIVEQ_API_KEY=your_api_key_here
export HIVEQ_BASE_URL=https://api.hiveq.com
export HIVEQ_USER_ID=your_user_id
export HIVEQ_ORG_ID=your_org_id
export HIVEQ_USER_NAME=your_username
import hiveq_data as hd

# Auto-loads from environment variables
client = hd.Historical()

Logging Configuration (Optional)

The SDK uses Python's standard logging module. By default, logs are suppressed. Configure logging in your application to see SDK logs:

import logging
import hiveq_data as hd

# Basic configuration - show all SDK logs at INFO level
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Or configure only HiveQ SDK logs
logging.getLogger('hiveq_data').setLevel(logging.DEBUG)

# Create client - will now show debug logs
client = hd.Historical()

Log Levels:

  • DEBUG - Detailed information for debugging (API payloads, parsing details)
  • INFO - General informational messages (operations completed)
  • WARNING - Warning messages (fallback behaviors, missing data)
  • ERROR - Error messages (API failures, parsing errors)

Filter specific modules:

# Only show futures reference logs
logging.getLogger('hiveq_data.instrument_reference.futures_reference').setLevel(logging.DEBUG)

# Suppress async client logs
logging.getLogger('hiveq_data.internal.async_http_client').setLevel(logging.ERROR)

Quick Start

Make sure you have a .env file with your credentials (credentials are automatically loaded on import).

Historical Data

import hiveq_data as hd

# Create client (automatically uses .env credentials)
historical = hd.Historical()

# Equity data - use 'symbols' parameter
equity_data = historical.get_data(
    dataset='HIVEQ_US_EQ',
    schema='bars_1s',
    symbols=['AAPL', 'MSFT'],
    start='2025-08-01',
    end='2025-08-30'
)
print(equity_data)

# Futures data - Option 1: Query by root (all contracts for that root)
futures_by_root = historical.get_data(
    dataset='HIVEQ_US_FUT',
    schema='bars_1s',
    root=['ES', 'NQ'],  # E-mini S&P 500 and Nasdaq futures (all contracts)
    start='2025-08-01',
    end='2025-08-30'
)
print(futures_by_root)

# Futures data - Option 2: Query by specific contract symbols
futures_by_symbol = historical.get_data(
    dataset='HIVEQ_US_FUT',
    schema='bars_1s',
    symbols=['ESH25', 'NQM25'],  # Specific futures contracts
    start='2025-08-01',
    end='2025-08-30'
)
print(futures_by_symbol)

# Options data - Option 1: Query by chains (all options for underlying)
options_by_chain = historical.get_data(
    dataset='HIVEQ_US_OPT',
    schema='snaps_1s',
    chains=['SPY'],  # All SPY options
    start='2025-08-01',
    end='2025-08-30',
    expiration_date='2025-09-19',  # Optional: filter by expiration
    strike=450.0,  # Optional: filter by strike
    option_type='C'  # Optional: filter by option type (C=Call, P=Put)
)
print(options_by_chain)

# Options data - Option 2: Query by specific option symbols
options_by_symbol = historical.get_data(
    dataset='HIVEQ_US_OPT',
    schema='snaps_1s',
    symbols=['SPY250919C00450000'],  # Specific option contract
    start='2025-08-01',
    end='2025-08-30'
)
print(options_by_symbol)

# Session mode - same intraday window across multiple dates
session_data = historical.get_data(
    dataset='HIVEQ_US_EQ',
    schema='bars_1s',
    symbols=['AAPL'],
    start='2025-08-01 09:30:00',
    end='2025-08-05 10:00:00',
    filter_mode='session'
)
print(session_data)

Instrument Reference

Query instrument metadata including contract specifications, multipliers, exchanges, and more.

import hiveq_data as hd

# Create client
instruments = hd.InstrumentReference()

# Get equity metadata
equities = instruments.get_equities(symbols=['AAPL', 'MSFT'])
print(f"Found {equities['count']} equities")

# Futures: Specific contracts
# Supports Databento symbology (ESM1 = June 2021 or 2011 based on query dates)
futures = instruments.get_futures(
    symbols=['ESM1', 'ESZ5'],
    start_date='2021-05-01',
    end_date='2021-06-30'
)
# Returns: ESM1 parsed as June 2021 (context-aware)

# Futures: Continuous contracts
# Format: ROOT.RULE.POSITION (e.g., ES.v.1 = volume-based, position 1)
continuous = instruments.get_futures(
    symbols=['ES.v.1'],      # Second month by volume
    start_date='2020-01-01',
    end_date='2020-12-31',
    expiry_type='volume'     # 'volume', 'expiry', 'last_trade', 'open_interest'
)
# Returns: All contracts that were position 1 during 2020

# Other continuous contract examples:
# ES.v.0 - Front month by volume
# ES.c.0 - Front month by calendar
# ES.v.2 - Third month by volume

# Exchange and currency filters
futures_filtered = instruments.get_futures(
    symbols=['ES.v.0'],
    start_date='2024-01-01',
    end_date='2024-01-31',
    exchange='XCME',
    currency='USD'
)

# Options contracts
options = instruments.get_options(
    chains=['AAPL'],
    expiration_date='2025-06-20',
    option_type='C'
)

# Index instruments
indices = instruments.get_indices(symbols=['SPX', 'NDX'])

Publisher

Note: Publisher requires API credentials and is used for writing data to HiveQ destinations. Supports both synchronous and asynchronous publishing modes. Async mode is the default.

Async Mode (Default)

import hiveq_data as hd

# Create publisher client (async_mode=True by default)
publisher = hd.Publisher()

# Publish strategy event logs (returns immediately, non-blocking)
publisher.publish(
    schema='event_logs',
    key='run_20240101_001',
    operation='add',  # 'add' for new records, 'modify' for updates
    data=[
        {
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'trader_id': 'trader_001',
            'nav': 100000.0,
            'realized_pnl': 1500.0,
            'total_pnl': 1500.0,
            'event_log_type': 'TRADE',
            'sub_event_type': 'FILLED',
            'symbol': 'AAPL',
            'message': 'Trade executed'
        }
    ]
)

# Always flush before shutdown in async mode
publisher.flush()
publisher.close()

Synchronous Mode

import hiveq_data as hd

# Create publisher with sync mode
publisher = hd.Publisher(async_mode=False)

# Publish and wait for response
result = publisher.publish(
    schema='event_logs',
    key='run_20240101_001',
    operation='add',
    data=[
        {
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'trader_id': 'trader_001',
            'nav': 100000.0
        }
    ]
)

publisher.close()

High-Frequency Publishing

For high-frequency publishing scenarios (e.g., backtesting), the default async mode provides 10-100x performance improvement:

import hiveq_data as hd

publisher = hd.Publisher()  # async_mode=True by default

# Publish data (returns immediately, non-blocking)
for i in range(1000):
    publisher.publish(
        schema='event_logs',
        key=f'run_{i}',
        operation='add',
        data=[{
            'ts_event': '2024-01-01T10:00:00.000Z',
            'strategy_id': 'my_strategy',
            'nav': 100000.0,
            'realized_pnl': 1500.0
        }]
    )

# Flush before shutdown to ensure all data is published
publisher.flush()
publisher.close()

SignalPublisher

Note: SignalPublisher uses Kafka for high-throughput signal publishing. Requires Kafka broker access.

SignalPublisher is optimized for publishing user signals (trading signals, strategy signals, etc.) to Kafka topics with low latency and high throughput.

Signal Format:

{
    'data': {'signal': 0, 'weight': 0},           # Signal data dict
    'user_id': 'Raj',                           # User identifier
    'ts_event': '2025-11-05 19:00:02.267000',     # Event timestamp (auto-generated)
    'symbol': 'AAPL',                             # Symbol
    'signal_version': 1,                          # Version (default: 1)
    'signal_id': '39831f03-7022-4c0f-...'        # UUID (auto-generated)
}

Basic Usage

import hiveq_data as hd

# Create signal publisher
# Bootstrap servers can be set via KAFKA_BOOTSTRAP_SERVERS env variable
publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals'
)

# Publish signal
publisher.publish(
    data={'signal': 1, 'weight': 0.85},
    symbol='AAPL',
    user_id='Raj'
)

# Close when done
publisher.close()

Context Manager Usage

import hiveq_data as hd

# Automatic cleanup with context manager
with hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals'
) as publisher:
    publisher.publish(
        data={'signal': 1, 'weight': 0.85},
        symbol='AAPL',
        user_id='Raj'
    )
# Publisher automatically closed

Batch Publishing

import hiveq_data as hd

with hd.SignalPublisher() as publisher:
    signals = [
        {
            'data': {'signal': 1, 'weight': 0.85},
            'symbol': 'AAPL',
            'user_id': 'Raj'
        },
        {
            'data': {'signal': 1, 'weight': 0.72},
            'symbol': 'MSFT',
            'user_id': 'Raj'
        },
        {
            'data': {'signal': 0, 'weight': 0.0},
            'symbol': 'GOOG',
            'user_id': 'Raj'
        },
    ]

    count = publisher.publish_batch(signals)
    print(f"Published {count} signals")

Custom Partition Keys

import hiveq_data as hd

with hd.SignalPublisher() as publisher:
    # Use custom key for partitioning (e.g., trader ID)
    publisher.publish(
        data={'signal': 1, 'weight': 0.85},
        symbol='AAPL',
        user_id='Raj',
        key='trader-001'
    )

Auto-Reconnect

import hiveq_data as hd

# Enable automatic reconnection on connection failure
publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals',
    auto_reconnect=True,          # Enable auto-reconnect (default: True)
    reconnect_interval=1.0,        # Retry every 1 second (default: 1.0)
    status_log_interval=30.0       # Log status every 30 seconds (default: 30.0)
)

# Check connection status
if publisher.is_connected():
    publisher.publish(
        data={'signal': 1, 'weight': 0.85},
        symbol='AAPL',
        user_id='Raj'
    )
    print("Signal published successfully")
else:
    print("Publisher not connected, waiting for reconnection...")

# Get detailed status
status = publisher.get_connection_status()
print(f"Status: {status}")

publisher.close()

Advanced Kafka Configuration

import hiveq_data as hd

# Pass additional Kafka producer config
publisher = hd.SignalPublisher(
    bootstrap_servers='localhost:9092',
    topic='user-signals',
    acks='all',  # Wait for all replicas
    compression_type='gzip',  # Compress messages
    retries=3,  # Retry failed sends
    max_in_flight_requests_per_connection=1  # Ensure ordering
)

Features:

  • Kafka-based high-throughput publishing
  • Automatic reconnection on connection failure
  • Key-based partitioning by symbol or custom key
  • JSON serialization with UTF-8 encoding
  • Batch publishing support
  • Context manager support for automatic cleanup
  • Configurable Kafka producer settings
  • Connection status monitoring
  • Throttled reconnection logging
  • Error handling with retry logic

Metadata

Note: Metadata client requires API credentials and provides access to dataset and schema information.

import hiveq_data as hd

# Create metadata client
metadata = hd.Metadata()

# Get all available datasets
datasets = metadata.get_datasets()
print(f"Available datasets: {datasets}")

# Get schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])

# Get schemas for specific datasets
schemas = metadata.get_schemas(dataset=["HIVEQ_US_EQ", "HIVEQ_US_FUT"])

# Get schema for single dataset
schema = metadata.get_schemas(dataset="HIVEQ_US_EQ")
print(f"HIVEQ_US_EQ schemas: {schema}")

# Get details for a specific schema
schema_info = metadata.get_schema(schema='bars_1s', dataset='HIVEQ_US_EQ')

# Refresh schema cache
metadata.refresh_schema()

LiveStream

Note: LiveStream client connects to a WebSocket server for real-time market data streaming. It supports multiple subscriptions with a single connection, automatic reconnection every 1 second (configurable), and connection status monitoring every 30 seconds (configurable).

Basic Usage

import asyncio
import hiveq_data as hd

# Define single callback for handling all market data
async def handle_market_data(data):
    print(f"Market Data: {data}")

async def main():
    # Create LiveStream client
    # - Reconnects every 1 second
    # - Logs status every 30 seconds
    stream = hd.LiveStream(
        host='localhost',
        port=8765,
        auto_reconnect=True
    )

    # Connect to WebSocket server
    await stream.connect()

    # Subscribe to market data using single callback
    await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)
    await stream.subscribe('market_data.equity.bars_1m', 'MSFT', handle_market_data)
    await stream.subscribe('market_data.equity.tbbo', 'AAPL', handle_market_data)

    # Subscribe to multiple keys at once
    await stream.subscribe('market_data.equity.bars_1m', ['GOOGL', 'TSLA'], handle_market_data)

    # Keep running
    await stream.wait_until_disconnected()

    # Cleanup
    await stream.disconnect()

# Run the stream
asyncio.run(main())

Using Context Manager

import asyncio
import hiveq_data as hd

async def handle_market_data(data):
    print(f"Market Data: {data}")

async def main():
    # Automatic connection management with context manager
    async with hd.LiveStream(host='localhost', port=8765) as stream:
        await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)
        await stream.subscribe('market_data.equity.tbbo', 'MSFT', handle_market_data)

        # Run for 60 seconds
        await asyncio.sleep(60)
    # Stream automatically disconnected

asyncio.run(main())

Managing Subscriptions

async def handle_market_data(data):
    print(f"Market Data: {data}")

async with hd.LiveStream(host='localhost', port=8765) as stream:
    # Subscribe
    await stream.subscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)

    # Run for a while
    await asyncio.sleep(30)

    # Unsubscribe specific callback
    await stream.unsubscribe('market_data.equity.bars_1m', 'AAPL', handle_market_data)

    # Or unsubscribe all callbacks for a topic/key
    await stream.unsubscribe('market_data.equity.bars_1m', 'AAPL')

Features:

  • WebSocket-based real-time data streaming
  • Multiple subscriptions with single connection and single callback
  • Fast automatic reconnection every 1 second (configurable)
  • Auto-resubscription to all topics after reconnection
  • Connection status monitoring every 30 seconds with throttled logging
  • Can add subscriptions even when not connected
  • Async/await support with both sync and async callbacks
  • Context manager support for automatic cleanup
  • Thread-safe subscription management

Discovering Available Datasets and Schemas

To get the latest information about available datasets and schemas, use the Metadata client:

import hiveq_data as hd

# Create metadata client
metadata = hd.Metadata()

# Get all available datasets
datasets = metadata.get_datasets()
print("Available datasets:", datasets)

# Get all schemas for all datasets
all_schemas = metadata.get_schemas(dataset=["*"])

# Get schemas for specific dataset
eq_schemas = metadata.get_schemas(dataset="HIVEQ_US_EQ")
print("HIVEQ_US_EQ schemas:", eq_schemas)

Why use Metadata client?

  • Always returns current, up-to-date information
  • Includes detailed schema descriptions and parameters
  • Programmatically discover what's available
  • No hardcoded lists that can become stale

Parameter Requirements by Asset Class

Equity (HIVEQ_US_EQ):

  • Required: symbols, start, end
  • Optional: limit, offset, columns

Futures (HIVEQ_US_FUT):

  • Required: start, end, AND one of: symbols OR root
  • Optional: limit, offset, columns

Options (HIVEQ_US_OPT):

  • Required: start, end, AND one of: symbols OR chains
  • Optional: limit, offset, columns, expiration_date, strike, option_type

API Reference

Configuration

hd.configure(api_key=None, base_url=None, user_id=None, org_id=None, user_name=None) (Optional)

Configure API credentials globally. Usually not needed as credentials are automatically loaded from environment variables.

Parameters:

  • api_key (str, optional): Your HiveQ API key (overrides auto-loaded value)
  • base_url (str, optional): API base URL (overrides auto-loaded value)
  • user_id (str, optional): User ID for request headers (X-User-ID)
  • org_id (str, optional): Organization ID for request headers (X-Org-ID)
  • user_name (str, optional): User name for request headers (X-User-Name)

Historical Client

hd.Historical(api_key=None, base_url=None, user_id=None, org_id=None, user_name=None)

Historical market data client.

Methods:

  • get_data(dataset, schema, symbols=None, root=None, chains=None, start=None, end=None, limit=None, offset=None, filter_mode=None, **kwargs): Fetch historical data
    • dataset (str, required): Dataset name (e.g., 'HIVEQ_US_EQ', 'HIVEQ_US_FUT', 'HIVEQ_US_OPT')
    • schema (str, required): Schema name (e.g., 'bars_1s', 'trades', 'snaps_1s')
    • symbols (str|list, optional): Symbol(s) to query
    • root (str|list, optional): Root symbol(s) for futures
    • chains (str|list, optional): Underlying symbol(s) for options
    • start (str|date|datetime, required): Start date/datetime
    • end (str|date|datetime, required): End date/datetime
    • limit (int, optional): Maximum records to return
    • offset (int, optional): Records to skip for pagination
    • filter_mode (str, optional): 'continuous' (default) or 'session'
    • **kwargs: Additional parameters (e.g., expiration_date, strike, option_type, columns)
    • Returns: dict with 'data' and metadata

Publisher Client

hd.Publisher(api_key=None, base_url=None, async_mode=True, user_id=None, org_id=None, user_name=None)

Data publisher client. Async mode is the default.

Parameters:

  • api_key (str, optional): API key (overrides global config)
  • base_url (str, optional): Base URL (overrides global config)
  • async_mode (bool, optional): If True (default), uses async HTTP for fire-and-forget publishes
  • user_id (str, optional): User ID for request headers
  • org_id (str, optional): Organization ID for request headers
  • user_name (str, optional): User name for request headers

Methods:

  • publish(schema, data, key, operation='add'): Publish data to a schema

    • schema (str, required): Schema name (e.g., 'event_logs')
    • data (list, required): List of records to publish (each record is a dict)
    • key (str, required): Unique key identifier for tracking published data
    • operation (str, optional): 'add' (insert new) or 'modify' (update existing). Default: 'add'
    • Returns: Response dict (immediate in async mode)
  • flush(): Wait for all in-flight async publishes to complete

    • Only relevant when async_mode=True
  • close(): Close the publisher and cleanup resources

InstrumentReference Client

hd.InstrumentReference(api_key=None, base_url=None, user_id=None, org_id=None, user_name=None)

API-based instrument metadata client.

Methods:

  • get_equities(symbols=None, date=None, exchange=None, currency=None, limit=None, offset=None): Get equity metadata

    • Returns: dict with 'instruments' and 'count'
  • get_futures(symbols=None, start_date=None, end_date=None, exchange=None, currency=None, expiry_type='volume', limit=None, offset=None): Get futures contracts

    • Specific contracts: Databento symbology (e.g., ESM1, ESZ25)
    • Continuous contracts: Format ROOT.RULE.POSITION (e.g., ES.v.0)
    • expiry_type: Rollover type ('volume', 'expiry', 'last_trade', 'open_interest')
    • Returns: dict with 'instruments' and 'count'
  • get_options(symbols=None, chains=None, start_date=None, end_date=None, expiration_date=None, strike=None, option_type=None, exchange=None, currency=None, limit=None, offset=None, root=None): Get options contracts

    • Returns: dict with 'instruments' and 'count'
  • get_indices(symbols=None, date=None, exchange=None, currency=None, limit=None, offset=None): Get index metadata

    • Returns: dict with 'instruments' and 'count'
  • get_instruments(symbols=None, asset_class=None, start_date=None, end_date=None, exchange=None, currency=None, limit=None, offset=None): Generic method

    • Routes to asset-specific methods based on asset_class ('equity', 'futures', 'options', 'index')

Metadata Client

hd.Metadata(api_key=None, base_url=None)

Client for accessing HiveQ metadata API.

Methods:

  • get_datasets(): Get list of available datasets

    • Returns: dict with available datasets
  • get_schemas(dataset): Get schemas for specified datasets

    • dataset (str or list): Dataset name(s) or ["*"] for all datasets
    • Returns: dict with schema information
  • get_schema(schema, dataset): Get detailed info for a specific schema

    • schema (str): Schema name
    • dataset (str): Dataset name
    • Returns: dict with schema details
  • refresh_schema(): Refresh the schema cache

    • Returns: dict with refresh result

LiveStream Client

hd.LiveStream(host='localhost', port=8765, auto_reconnect=True, reconnect_interval=1.0, status_log_interval=30.0)

WebSocket-based client for streaming live market data.

Parameters:

  • host (str, optional): WebSocket server host (default: 'localhost')
  • port (int, optional): WebSocket server port (default: 8765)
  • auto_reconnect (bool, optional): Auto-reconnect on disconnect (default: True)
  • reconnect_interval (float, optional): Seconds between reconnection attempts (default: 1.0)
  • status_log_interval (float, optional): Seconds between status log messages (default: 30.0)

Methods:

  • async connect(): Connect to the WebSocket server
  • async disconnect(): Disconnect from the server
  • async subscribe(topic, keys, callback): Subscribe to a topic with key(s) and callback
  • async unsubscribe(topic, key, callback=None): Unsubscribe from a (topic, key) pair
  • async wait_until_disconnected(): Wait until the client is fully stopped
  • is_connected(): Check if currently connected (returns bool)

Advanced Usage

Context Manager Support

All clients support context managers for automatic cleanup:

import hiveq_data as hd

# Credentials auto-loaded from env
with hd.Historical() as historical:
    data = historical.get_data(
        dataset='HIVEQ_US_EQ',
        schema='bars_1s',
        symbols='AAPL',
        start='2025-08-01',
        end='2025-08-30'
    )
    print(data)
# Session automatically closed

Per-Client Credentials

Override auto-loaded credentials for specific clients:

import hiveq_data as hd

# Use .env credentials for most clients
prod_historical = hd.Historical()

# Use different credentials for specific client (e.g., local testing)
dev_historical = hd.Historical(
    api_key='dev_api_key',
    base_url='http://localhost:3000'
)

Override Base URL for Development

import hiveq_data as hd

# Override base URL while keeping API key from env
hd.configure(base_url='http://localhost:3000')

# All clients now use localhost
client = hd.Historical()

License

MIT License

On this page