Web-sockets tutorial
Introduction
kdb Insights services allow data to be streamed from a pipeline to a client application using web-sockets. A client application can access the web-sockets to populate charts in a graphing application.
KX Nexus
The KX Nexus repository will be sunsetted in the future. Nexus links on this page are provided as a temporary alternative to the KX Downloads Portal for existing users. The KX Downloads Portal is the preferred repository for all use cases and KX Nexus links will be removed once the KX Nexus repository is decommissioned.
This tutorial shows you how to use kdb Insights to:
- Deploy the Stream Processor, Service Gateway and Reliable Transport in a Kubernetes cluster.
- Publish data, using a q publisher, to a Stream Processor pipeline which includes a streaming subscription writer.
- Connect a client application through an ingress to the external service on the Service Gateway, which acts as a scalable proxy for serving web-sockets to the client application.
More detailed information on each of these services is available here.
All the details of the client protocol are specified here.
The steps in the process are:
- Download the charts
- Create a namespace and secrets
- Install the charts
- Create the stream processor pipeline
- Connect to the web-socket
The following sections provide further details for the above steps.
Download the charts
The charts can be found on the KX Downloads Portal, or from the KX Nexus repository until Nexus is removed in the future, assuming the appropriate access has been granted.
-
Add the helm repo credentials:
$ helm repo add kx-insights https://portal.dl.kx.com/assets/helm/ --username **** --password ****
"kx-insights" has been added to your repositories
Use the following command to add the helm repo through KX Nexus instead:
$ helm repo add kx-insights https://nexus.dl.kx.com/repository/kx-insights-charts --username **** --password ****
"kx-insights" has been added to your repositories
-
Search for the charts and determine the appropriate chart and app version:
$ helm search repo kx-insights/kxi-rt
NAME                 CHART VERSION   APP VERSION   DESCRIPTION
kx-insights/kxi-rt   1.8.0           1.8.0         A Helm chart for Kubernetes
-
Run the following commands to download the charts:
helm fetch kx-insights/kxi-rt --version 1.8.0 --untar
helm fetch kx-insights/kxi-rt-q-pub --version 1.8.0 --untar
helm fetch kx-insights/kxi-sp --version 1.8.0 --untar
helm fetch kx-insights/sg-gateway --version 1.8.0 --untar
Create a namespace and secrets
For simplicity, create a dedicated namespace for this quickstart guide.
-
Create a namespace:
export NS_NAME="websock-demo"
kubectl create namespace $NS_NAME
-
Create a docker secret:
kubectl -n $NS_NAME create secret docker-registry docker-reg-secret --docker-username=<username> --docker-password=<password> --docker-server=registry.dl.kx.com
To gain access to registry.dl.kx.com, please contact KX at licadmin@kx.com.
-
Create a license secret:
See licensing to find out how to obtain a kdb Insights microservices license.
base64 -w0 <path to kc.lic> > license.txt
kubectl create secret generic kxi-license --from-file=license=./license.txt -n $NS_NAME
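The -w0 flag used above is a GNU coreutils option and may not be available in every base64 implementation. As a minimal sketch, assuming only that base64 reads from standard input, the same unwrapped output can be produced by stripping newlines afterwards. The function name encode_license and the dummy file are hypothetical and used for illustration only.

```shell
# Hypothetical helper: base64-encode a file onto a single line
# without relying on the GNU-specific -w0 flag.
encode_license() {
  base64 < "$1" | tr -d '\n'
}

# Illustration with a dummy file (not a real licence).
demo_file="$(mktemp)"
printf 'dummy license' > "$demo_file"
encode_license "$demo_file" > license_demo.txt
```

For a real licence, the output would be redirected to license.txt exactly as in the command above.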
Install the charts
Configuring entities with a global settings file
You might find it useful to have a global settings file to configure entities such as the kdb+ license file.
The details below show you how a kubernetes secret can be created and added to the relevant namespace, before subsequently being referenced in the helm chart deploy.
-
Create the global settings files using the following command:
cat <<EOF > global_settings.yaml
global:
  packages:
    enabled: false
  image:
    repository: registry.dl.kx.com
  imagePullSecrets:
    - name: docker-reg-secret
  license:
    secretName: kxi-license
    asFile: false
    onDemand: true
auth:
  enabled: false
EOF
-
Create a global_settings file, which is needed for installing the publisher chart:
cat <<EOF > global_settings_pub.yaml
stream:
  prefix: rt-
EOF
These global settings files can then be used when installing the chart by using the -f argument.
-
Create a global_settings file, which is needed for installing the gateway chart:
cat <<EOF > global_settings_sg.yaml
replicaCount: 1
EOF
-
Install the charts using the following commands. Ensure the global_settings YAML files created above are in the current directory:
# The helm install commands are in the following format
# helm -n <kubernetes_namespace> install <release-name> <chart-name> -f <global_settings_file>
helm -n $NS_NAME install rt kxi-rt -f global_settings.yaml
helm -n $NS_NAME install sp kxi-sp -f global_settings.yaml
# Note the use of both global_settings files in the next command to install the gateway
helm -n $NS_NAME install sg sg-gateway -f global_settings.yaml -f global_settings_sg.yaml
# Note the use of both global_settings files in the next command to install the publisher
helm -n $NS_NAME install pub kxi-rt-q-pub -f global_settings.yaml -f global_settings_pub.yaml
Create the stream processor pipeline
This section details creating and installing a Stream Processor pipeline. This pipeline achieves the following:
-
Ingests raw market data sent by the q publisher kxi-rt-q-pub
-
Derives and maintains the latest position for each instrument traded
-
Writes this derived data to a web-socket
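The position logic described in the list above can be illustrated outside of q. The following is a minimal sketch, not the pipeline's implementation: it assumes trades arrive as sym,size,side lines, adds the size for a buy and subtracts it for any other side, and keeps a running total per instrument. The function name net_positions and the sample trades are made up for illustration.

```shell
# Hypothetical helper: read "sym,size,side" lines on stdin and print
# the net position per symbol as "sym,position", sorted by symbol.
net_positions() {
  awk -F, '
    { sz = ($3 == "buy") ? $2 : 0 - $2; pos[$1] += sz }
    END { for (s in pos) print s "," pos[s] }
  ' | sort
}

# Made-up sample trades.
printf 'AAPL,100,buy\nAAPL,40,sell\nMSFT,25,sell\n' | net_positions
# prints:
# AAPL,60
# MSFT,-25
```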
Publish Frequency
The data derived in the stream processor pipeline is published to the WebSocket on a timer. The frequency at which the data is published is set in milliseconds using the publishFrequency value in the pipeline node. If not set, it defaults to 500 milliseconds.
The SP pipeline makes use of a set of Stream Processor APIs. These include readers, writers and operators. Further details on these can be found here.
-
Create a spec.q file that contains the pipeline specification:
cat <<'EOF' > spec.q
// define schema for data streaming into RT to filter map from
trade:([]time:`timestamp$(); sym:`$(); exch:`$(); price:`float$(); size:`long$(); side:`$(); id:`long$());
position:([sym:`$()] time:`timestamp$(); pos:`long$());

.data.updPosition:{[op;md;x]
  res:$[98h=type x; raze .data.updPosition0[op;md]each x;99h=type x; .data.updPosition0[op;md;x];`]
  ;res}

.data.updPosition0:{[op;md;x]
  sz:$[x[`side]=`buy; x`size; neg x`size];
  old:$[99h = type .qsp.get[op; md];`sym xkey enlist .qsp.get[op; md];`sym xkey .qsp.get[op; md]];
  new_sz:$[(99h<>type old) or null first exec pos from old where sym in x`sym; sz; sz+first exec pos from old where sym in x`sym];
  new:`sym`time`pos!(x`sym;x`time;new_sz);
  upsert[`position] pos:cols[position]!(x`sym;x`time;new_sz);
  .qsp.set[op; md] 0!old,res:select by sym from enlist new;
  0!res}

streamA: .qsp.read.fromStream["trade";"mystream"]
  .qsp.map[.data.updPosition; .qsp.use enlist[`state]!enlist first 0!position]

streamB: streamA .qsp.write.toSubscriber[`position;`sym; .qsp.use enlist[`publishFrequency]!enlist 100]

.qsp.run streamB
EOF
-
Install the pipeline:
kubectl -n $NS_NAME port-forward svc/sp-kxi-sp 5000:5000 &
Port-forwarding to communicate with the microservices
You might find it handy to use port-forwarding as shown above. Here, the port-forward is set up to run in the background, and you should get a response like Forwarding from [::1]:5000 -> 5000. Just press Enter to return to the command prompt.
Note
sp-kxi-sp used in the last command is <release-name>-<chart-name> from the helm install <release-name> <chart-name> that was used in the step for installing the Stream Processor microservice chart.
cat <<EOF > create_pipeline.sh
curl -X POST localhost:5000/pipeline/create -d \
  "\$(jq -n --arg spec "\$(cat < spec.q)" \
  '{
    name : "pipeline1",
    type : "spec",
    config : { content: \$spec },
    settings : { minWorkers: "1", maxWorkers: "10" },
    env : {"KXI_SP_WRITEBACK_STREAM": "mystream", "RT_TOPIC_PREFIX": "rt-", "RT_SINK": "mystream" }
  }' | jq -asR .)"
EOF
chmod u+x create_pipeline.sh && ./create_pipeline.sh
You are using the Stream Processor control port (5000) to install this pipeline.
Data now starts flowing from the publisher to the Stream Processor and can subsequently be accessed using web-sockets.
There are further details on interacting with the pipeline here. This includes how to check the status of the pipeline and teardown when you are finished.
Connect to the web-socket
-
Connect to the web-socket:
Note
Get the gateway pod name
GW_POD=$(kubectl -n $NS_NAME get pods | grep sg-gateway | awk '{print $1}')
Note
Port-forward any local port (8082 in this example) to port 8080 of the gateway pod
kubectl -n $NS_NAME port-forward ${GW_POD} 8082:8080 &
Note
The suffix of the websocket end-point (sp-pipeline1) in the next command is the <release-name>-<pipeline-name> of the stream processor microservice chart.
wscat -c ws://localhost:8082/ws/v1/subscribe/sp-pipeline1
-
You can send commands to control the web-socket streaming behaviour. For example, this command starts the streaming:
{"type":"subsnap", "id":1, "payload":{"topic":"position"}}
This subscribes to the contents of the position table, which is derived in the Stream Processor pipeline. The Stream Processor publishes data updates to the client subscriber at the interval set by publishFrequency, which is 100 milliseconds in the pipeline created in this tutorial.
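If you subscribe to more than one topic, it can help to generate the request rather than type it by hand. The following is a minimal sketch that reproduces the subsnap message shown above for a given topic and request id. The function name subsnap_message is hypothetical, and only the fields that appear in the example message are used.

```shell
# Hypothetical helper: print a "subsnap" request.
# $1 = topic (table name), $2 = request id (defaults to 1)
subsnap_message() {
  printf '{"type":"subsnap", "id":%d, "payload":{"topic":"%s"}}\n' "${2:-1}" "$1"
}

subsnap_message position 1
# prints: {"type":"subsnap", "id":1, "payload":{"topic":"position"}}
```

The printed line can be pasted into the wscat prompt once the connection is open.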
Notes on running outside of Kubernetes
It is possible to introduce a web socket client using Docker. However, due to the absence of the Kubernetes control plane, the Service Gateway service needs to be told where to find the Stream Processor endpoints. This can be done using the SP_STREAM_HOST_LIST environment variable. This is a comma-separated list, where each entry is in the format stream_name:sp_hostname:sp_port. The stream_name part determines the suffix of the URL that you need to connect to.
For example:
SP_STREAM_HOST_LIST="pipeline1:sphost-1:7000,pipeline2:sphost-2:7000"
You can connect to the pipeline that streams from sphost-1 with:
wscat -c ws://localhost:8082/ws/v1/subscribe/pipeline1
You can connect to the pipeline that streams from sphost-2 with:
wscat -c ws://localhost:8082/ws/v1/subscribe/pipeline2
Note
This example assumes port forwarding is set up to point from port 8080 on the Service Gateway to port 8082 on localhost.
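To see how entries in the stream_name:sp_hostname:sp_port format relate to the URLs a client connects to, the following minimal sketch splits such a list and prints the subscribe URL for each entry. It is an illustration only: the function name list_subscribe_urls is hypothetical, the list entries are example values, and the base URL assumes the ws://localhost:8082 port-forward used in this tutorial.

```shell
# Hypothetical helper: print one subscribe URL per entry of a
# comma-separated stream_name:sp_hostname:sp_port list.
# $1 = list, $2 = gateway base URL (assumed default: ws://localhost:8082)
list_subscribe_urls() {
  list="$1"
  base="${2:-ws://localhost:8082}"
  old_ifs="$IFS"
  IFS=','
  for entry in $list; do
    stream="${entry%%:*}"   # text before the first colon
    rest="${entry#*:}"      # hostname:port
    host="${rest%%:*}"
    port="${rest#*:}"
    printf '%s/ws/v1/subscribe/%s -> %s:%s\n' "$base" "$stream" "$host" "$port"
  done
  IFS="$old_ifs"
}

list_subscribe_urls "pipeline1:sphost-1:7000,pipeline2:sphost-2:7000"
# prints:
# ws://localhost:8082/ws/v1/subscribe/pipeline1 -> sphost-1:7000
# ws://localhost:8082/ws/v1/subscribe/pipeline2 -> sphost-2:7000
```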