Web-sockets tutorial
Introduction
kdb Insights services allow data to be streamed from a pipeline to a client application using web-sockets. A client application can access the web-sockets to populate charts in a graphing application.
This tutorial shows you how to use kdb Insights to:
- Deploy the Stream Processor, Service Gateway and Reliable Transport in a Kubernetes cluster.
- Publish data, using a q publisher, to a Stream Processor pipeline which includes a streaming subscription writer.
- Connect a client application through an ingress to the external service on the Service Gateway, which acts as a scalable proxy for serving web-sockets to the client application.
More detailed information on each of these services is available here.
All the details of the client protocol are specified here.
The steps in the process are:
- Download the charts
- Create a namespace and secrets
- Install the charts
- Create the stream processor pipeline
- Connect to the web-socket
The following sections provide further details for the above steps.
Download the charts
The charts can be found on an external registry, assuming the appropriate access has been granted.
- Add the helm repo credentials:
$ helm repo add kx-insights https://nexus.dl.kx.com/repository/kx-insights-charts --username **** --password ****
"kx-insights" has been added to your repositories
- Search for the charts and determine the appropriate chart and app version:
$ helm search repo kx-insights/kxi-rt
NAME                 CHART VERSION   APP VERSION   DESCRIPTION
kx-insights/kxi-rt   1.8.0           1.8.0         A Helm chart for Kubernetes
- Run the following commands to download the charts:
helm fetch kx-insights/kxi-rt --version 1.8.0 --untar
helm fetch kx-insights/kxi-rt-q-pub --version 1.8.0 --untar
helm fetch kx-insights/kxi-sp --version 1.8.0 --untar
helm fetch kx-insights/sg-gateway --version 1.8.0 --untar
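The four fetch commands differ only in the chart name, so they can be generated from a list. The sketch below is a dry-run helper that prints the commands rather than running them. The function name `chart_fetch_cmds` and the `CHART_VERSION` variable are illustrative assumptions and are not part of the kdb Insights tooling.

```shell
# Hypothetical helper: print one `helm fetch` command per chart.
# CHART_VERSION is an assumed variable name; 1.8.0 is the version used in this tutorial.
CHART_VERSION="${CHART_VERSION:-1.8.0}"

chart_fetch_cmds() {
    for chart in kxi-rt kxi-rt-q-pub kxi-sp sg-gateway; do
        echo "helm fetch kx-insights/$chart --version $CHART_VERSION --untar"
    done
}

# Dry run: show the commands without executing them.
chart_fetch_cmds
```

Once the printed commands look right, they can be executed with `chart_fetch_cmds | sh`.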
Create a namespace and secrets
For simplicity, create a dedicated namespace for this quickstart guide.
- Create a namespace:
export NS_NAME="websock-demo"
kubectl create namespace $NS_NAME
- Create a docker secret:
kubectl -n $NS_NAME create secret docker-registry docker-reg-secret --docker-username=<username> --docker-password=<password> --docker-server=registry.dl.kx.com
To gain access to registry.dl.kx.com, please contact KX at licadmin@kx.com.
- Create a license secret:
See licensing to find out how to obtain a kdb Insights microservices license.
base64 -w0 <path to kc.lic> > license.txt
kubectl create secret generic kxi-license --from-file=license=./license.txt -n $NS_NAME
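The `-w0` option used above is a GNU coreutils feature and may not be available in every `base64` implementation. A portable alternative, sketched below, removes the line breaks with `tr` instead. The function name `encode_license` and the throwaway demo file are illustrative only; substitute the path to your real `kc.lic`.

```shell
# Hypothetical helper: base64-encode a file onto a single line without relying on -w0.
encode_license() {
    base64 < "$1" | tr -d '\n'
}

# Demo with a throwaway file standing in for kc.lic (this is not a real license).
demo_lic="$(mktemp)"
printf 'abc' > "$demo_lic"
encode_license "$demo_lic"
echo
# prints: YWJj
rm -f "$demo_lic"
```

With a real license this would be used as `encode_license <path to kc.lic> > license.txt`, followed by the same `kubectl create secret` command as above.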
Install the charts
Configuring entities with a global settings file
You might find it useful to have a global settings file to configure entities such as the kdb+ license file.
The details below show you how a Kubernetes secret, once created and added to the relevant namespace, can be referenced when the Helm charts are deployed.
- Create the global settings file using the following command:
cat <<EOF > global_settings.yaml
global:
  packages:
    enabled: false
  image:
    repository: registry.dl.kx.com
  imagePullSecrets:
    - name: docker-reg-secret
  license:
    secretName: kxi-license
    asFile: false
    onDemand: true
auth:
  enabled: false
EOF
- Create the global_settings file needed for installing the publisher chart:
cat <<EOF > global_settings_pub.yaml
stream:
  prefix: rt-
EOF
These global settings files can then be used when installing the chart by using the -f argument.
- Create the global_settings file needed for installing the gateway chart:
cat <<EOF > global_settings_sg.yaml
replicaCount: 1
EOF
These global settings files can be used when installing the chart by using the -f argument.
- Install the charts using the following commands. Ensure you have the global_settings YAML files:
# The helm install commands are in the following format
# helm -n <kubernetes_namespace> install <release-name> <chart-name> -f <global_settings_file>
helm -n $NS_NAME install rt kxi-rt -f global_settings.yaml
helm -n $NS_NAME install sp kxi-sp -f global_settings.yaml
# Note the use of both global_settings files in the next command to install the gateway
helm -n $NS_NAME install sg sg-gateway -f global_settings.yaml -f global_settings_sg.yaml
# Note the use of both global_settings files in the next command to install the publisher
helm -n $NS_NAME install pub kxi-rt-q-pub -f global_settings.yaml -f global_settings_pub.yaml
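Before moving on, it can help to confirm that the four releases deployed and that their pods are ready. The following is a minimal sketch using the standard `helm list`, `kubectl wait` and `kubectl get pods` commands. The function name `verify_install` and the 300-second timeout are arbitrary choices for illustration, not values taken from the kdb Insights documentation.

```shell
# Hypothetical helper: list the Helm releases, then wait for every pod
# in the namespace to report Ready before printing the pod list.
verify_install() {
    ns="${1:-$NS_NAME}"
    helm -n "$ns" list &&
    kubectl -n "$ns" wait --for=condition=Ready pods --all --timeout=300s &&
    kubectl -n "$ns" get pods
}

# Run it against the tutorial namespace once the charts are installed:
# verify_install "$NS_NAME"
```

The function stops at the first failing command, so a non-zero exit status suggests that a release is missing or a pod did not become ready within the timeout.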
Create the stream processor pipeline
This section details creating and installing a Stream Processor pipeline. This pipeline achieves the following:
- ingests raw market data sent by the q publisher kxi-rt-q-pub,
- derives and maintains the latest position for each instrument traded, and
- writes this derived data to a web-socket.
The SP pipeline makes use of a set of Stream Processor APIs. These include readers, writers and operators. Further details on these can be found here.
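The position logic in the list above amounts to a running signed sum per instrument: a buy adds its size and any other side subtracts it. The sketch below mimics that arithmetic with `awk` on a few made-up trades, entirely outside kdb Insights. The function name `net_positions` and the sample data are illustrative assumptions and are not part of the pipeline.

```shell
# Hypothetical stand-in for the pipeline's position arithmetic.
# Input: one trade per line as "<sym> <side> <size>".
# Output: "<sym> <net position>", sorted by sym.
net_positions() {
    awk '{ pos[$1] += ($2 == "buy" ? $3 : -$3) }
         END { for (s in pos) print s, pos[s] }' | sort
}

printf '%s\n' 'AAPL buy 100' 'MSFT sell 50' 'AAPL sell 30' | net_positions
# prints:
# AAPL 70
# MSFT -50
```

The real pipeline keeps this state inside the Stream Processor and emits the updated row as each trade arrives, rather than printing totals at the end.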
- Create a spec.q file that contains the pipeline specification:
cat <<'EOF' > spec.q
// define schema for data streaming into RT to filter map from
trade:([]time:`timestamp$(); sym:`$(); exch:`$(); price:`float$(); size:`long$(); side:`$(); id:`long$());
position:([sym:`$()] time:`timestamp$(); pos:`long$());

.data.updPosition:{[op;md;x]
    res:$[98h=type x;
        raze .data.updPosition0[op;md]each x;99h=type x;
        .data.updPosition0[op;md;x];`]
    ;res}

.data.updPosition0:{[op;md;x]
    sz:$[x[`side]=`buy; x`size; neg x`size];
    old:$[99h = type .qsp.get[op; md];`sym xkey enlist .qsp.get[op; md];`sym xkey .qsp.get[op; md]];
    new_sz:$[(99h<>type old) or null first exec pos from old where sym in x`sym; sz; sz+first exec pos from old where sym in x`sym];
    new:`sym`time`pos!(x`sym;x`time;new_sz);
    upsert[`position] pos:cols[position]!(x`sym;x`time;new_sz);
    .qsp.set[op; md] 0!old,res:select by sym from enlist new;
    0!res}

streamA:
    .qsp.read.fromStream["trade";"mystream"]
    .qsp.map[.data.updPosition; .qsp.use enlist[`state]!enlist first 0!position]

streamB: streamA .qsp.write.toSubscriber[`position;`sym]

.qsp.run streamB
EOF
- Install the pipeline:
kubectl -n $NS_NAME port-forward svc/sp-kxi-sp 5000:5000 &
Port-forwarding to communicate with the microservices
You might find it handy to use port-forwarding as shown above. Here, the port-forward is set up to run in the background, and you should get a response like Forwarding from [::1]:5000 -> 5000. Just press Enter to return to the command prompt.
Note: sp-kxi-sp used in the last command is <release-name>-<chart-name> from the helm install <release-name> <chart-name> that was used in the step for installing the Stream Processor microservice chart.
cat <<EOF > create_pipeline.sh
curl -X POST localhost:5000/pipeline/create -d \
"\$(jq -n --arg spec "\$(cat < spec.q)" \
'{
    name : "pipeline1",
    type : "spec",
    config : { content: \$spec },
    settings : { minWorkers: "1", maxWorkers: "10" },
    env : {"KXI_SP_WRITEBACK_STREAM": "mystream", "RT_TOPIC_PREFIX": "rt-", "RT_SINK": "mystream" }
}' | jq -asR .)"
EOF
chmod u+x create_pipeline.sh && ./create_pipeline.sh
You are using the Stream Processor control port 5000 to install this pipeline.
Data now starts flowing from the publisher to the Stream Processor and can subsequently be accessed using web-sockets.
There are further details on interacting with the pipeline here. This includes how to check the status of the pipeline and teardown when you are finished.
Connect to the web-socket
- Connect to the web-socket:
Note: Get the gateway pod name.
GW_POD=$(kubectl -n $NS_NAME get pods | grep sg-gateway | awk '{print $1}')
Note: Port-forward any local port (8082 in this example) to port 8080 of the gateway pod.
kubectl -n $NS_NAME port-forward ${GW_POD} 8082:8080 &
Note: The suffix of the web-socket endpoint (sp-pipeline1) in the next command is the <release-name>-<pipeline-name> of the Stream Processor microservice chart.
wscat -c ws://localhost:8082/ws/v1/subscribe/sp-pipeline1
- You can send commands to control the web-socket streaming behaviour. For example, this command starts the streaming:
{"type":"subsnap", "id":1, "payload":{"topic":"position"}}
This subscribes to the contents of the position table which is derived in the Stream Processor pipeline. The Stream Processor will publish data updates to the client subscriber every 5 seconds.
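Both the endpoint and the request follow a fixed pattern, so they can be assembled from their parts when the release name, pipeline name or topic changes. The helpers below are a minimal sketch; the function names `ws_url` and `subsnap_msg` are hypothetical, and only the URL layout and message shape are taken from this tutorial.

```shell
# Hypothetical helpers; names and argument order are illustrative.

# Build the subscribe URL from the local port, SP release name and pipeline name.
ws_url() {
    printf 'ws://localhost:%s/ws/v1/subscribe/%s-%s\n' "$1" "$2" "$3"
}

# Build a "subsnap" request for a topic, in the same shape as the message above.
subsnap_msg() {
    printf '{"type":"subsnap", "id":%s, "payload":{"topic":"%s"}}\n' "$1" "$2"
}

ws_url 8082 sp pipeline1
# prints: ws://localhost:8082/ws/v1/subscribe/sp-pipeline1
subsnap_msg 1 position
# prints: {"type":"subsnap", "id":1, "payload":{"topic":"position"}}
```

For example, `wscat -c "$(ws_url 8082 sp pipeline1)"` opens the same connection as the command above, after which the output of `subsnap_msg 1 position` can be pasted at the prompt.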