Querying¶
kxi.query.Query provides an authenticated connection to kdb Insights Enterprise for running SQL queries, getData API calls, UDAs and metadata inspection.
Creating a connection¶
Pass credentials explicitly or rely on environment variables:
import kxi.query
# From environment variables (INSIGHTS_URL, INSIGHTS_CLIENT_ID, INSIGHTS_CLIENT_SECRET)
conn = kxi.query.Query()
# Explicit credentials
conn = kxi.query.Query(
host="https://my-insights.kx.com",
client_id="my-client-id",
client_secret="my-client-secret",
)
# Return JSON instead of pykx objects
conn = kxi.query.Query(data_format="application/json")
Connection parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
host |
str |
$INSIGHTS_URL |
Base URL of the kdb Insights Enterprise instance |
client_id |
str |
$INSIGHTS_CLIENT_ID |
Keycloak client ID |
client_secret |
str |
$INSIGHTS_CLIENT_SECRET |
Keycloak client secret |
realm |
str |
$INSIGHTS_REALM or insights |
Keycloak realm |
data_format |
str |
application/octet-stream |
Response format; use application/json without pykx |
timeout |
int |
20 |
Token request timeout in seconds |
sql¶
Run a SQL query against the kdb Insights query environment.
# Basic SELECT
result = conn.sql("SELECT * FROM my_table LIMIT 100")
# With a WHERE clause
result = conn.sql("SELECT sym, price FROM trades WHERE side = 'BID'")
# Aggregation
result = conn.sql("SELECT sym, AVG(price) AS avg_price FROM trades GROUP BY sym")
Returns a pykx.Table:
result = conn.sql("SELECT * FROM my_table LIMIT 5")
# pykx.Table(pykx.q('...'))
df = result.pd() # convert to pandas DataFrame
Returns a dict with a payload key containing a list of row dicts:
conn = kxi.query.Query(data_format="application/json")
result = conn.sql("SELECT * FROM my_table LIMIT 5")
rows = result["payload"]
get_data¶
get_data provides structured access to kdb+ timeseries tables in kdb Insights with built-in support for time ranges, filters, grouping, and aggregations.
Basic usage¶
result = conn.get_data("my_table")
Time range¶
result = conn.get_data(
"my_table",
start_time="2024.01.01D09:00:00.000000000",
end_time="2024.01.01D17:00:00.000000000",
)
Filtering¶
Filters are triadic lists: [function, column, value].
result = conn.get_data(
"my_table",
filter=[
["=", "sym", "AAPL"],
["within", "price", [100.0, 200.0]],
],
)
Grouping and aggregations¶
result = conn.get_data(
"my_table",
group_by=["sym"],
aggregations=[
["avg_price", "avg", "price"],
["total_qty", "sum", "qty"],
],
)
All parameters¶
| Parameter | Type | Description |
|---|---|---|
table |
str |
Table name to query |
start_time |
str |
Inclusive start time (q timestamp format) |
end_time |
str |
Exclusive end time (q timestamp format) |
input_timezone |
str |
Timezone for start/end times. Default: UTC |
output_timezone |
str |
Timezone for returned timestamps. Default: UTC |
filter |
list |
Filter conditions as [['fn', 'col', value], ...] |
group_by |
list[str] |
Columns to group aggregations by |
aggregations |
list |
Columns to return, or [['name', 'agg', 'col'], ...] |
fill |
str |
Null handling: forward or zero |
temporality |
str |
Time interpretation: slice or snapshot |
slice |
list[str] |
Intra-day time range used with temporality='slice' |
sort_columns |
list[str] |
Columns to sort results by |
labels |
dict[str, str] |
Assembly label filters for DAP selection |
get_meta¶
Retrieve metadata about assemblies, schemas, and available APIs on the deployment.
meta = conn.get_meta()
assemblies = meta["assembly"] # deployed assemblies
schemas = meta["schema"] # table schemas with column types
apis = meta["api"] # available API endpoints
daps = meta["dap"] # data access process instances
Error handling¶
Query errors raise typed exceptions from kxi.exceptions:
from kxi.exceptions import ClientQueryError, UnauthorizedError
try:
result = conn.sql("SELECT * FROM non_existent_table")
except ClientQueryError as e:
print(f"Query error: {e.info.get('ai')}")
except UnauthorizedError:
print("Authentication failed — check client credentials")