Skip to content

Web-sockets tutorial

Introduction

kdb Insights services allow data to be streamed from a pipeline to a client application using web-sockets. A client application can access the web-sockets to populate charts in a graphing application.

KX Nexus

The KX Nexus repository will be sunsetted in the future. Nexus links on this page are provided as a temporary alternative to the KX Downloads Portal for existing users. The KX Downloads Portal is the preferred repository for all use cases and KX Nexus links will be removed once the KX Nexus repository is decommissioned.

This tutorial shows you how to kdb Insights to:

  1. Deploy the Stream Processor, Service Gateway and Reliable Transport in a Kubernetes cluster.
  2. Publish data, using a q publisher, to a Stream Processor pipeline which includes a streaming subscription writer.
  3. Connect a client application through an ingress to the external service on the Service Gateway, which acts as a scalable proxy for serving web-sockets to the client application.

More detailed information on each of these services are available here.

All the details of the client protocol are specified here.

The steps in the process are:

  1. Download the charts
  2. Create a namespace and secrets
  3. Install the charts
  4. Create the stream processor pipeline
  5. Connect to the web-socket

The following sections provide further details for the above steps.

Download the charts

The charts can be found on the KX Downloads Portal, or from the KX Nexus repository until Nexus is removed in the future, assuming the appropriate access has been granted.

  1. Add the helm repo credentials:

    $ helm repo add kx-insights https://portal.dl.kx.com/assets/helm/ --username **** --password ****
    
    "kx-insights" has been added to your repositories
    

    Use the following command to access the helm repo credentials through KX Nexus instead:

    $ helm repo add kx-insights https://nexus.dl.kx.com/repository/kx-insights-charts --username **** --password ****
    
    "kx-insights" has been added to your repositories
    
  2. Search for the charts and determine the appropriate chart and app version:

    $ helm search repo kx-insights/kxi-rt
    NAME                            CHART VERSION   APP VERSION     DESCRIPTION
    kx-insights/kxi-rt              1.8.0           1.8.0           A Helm chart for Kubernetes
    
  3. Run the following commands to download the charts:

    helm fetch kx-insights/kxi-rt          --version  1.8.0 --untar
    helm fetch kx-insights/kxi-rt-q-pub    --version  1.8.0 --untar
    helm fetch kx-insights/kxi-sp          --version  1.8.0 --untar
    helm fetch kx-insights/sg-gateway      --version  1.8.0 --untar
    

Create a namespace and secrets

For simplicity, create a dedicated namespace for this quickstart guide.

  1. Create a namespace:

    export NS_NAME="websock-demo"
    kubectl create namespace $NS_NAME
    
  2. Create a docker secret:

    kubectl -n $NS_NAME  create secret docker-registry docker-reg-secret --docker-username=<username> --docker-password=<password> --docker-server=registry.dl.kx.com
    

    To gain access to registry.dl.kx.com please contact KX at licadmin@kx.com

  3. Create a license secret:

    See licensing to find out how to obtain a kdb Insights microservices license.

    base64 -w0 <path to kc.lic> > license.txt
    kubectl create secret generic kxi-license --from-file=license=./license.txt -n $NS_NAME
    

Install the charts

Configuring entities with a global settings file

You might find it useful to have a global settings file to configure entities such as the kdb+ license file.

The details below show you how a kubernetes secret can be created and added to the relevant namespace, before subsequently being referenced in the helm chart deploy.

  1. Create the global settings files using the following command:

    cat <<EOF > global_settings.yaml
    global:
        packages:
            enabled: false
        image:
            repository: registry.dl.kx.com
        imagePullSecrets:
        - name: docker-reg-secret
        license:
            secretName: kxi-license
            asFile: false
            onDemand: true
    
    auth:
        enabled: false
    EOF
    
  2. Create a global_settings file, which is needed for installing the publisher chart:

    cat <<EOF > global_settings_pub.yaml
    stream:
        prefix: rt-
    EOF
    

    These global settings files can then be used when installing the chart by using the -f argument.

  3. Create a global_settings file, which is needed for installing the gateway chart:

    cat <<EOF > global_settings_sg.yaml
    replicaCount: 1
    EOF
    

    These global settings files can then be used when installing the chart by using the -f argument.

  4. Install the charts using the following commands. Ensure you have the global_settings yaml files:

    # The helm install commands are in the following format
    # helm -n <kubernetes_namespace> install <release-name> <chart-name> -f <global_settings_file>
    helm -n $NS_NAME install rt kxi-rt -f global_settings.yaml
    helm -n $NS_NAME install sp kxi-sp -f global_settings.yaml
    # Note the use of both global_settings files in the next command to install the publisher
    helm -n $NS_NAME install sg sg-gateway -f global_settings.yaml -f global_settings_sg.yaml
    # Note the use of both global_settings files in the next command to install the publisher
    helm -n $NS_NAME install pub kxi-rt-q-pub -f global_settings.yaml -f global_settings_pub.yaml
    

Create the stream processor pipeline

This section details creating and installing a Stream Processor pipeline. This pipeline achieves the following:

  1. Ingests raw market data sent by the q publisher kxi-rt-q-pub

  2. Derives and maintains the latest position for each instrument traded

  3. Writes this derived data to a web-socket

