Quickstart
Introduction
This quickstart page guides you through streaming data from kdb Insights Enterprise over WebSockets. If you are using the kdb Insights SDK, refer to the WebSockets guide.
All examples below use JSON. For more details on how the protocol works, or for information on using QIPC instead, refer to the protocol documentation.
Prerequisites
- You have access to a running version of kdb Insights Enterprise. For more information, refer to the installation guide.
- You have installed the latest kdb Insights Command Line following the installation guide.
Quickstart
Authentication and Authorization
Before pushing a package or publishing data, use the KXI CLI to create users and clients and to assign roles. For more information on the CLI, refer to the kdb Insights Command Line documentation.
Administrator password
The administrator password below is defined during the installation of your kdb Insights Enterprise deployment. For more information, refer to the Administration Passwords documentation.
# Create a user
kxi user create user@domain.com --password <myPassword> --admin-password <adminPassword> --not-temporary
INSIGHTS_ROLES="insights.role.maintainer,insights.client.create,insights.client.delete,insights.package.install,insights.package.delete,insights.admin.package.system,insights.package.list,insights.package.update,insights.package.upload,insights.query.stream"
kxi user assign-roles user@domain.com --roles $INSIGHTS_ROLES --admin-password <adminPassword>
# Create a client
CLIENT_ID=svc1
INSIGHTS_ROLES="insights.client.create,insights.client.delete,insights.package.install,insights.package.delete,insights.admin.package.system,insights.package.list,insights.package.update,insights.package.upload,insights.role.developer,insights.query.stream"
kxi user create-client $CLIENT_ID --admin-password <adminPassword>
kxi user assign-roles service-account-$CLIENT_ID --roles $INSIGHTS_ROLES --admin-password <adminPassword>
CLIENT_SECRET=$(kxi user get-client-secret $CLIENT_ID --admin-password <adminPassword>)
Package Deployment
Follow the steps below to download and deploy the sample package kxi-ws-1.14.0.kxi, available from the KX Download Portal.
Step 1 - Download the package
Run the following command to download the package:
version=1.14.0
curl -L https://portal.dl.kx.com/assets/raw/package-samples/kxi-ws/$version/kxi-ws-$version.kxi -o kxi-ws-$version.kxi
Step 2 - Push the package to kdb Insights
Push the downloaded package to kdb Insights Enterprise using the following command:
kxi pm push kxi-ws-1.14.0.kxi --client-id $CLIENT_ID --client-secret $CLIENT_SECRET
Step 3 - Deploy the package
Deploy the package to kdb Insights Enterprise:
kxi pm deploy kxi-ws/1.14.0 --client-id $CLIENT_ID --client-secret $CLIENT_SECRET
WebSocket connection and subscription
Step 1 - Establish connection
Use the Linux utility wscat to establish a WebSocket client connection.
PACKAGE_NAME=kxi-ws
PIPELINE_NAME=wsstream
HOSTNAME=insights.kx.com # replace with your own hostname
URL=https://$HOSTNAME/servicegateway/ws/v1/subscribe/${PACKAGE_NAME}-${PIPELINE_NAME}
# Log in to kdb Insights Enterprise with a service account
kxi --serviceaccount-id=${CLIENT_ID} --serviceaccount-secret=${CLIENT_SECRET} auth login --serviceaccount
# Obtain the access token
TOKEN=$(kxi auth print-token)
# Open the WebSocket connection
wscat -H "Sec-WebSocket-Protocol: Bearer $TOKEN" -c $URL
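If you connect from a client other than wscat, you need the same endpoint and header. The following is a minimal Python sketch that only mirrors the shell variables above. The function names `build_subscribe_endpoint` and `build_auth_headers` are hypothetical helpers, not part of any KX library, and `<token>` stands in for the output of `kxi auth print-token`.

```python
def build_subscribe_endpoint(hostname: str, package: str, pipeline: str) -> str:
    """Return the subscribe URL, built the same way as $URL in the shell snippet."""
    return f"https://{hostname}/servicegateway/ws/v1/subscribe/{package}-{pipeline}"


def build_auth_headers(token: str) -> dict:
    """Return the header that the wscat command passes with -H."""
    return {"Sec-WebSocket-Protocol": f"Bearer {token}"}


# Hypothetical values: replace the hostname and token with your own.
url = build_subscribe_endpoint("insights.kx.com", "kxi-ws", "wsstream")
headers = build_auth_headers("<token>")
print(url)
print(headers)
```

Pass `url` and `headers` to whichever WebSocket client library you use. The sketch does not open a connection itself.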
Step 2 - Send subscription
You can subscribe to data over the WebSocket client using one of three methods. Refer to the examples of each method below.
For more information on subscription requests, refer to the client protocol documentation.
- To get a single snapshot of the current data in the table "trace", set the type to snap:
{"type":"snap","payload":{"topic":"trace"},"id":1}
The following response is returned:
{"payload":{"data":{"readTS":["2021-01-01 00:00:00.0","2021-01-01 00:00:01.000000001","2021-01-01 00:00:02.0","2021-01-01 00:00:03.0","2021-01-01 00:00:04.0","2021-01-01 00:00:05.0","2021-01-01 00:00:06.0","2021-01-01 00:00:07.0","2021-01-01 00:00:08.0","2021-01-01 00:00:09.0"],"valFloat":[72.0,81.0,81.0,90.0,34.0,50.0,51.0,46.0,45.0,59.0],"alarm":[0,0,1,0,1,0,0,1,1,0],"qual":[1,1,1,0,0,1,1,0,1,0],"updateTS":["2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492"],"captureTS":["2021-01-01 00:00:00.0","2021-01-01 00:00:01.000000001","2021-01-01 00:00:02.0","2021-01-01 00:00:03.0","2021-01-01 00:00:04.0","2021-01-01 00:00:05.0","2021-01-01 00:00:06.0","2021-01-01 00:00:07.0","2021-01-01 00:00:08.0","2021-01-01 00:00:09.0"],"sensorID":[1,1,1,1,1,1,1,1,1,1]}},"id":1,"type":"snapped"}
- To subscribe to incremental refreshes of the data without receiving a snapshot, set the type to subscribe:
{"type":"subscribe", "id":2, "payload":{"topic":"trace"}}
You receive a "subscribed" response, followed by the "update" messages:
{"payload":{"subscription":"3f108d8c-b84b-4b2c-a13c-7906ff3037b5"},"id":2,"type":"subscribed"}
{"payload":{"data":{"readTS":["2021-01-01 00:00:00.0"],"valFloat":[72.0],"alarm":[0],"qual":[1],"updateTS":["2024-08-07 13:52:03.948521052"],"captureTS":["2021-01-01 00:00:00.0"],"sensorID":[1]},"subscription":"3f108d8c-b84b-4b2c-a13c-7906ff3037b5"},"id":2,"type":"update"}
- The subsnap message type combines the snap and subscribe messages:
{"type":"subsnap", "id":3, "payload":{"topic":"trace"}}
After sending this, you receive a snapshot of the data, followed by incremental updates:
{"payload":{"data":{"readTS":["2021-01-01 00:00:00.0","2021-01-01 00:00:01.000000001","2021-01-01 00:00:02.0","2021-01-01 00:00:03.0","2021-01-01 00:00:04.0","2021-01-01 00:00:05.0","2021-01-01 00:00:06.0","2021-01-01 00:00:07.0","2021-01-01 00:00:08.0","2021-01-01 00:00:09.0"],"valFloat":[72.0,81.0,81.0,90.0,34.0,50.0,51.0,46.0,45.0,59.0],"alarm":[0,0,1,0,1,0,0,1,1,0],"qual":[1,1,1,0,0,1,1,0,1,0],"updateTS":["2024-08-07 13:52:03.948521052","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492","2024-08-07 13:30:32.997582492"],"captureTS":["2021-01-01 00:00:00.0","2021-01-01 00:00:01.000000001","2021-01-01 00:00:02.0","2021-01-01 00:00:03.0","2021-01-01 00:00:04.0","2021-01-01 00:00:05.0","2021-01-01 00:00:06.0","2021-01-01 00:00:07.0","2021-01-01 00:00:08.0","2021-01-01 00:00:09.0"],"sensorID":[1,1,1,1,1,1,1,1,1,1]},"subscription":"187ea364-af48-4d83-9f2a-a0db55ab6de8"},"id":3,"type":"subsnapped"}
{"payload":{"data":{"readTS":["2021-01-01 00:00:00.0"],"valFloat":[72.0],"alarm":[0],"qual":[1],"updateTS":["2024-08-07 13:53:26.878540188"],"captureTS":["2021-01-01 00:00:00.0"],"sensorID":[1]},"subscription":"187ea364-af48-4d83-9f2a-a0db55ab6de8"},"id":3,"type":"update"}
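The responses above return data column by column: each key under `data` maps to a list with one value per row. If your client code works with individual records, a small conversion like the following sketch may help. It assumes only the JSON shape shown in the examples. The `raw` string is the snapshot response trimmed to two rows and five columns for brevity, and `columns_to_rows` is a hypothetical helper name.

```python
import json

# Trimmed copy of the "snapped" response shown earlier (two rows, fewer columns).
raw = (
    '{"payload":{"data":{"readTS":["2021-01-01 00:00:00.0","2021-01-01 00:00:01.000000001"],'
    '"valFloat":[72.0,81.0],"alarm":[0,0],"qual":[1,1],"sensorID":[1,1]}},'
    '"id":1,"type":"snapped"}'
)


def columns_to_rows(data: dict) -> list:
    """Turn {"col": [v0, v1], ...} into [{"col": v0, ...}, {"col": v1, ...}]."""
    names = list(data)
    # zip(*columns) walks all columns in step, yielding one tuple per row.
    return [dict(zip(names, values)) for values in zip(*data.values())]


message = json.loads(raw)
rows = columns_to_rows(message["payload"]["data"])
for row in rows:
    print(row)
```

The same helper applies to the `data` object in "subsnapped" and "update" messages, since they use the same layout.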
To unsubscribe from a subscription or subsnap, send an unsubscribe message using the subscription UUID:
{"type":"unsubscribe", "id":4, "payload":{"subscription":"187ea364-af48-4d83-9f2a-a0db55ab6de8"}}
The following response is returned:
{"payload":{"subscription":"187ea364-af48-4d83-9f2a-a0db55ab6de8"},"id":4,"type":"unsubscribed"}
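Because you need the subscription UUID to unsubscribe, a client typically records it when the "subscribed" or "subsnapped" reply arrives. The sketch below shows one possible way to do that by dispatching on the `type` field. `SubscriptionTracker` is a hypothetical class, not part of any KX library, and it handles only the message types shown on this page. The final "unsubscribed" reply in the usage is constructed for illustration by reusing the shape of the reply above with the first subscription's UUID.

```python
import json


class SubscriptionTracker:
    """Record active subscription UUIDs and collect update payloads."""

    def __init__(self):
        self.active = set()   # UUIDs of live subscriptions
        self.updates = []     # column-oriented "data" objects from updates

    def handle(self, raw: str) -> str:
        """Process one JSON reply and return its type."""
        message = json.loads(raw)
        kind = message["type"]
        payload = message.get("payload", {})
        if kind in ("subscribed", "subsnapped"):
            self.active.add(payload["subscription"])
        elif kind == "unsubscribed":
            self.active.discard(payload["subscription"])
        elif kind == "update":
            self.updates.append(payload["data"])
        return kind


tracker = SubscriptionTracker()
uuid = "3f108d8c-b84b-4b2c-a13c-7906ff3037b5"
tracker.handle('{"payload":{"subscription":"%s"},"id":2,"type":"subscribed"}' % uuid)
tracker.handle(
    '{"payload":{"data":{"valFloat":[72.0],"sensorID":[1]},'
    '"subscription":"%s"},"id":2,"type":"update"}' % uuid
)
print(sorted(tracker.active))
# Illustrative reply: same shape as the "unsubscribed" example above.
tracker.handle('{"payload":{"subscription":"%s"},"id":4,"type":"unsubscribed"}' % uuid)
print(sorted(tracker.active))
```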
Message ID
The ID in the above messages is initially set by the client. Any messages sent from the Service Gateway to the client include the ID of the original message they are responding to. This number must be unique for every message sent from a given client to the Service Gateway and must be incremented with each message sent.
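One simple way to satisfy the uniqueness and increment requirements is to generate every outgoing message through a single counter. The following is a minimal sketch under that assumption. `MessageFactory` is a hypothetical helper, and the starting value of 1 simply matches the examples on this page.

```python
import itertools
import json


class MessageFactory:
    """Build request messages with a per-client, strictly increasing ID."""

    def __init__(self, start: int = 1):
        self._ids = itertools.count(start)

    def build(self, msg_type: str, payload: dict) -> str:
        """Serialize a request, consuming the next ID from the counter."""
        message = {"type": msg_type, "id": next(self._ids), "payload": payload}
        return json.dumps(message)


factory = MessageFactory()
snap = factory.build("snap", {"topic": "trace"})
subscribe = factory.build("subscribe", {"topic": "trace"})
print(snap)
print(subscribe)
```

Creating one factory per connection keeps the IDs unique for that client. Because replies carry the ID of the request they answer, the same number can be used to match a response to its originating message.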
Clearing the cache
While the pipeline is running, data in the stream cache can become stale, meaning some rows may no longer receive updates. In such cases, it's often necessary to remove these records; for example, when entries have expired or, during backtesting, when you need to clear the entire cache before each new run.
In the above examples you can subscribe to the trace table and receive a snapshot of the data, but the wsstream pipeline cache is maintained indefinitely and you cannot interact with it. For example:
{"type":"subsnap","id":3, "payload":{"topic":"trace"}}
The kxi-ws package also contains:
- The wsclear pipeline which, when deployed, allows you to issue a POST request to the clearCache endpoint.
curl --header "Authorization: Bearer $TOKEN" --data {} https://$HOSTNAME/streamprocessor/upload/clearCache
Note
The $TOKEN must be permissioned for insights.pipeline.upload. Otherwise, you receive an error:
{"code":403,"message":"Token has insufficient permissions, missing insights.pipeline.upload"}
For more details on how the cacheOperation metadata key works, refer to the stream processor documentation.
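If you want to trigger the clearCache request from a script rather than from curl, the request can be prepared with the Python standard library. The sketch below only builds the request object and does not send it. `build_clear_cache_request` is a hypothetical helper, the hostname is the placeholder used earlier, and `<token>` stands in for a token that has the insights.pipeline.upload permission.

```python
import urllib.request


def build_clear_cache_request(hostname: str, token: str) -> urllib.request.Request:
    """Prepare the same POST that the curl command above issues."""
    url = f"https://{hostname}/streamprocessor/upload/clearCache"
    return urllib.request.Request(
        url,
        data=b"{}",  # empty JSON object, as passed to curl with --data
        headers={"Authorization": f"Bearer {token}"},
        method="POST",
    )


# Hypothetical values: replace the hostname and token with your own.
request = build_clear_cache_request("insights.kx.com", "<token>")
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request)
# urlopen raises urllib.error.HTTPError for a 403 response such as the one above.
```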