Querying

kxi.query.Query provides an authenticated connection to kdb Insights Enterprise for running SQL queries, getData API calls, and user-defined analytics (UDAs), and for inspecting metadata.

Creating a connection

Pass credentials explicitly or rely on environment variables:

import kxi.query

# From environment variables (INSIGHTS_URL, INSIGHTS_CLIENT_ID, INSIGHTS_CLIENT_SECRET)
conn = kxi.query.Query()

# Explicit credentials
conn = kxi.query.Query(
    host="https://my-insights.kx.com",
    client_id="my-client-id",
    client_secret="my-client-secret",
)

# Return JSON instead of pykx objects
conn = kxi.query.Query(data_format="application/json")

Connection parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| host | str | $INSIGHTS_URL | Base URL of the kdb Insights Enterprise instance |
| client_id | str | $INSIGHTS_CLIENT_ID | Keycloak client ID |
| client_secret | str | $INSIGHTS_CLIENT_SECRET | Keycloak client secret |
| realm | str | $INSIGHTS_REALM, else insights | Keycloak realm |
| data_format | str | application/octet-stream | Response format; use application/json when pykx is not available |
| timeout | int | 20 | Token request timeout in seconds |
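
The precedence described in the table (explicit argument, then environment variable, then default) can be sketched as a small pre-flight check. This is only an illustration of the documented defaults, not the library's own resolution logic; `resolve_connection_settings` is a hypothetical helper name.

```python
import os


def resolve_connection_settings(env=None, **overrides):
    """Resolve settings as the table describes: explicit value, else env var, else default.

    Hypothetical helper; it mirrors the documented defaults only.
    """
    env = os.environ if env is None else env
    settings = {
        "host": env.get("INSIGHTS_URL"),
        "client_id": env.get("INSIGHTS_CLIENT_ID"),
        "client_secret": env.get("INSIGHTS_CLIENT_SECRET"),
        "realm": env.get("INSIGHTS_REALM", "insights"),
        "data_format": "application/octet-stream",
        "timeout": 20,
    }
    # Explicit keyword arguments win over environment values and defaults.
    settings.update({k: v for k, v in overrides.items() if v is not None})
    # Fail early with a readable message if a required value is absent.
    missing = [k for k in ("host", "client_id", "client_secret") if not settings[k]]
    if missing:
        raise ValueError("missing connection settings: " + ", ".join(missing))
    return settings
```

If the check passes, the resulting dict uses the same keys as the parameters above, so it could be unpacked into the constructor, for example `kxi.query.Query(**settings)`.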

sql

Run a SQL query against the kdb Insights query environment.

# Basic SELECT
result = conn.sql("SELECT * FROM my_table LIMIT 100")

# With a WHERE clause
result = conn.sql("SELECT sym, price FROM trades WHERE side = 'BID'")

# Aggregation
result = conn.sql("SELECT sym, AVG(price) AS avg_price FROM trades GROUP BY sym")

With the default data_format (application/octet-stream), sql returns a pykx.Table:

result = conn.sql("SELECT * FROM my_table LIMIT 5")
# pykx.Table(pykx.q('...'))
df = result.pd()   # convert to pandas DataFrame

With data_format="application/json", sql returns a dict with a payload key containing a list of row dicts:

conn = kxi.query.Query(data_format="application/json")
result = conn.sql("SELECT * FROM my_table LIMIT 5")
rows = result["payload"]
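
Row dicts are convenient for iteration, but column-oriented data is often easier to analyse. As a rough sketch, the payload rows can be pivoted into a dict of column lists in plain Python. `rows_to_columns` is a hypothetical helper, and the sample rows are made up for illustration.

```python
def rows_to_columns(rows):
    """Pivot a list of row dicts into a dict of equal-length column lists."""
    columns = {}
    for index, row in enumerate(rows):
        for name, value in row.items():
            # Backfill None for earlier rows if this column appears late.
            columns.setdefault(name, [None] * index).append(value)
        # Pad any column this row did not provide, keeping lengths aligned.
        for values in columns.values():
            if len(values) == index:
                values.append(None)
    return columns


# Made-up rows standing in for result["payload"].
sample_rows = [
    {"sym": "AAPL", "price": 101.5},
    {"sym": "MSFT", "price": 310.0},
]
columns = rows_to_columns(sample_rows)
```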


get_data

get_data provides structured access to kdb+ timeseries tables in kdb Insights with built-in support for time ranges, filters, grouping, and aggregations.

Basic usage

result = conn.get_data("my_table")

Time range

result = conn.get_data(
    "my_table",
    start_time="2024.01.01D09:00:00.000000000",
    end_time="2024.01.01D17:00:00.000000000",
)
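
The time strings above use the q timestamp layout (YYYY.MM.DDDhh:mm:ss.nnnnnnnnn). A minimal sketch for producing that layout from a Python datetime follows; `to_q_timestamp` is a hypothetical helper, and it assumes times are meant as UTC, the documented default input timezone.

```python
from datetime import datetime, timedelta, timezone


def to_q_timestamp(dt):
    """Format a datetime as a q timestamp string with nine fractional digits."""
    if dt.tzinfo is not None:
        # Convert aware datetimes to UTC; naive ones are taken as already UTC.
        dt = dt.astimezone(timezone.utc)
    # datetime stores microseconds, so pad three zeros to reach nanoseconds.
    return dt.strftime("%Y.%m.%dD%H:%M:%S.") + f"{dt.microsecond:06d}000"


session_open = datetime(2024, 1, 1, 9, 0)
start_time = to_q_timestamp(session_open)
end_time = to_q_timestamp(session_open + timedelta(hours=8))
```

The two strings can then be passed as the start_time and end_time arguments shown above.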

Filtering

Each filter is a three-element list, [function, column, value], and the filter argument takes a list of these.

result = conn.get_data(
    "my_table",
    filter=[
        ["=", "sym", "AAPL"],
        ["within", "price", [100.0, 200.0]],
    ],
)
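
Hand-written nested lists are easy to get wrong, so small builder functions can help. The sketch below covers only the two filter functions shown above; `eq` and `within` are hypothetical helper names, not part of the kxi package.

```python
def eq(column, value):
    """Build an equality filter: ["=", column, value]."""
    return ["=", column, value]


def within(column, low, high):
    """Build a range filter: ["within", column, [low, high]]."""
    if low > high:
        raise ValueError(f"empty range for {column}: {low} > {high}")
    return ["within", column, [low, high]]


# Produces the same structure as the literal lists in the example above.
filters = [eq("sym", "AAPL"), within("price", 100.0, 200.0)]
```

The resulting list can be passed as the filter argument of get_data.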

Grouping and aggregations

result = conn.get_data(
    "my_table",
    group_by=["sym"],
    aggregations=[
        ["avg_price", "avg", "price"],
        ["total_qty", "sum", "qty"],
    ],
)
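
To make the [name, aggregation, column] triplets concrete, the same grouped avg and sum can be computed locally over a few made-up rows. This is a plain-Python illustration of what the request asks for, not a description of how the service evaluates it or of the exact shape it returns; `aggregate_locally` is a hypothetical helper.

```python
from collections import defaultdict


def aggregate_locally(rows, group_by, aggregations):
    """Apply [name, agg, column] triplets to row dicts, grouped by group_by columns."""
    functions = {"avg": lambda xs: sum(xs) / len(xs), "sum": sum}
    groups = defaultdict(list)
    for row in rows:
        # The group key is the tuple of values in the grouping columns.
        groups[tuple(row[col] for col in group_by)].append(row)
    results = []
    for key, members in groups.items():
        out = dict(zip(group_by, key))
        for name, agg, column in aggregations:
            out[name] = functions[agg]([m[column] for m in members])
        results.append(out)
    return results


# Made-up sample rows for illustration only.
rows = [
    {"sym": "AAPL", "price": 100.0, "qty": 10},
    {"sym": "AAPL", "price": 102.0, "qty": 5},
    {"sym": "MSFT", "price": 300.0, "qty": 7},
]
summary = aggregate_locally(
    rows,
    ["sym"],
    [["avg_price", "avg", "price"], ["total_qty", "sum", "qty"]],
)
```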

All parameters

| Parameter | Type | Description |
| --- | --- | --- |
| table | str | Table name to query |
| start_time | str | Inclusive start time (q timestamp format) |
| end_time | str | Exclusive end time (q timestamp format) |
| input_timezone | str | Timezone for start/end times. Default: UTC |
| output_timezone | str | Timezone for returned timestamps. Default: UTC |
| filter | list | Filter conditions as [['fn', 'col', value], ...] |
| group_by | list[str] | Columns to group aggregations by |
| aggregations | list | Columns to return, or [['name', 'agg', 'col'], ...] |
| fill | str | Null handling: forward or zero |
| temporality | str | Time interpretation: slice or snapshot |
| slice | list[str] | Intra-day time range used with temporality='slice' |
| sort_columns | list[str] | Columns to sort results by |
| labels | dict[str, str] | Assembly label filters for DAP selection |
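
When optional arguments come from user input or configuration, it can help to validate them before making a request. The sketch below checks only the constraints stated in the table (the allowed values of fill and temporality, and that slice accompanies temporality='slice'); `build_get_data_kwargs` is a hypothetical helper, and the service may enforce further rules.

```python
ALLOWED_VALUES = {
    "fill": {"forward", "zero"},
    "temporality": {"slice", "snapshot"},
}


def build_get_data_kwargs(**params):
    """Drop unset (None) parameters and check values against the documented options."""
    kwargs = {name: value for name, value in params.items() if value is not None}
    for name, allowed in ALLOWED_VALUES.items():
        if name in kwargs and kwargs[name] not in allowed:
            raise ValueError(f"{name} must be one of {sorted(allowed)}, got {kwargs[name]!r}")
    # The table describes slice as used together with temporality='slice'.
    if "slice" in kwargs and kwargs.get("temporality") != "slice":
        raise ValueError("slice requires temporality='slice'")
    return kwargs


kwargs = build_get_data_kwargs(
    start_time="2024.01.01D09:00:00.000000000",
    end_time=None,            # unset values are omitted from the call
    fill="forward",
    sort_columns=["sym"],
)
```

The cleaned dict can then be unpacked into a call such as `conn.get_data("my_table", **kwargs)`.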

get_meta

Retrieve metadata about assemblies, schemas, and available APIs on the deployment.

meta = conn.get_meta()

assemblies = meta["assembly"]   # deployed assemblies
schemas = meta["schema"]        # table schemas with column types
apis = meta["api"]              # available API endpoints
daps = meta["dap"]              # data access process instances
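
For a quick overview of a deployment, the four sections can be reduced to entry counts. This sketch assumes the metadata behaves like a plain dict whose values support len(), as would be expected with the JSON data format; `summarize_meta` is a hypothetical helper, and the sample dict is made up.

```python
META_SECTIONS = ("assembly", "schema", "api", "dap")


def summarize_meta(meta):
    """Count the entries in each metadata section that is present."""
    return {name: len(meta[name]) for name in META_SECTIONS if name in meta}


# Made-up stand-in for the value returned by conn.get_meta().
sample_meta = {
    "assembly": [{}, {}],
    "schema": [{}],
    "api": [{}, {}, {}],
    "dap": [],
}
counts = summarize_meta(sample_meta)
```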

Error handling

Query errors raise typed exceptions from kxi.exceptions:

from kxi.exceptions import ClientQueryError, UnauthorizedError

try:
    result = conn.sql("SELECT * FROM non_existent_table")
except ClientQueryError as e:
    print(f"Query error: {e.info.get('ai')}")
except UnauthorizedError:
    print("Authentication failed — check client credentials")
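
The example above reads the ai entry from the exception's info attribute. A defensive variant that falls back to the exception message when that entry is absent might look like the sketch below. `describe_query_error` is a hypothetical helper, and `FakeQueryError` is a local stand-in used only so the sketch runs without kxi installed; neither is part of kxi.exceptions.

```python
def describe_query_error(exc):
    """Return the 'ai' detail from exc.info when present, else the exception message."""
    info = getattr(exc, "info", None)
    if isinstance(info, dict) and info.get("ai"):
        return str(info["ai"])
    return str(exc)


class FakeQueryError(Exception):
    """Hypothetical stand-in for ClientQueryError, for local experimentation only."""

    def __init__(self, message, info=None):
        super().__init__(message)
        self.info = info


detailed = describe_query_error(FakeQueryError("query failed", {"ai": "table not found"}))
fallback = describe_query_error(FakeQueryError("query failed"))
```

In real code the helper would be called inside the `except ClientQueryError as e:` branch shown above.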