Publish Frequency

The data derived in the stream processor pipeline is published to the WebSocket on a timer. The frequency at which the data is published is set in milliseconds using the publishFrequency value in the pipeline node. If not set, it defaults to 500 milliseconds.

The SP pipeline makes use of a set of Stream Processor APIs

These include readers, writers and operators. Further details on these can be found here.

  1. Create a spec.q file that contains the pipeline specification:

    cat <<'EOF' > spec.q
    // define schema for data streaming into RT to filter map from
    trade:([]time:`timestamp$(); sym:`$(); exch:`$(); price:`float$(); size:`long$(); side:`$(); id:`long$());
    position:([sym:`$()] time:`timestamp$(); pos:`long$());
    
    .data.updPosition:{[op;md;x]
      res:$[98h=type x; raze .data.updPosition0[op;md]each x;99h=type x; .data.updPosition0[op;md;x];`]
      ;res}
    
    .data.updPosition0:{[op;md;x]
      sz:$[x[`side]=`buy; x`size; neg x`size];
      old:$[99h = type .qsp.get[op; md];`sym xkey enlist .qsp.get[op; md];`sym xkey .qsp.get[op; md]];
      new_sz:$[(99h<>type old) or null first exec pos from old where sym in x`sym; sz; sz+first exec pos from old where sym in x`sym];
      new:`sym`time`pos!(x`sym;x`time;new_sz);
      upsert[`position] pos:cols[position]!(x`sym;x`time;new_sz);
      .qsp.set[op; md] 0!old,res:select by sym from enlist new;
      0!res}
    
    streamA: .qsp.read.fromStream["trade";"mystream"] .qsp.map[.data.updPosition; .qsp.use enlist[`state]!enlist first 0!position]
    streamB: streamA .qsp.write.toSubscriber[`position;`sym; .qsp.use enlist[`publishFrequency]!enlist 100]
    .qsp.run streamB
    EOF
    
  2. Install the pipeline:

    kubectl -n $NS_NAME port-forward svc/sp-kxi-sp 5000:5000 &
    

    Port-forwarding to communicate with the microservices

    You might find it handy to use port-forwarding as shown above. Here, the port-forward is set up to run in the background, and you should get a response like Forwarding from [::1]:5000 -> 5000. Just press Enter to return to command prompt.

    Note

    sp-kxi-sp used in the last command is <release-name>-<chart-name> from the helm install <release-name> <chart-name> that was used in the step for installing stream processor microservice chart.

    cat <<EOF > create_pipeline.sh
    curl -X POST localhost:5000/pipeline/create -d \
            "\$(jq -n --arg spec "\$(cat < spec.q)" \
            '{
                name     : "pipeline1",
                type     : "spec",
                config   : { content: \$spec },
                settings : { minWorkers: "1", maxWorkers: "10" },
                env      : {"KXI_SP_WRITEBACK_STREAM": "mystream", "RT_TOPIC_PREFIX": "rt-", "RT_SINK": "mystream" }
            }' | jq -asR .)"
    EOF
    
    chmod u+x create_pipeline.sh && ./create_pipeline.sh
    

    You are using the Stream Processor control port 5000 to install this pipeline

Data now starts flowing from the publisher to the Stream Processor and can subsequently be accessed using web-sockets.

There are further details on interacting with the pipeline here. This includes how to check the status of the pipeline and teardown when you are finished.

Connect to the web-socket

  1. Connect to the web-socket:

    Note

    Get the gateway pod name

    GW_POD=$(kubectl -n $NS_NAME get pods | grep sg-gateway | awk {'print $1'})
    

    Note

    Portforward any local port (8082 in this example) to the 8080 port of gateway pod

    kubectl -n $NS_NAME port-forward ${GW_POD} 8082:8080 &
    

    Note

    The suffix of the websocket end-point (sp-pipeline1) in the next command is the <release-name>-<pipeline-name> of the stream processor microservice chart.

    wscat -c ws://localhost:8082/ws/v1/subscribe/sp-pipeline1
    
  2. You can send commands to control the web-socket streaming behaviour. For example this command starts the streaming:

    {"type":"subsnap", "id":1, "payload":{"topic":"position"}}
    

    This subscribes to the contents of the position table which is derived in the Stream Processor pipeline. The Stream Processor will publish data updates to the client subscriber every 5 seconds.

Notes on running outside of Kubernetes

It is possible to introduce a web socket client using Docker. However, due to the absence of the Kubernetes control plane, the Service Gateway service needs to be told where to find the Stream Processor endpoints. This can be done using the SP_STREAM_HOST_LIST environment variable. This is a comma separated list, where each entry is in the format stream_name:sp_hostname:sp_port. The stream_name part determines the suffix of the URL that you need to connect to.

For example:

SP_STREAM_HOST_LIST="pipeline-1:sphost-1:7000,pipeline-1:sphost-1:7000"

You can connect to the pipeline that streams from sphost-1 with:

wscat -c ws://localhost:8082/ws/v1/subscribe/pipeline1

You can connect to the pipeline that streams from sphost-2 with:

wscat -c ws://localhost:8082/ws/v1/subscribe/pipeline2

Note

This example assumes port forwarding is set up to point from port 8080 on the Service Gateway to port 8082 on localhost.