
Kafka stream ML model application

Fitting and applying a clustering model on a Kafka stream

Motivation

The purpose of this example is to provide a workflow in which an online machine learning model is fit on the first N records of a stream and is then used for prediction and online updates. The example uses a streaming implementation of K-Means clustering to filter the stream so that only one of the generated clusters is persisted to the database.

Example

This example closely follows the Kafka Ingest example. It walks through the functionality provided by the SP ML Insights integration by:

  1. Deploying a sequential K-Means model to be fit on the first 1000 records provided
  2. Applying the fit model to subsequent data to make predictions
  3. Filtering the stream based on these predictions and publishing only the data that meets the specified conditions

This is achieved by filtering the incoming Kafka stream for the trades topic, which is produced by the example Producer. The stream is decoded and windowed before the analysis above is applied.
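
The fit-then-predict-and-update behaviour described above can be sketched in plain Python. This is a minimal illustration of the general sequential K-Means idea, not the implementation used by the SP ML operators: the class name, the `k` and `seed` parameters, the random initialisation, the number of Lloyd iterations and the choice to return no label while buffering are all assumptions made for the sketch.

```python
import random


class SequentialKMeans:
    """Toy sequential K-Means: buffer records, fit once, then predict and update."""

    def __init__(self, k=2, buffer_size=1000, seed=0):
        self.k = k                      # hypothetical parameter: number of clusters
        self.buffer_size = buffer_size  # records collected before the initial fit
        self.buffer = []
        self.centroids = None           # set once the buffer is full
        self.counts = None              # points assigned to each centroid so far
        self.rng = random.Random(seed)

    def _nearest(self, point):
        # Index of the centroid with the smallest squared Euclidean distance
        dists = [sum((x - c) ** 2 for x, c in zip(point, centroid))
                 for centroid in self.centroids]
        return dists.index(min(dists))

    def _fit(self):
        # Assumed initialisation: k random buffered points, then a few
        # Lloyd iterations over the buffer
        self.centroids = [list(p) for p in self.rng.sample(self.buffer, self.k)]
        for _ in range(10):
            groups = [[] for _ in range(self.k)]
            for p in self.buffer:
                groups[self._nearest(p)].append(p)
            for i, group in enumerate(groups):
                if group:
                    self.centroids[i] = [sum(col) / len(group) for col in zip(*group)]
        self.counts = [0] * self.k
        for p in self.buffer:
            self.counts[self._nearest(p)] += 1

    def process(self, point):
        """Return a cluster label, or None while the buffer is still filling."""
        if self.centroids is None:
            self.buffer.append(tuple(point))
            if len(self.buffer) == self.buffer_size:
                self._fit()
            return None
        i = self._nearest(point)
        self.counts[i] += 1
        # Online update: move the winning centroid towards the point (running mean)
        self.centroids[i] = [c + (x - c) / self.counts[i]
                             for c, x in zip(self.centroids[i], point)]
        return i
```

Feeding `(bid, ask)` pairs to `process` one at a time returns `None` for the first `buffer_size` records and a cluster index afterwards. A downstream step can then keep only the records whose label matches the cluster of interest.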

To run an ingestion pipeline using the KX Insights platform, first deploy the base system. Next, download the kafka_assembly.yaml assembly by following the download instructions. Then modify this assembly file to deploy either a q or a Python pipeline containing ML functionality, as described below.

To define the streaming program in q, update the sp section of kafka_assembly.yaml as follows:

sp:
  description: Kafka Model Clustering with SP
  pipelines:
    kafka-transform:
      env:
       - name: KXI_SP_KAFKA_BROKERS
         value: kafka:9092
      destination: south
      minWorkers: 1
      maxWorkers: 1
      workerThreads: 1
      base: "q-ml"
      spec: |-
        .qsp.run
          .qsp.read.fromKafka["trades"]
          .qsp.decode.json[]  
          .qsp.map[{enlist `time`sym`bid`ask!"PSff"$x}]
          .qsp.window.timer[00:00:05]
          .qsp.ml.sequentialKMeans[
            `bid`ask;
            .qsp.use ``bufferSize!(`;1000)]
          .qsp.sql["select x from $1 where cluster=0"]
          .qsp.map[{select time,sym,bid,ask from x where cluster=0}]
          .qsp.write.toStream[`trades]

To define the same streaming program in Python, update the sp section of kafka_assembly.yaml as follows:

sp:
  description: Kafka Model Clustering with SP
  pipelines:
    kafka-transform:
      env:
       - name: KXI_SP_KAFKA_BROKERS
         value: kafka:9092
      destination: south
      minWorkers: 1
      maxWorkers: 1
      workerThreads: 1
      base: "py-ml"
      spec: |-
        from datetime import timedelta

        import kxi.sp as sp
        import pykx as kx

        sp.run(sp.read.from_kafka('trades')
            | sp.decode.json()
            | sp.map(kx.q('{enlist `time`sym`bid`ask!"PSff"$x}'))
            | sp.window.timer(timedelta(seconds=5))
            | sp.ml.sequential_k_means(['bid', 'ask'], buffer_size = 1000)
            | sp.op.sql('select x from $1 where cluster=0')
            | sp.write.to_stream('trades'))
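
The decode, cast and filter steps in the pipelines above can be pictured with a small standalone Python sketch. It does not use the kxi.sp API. The function names are hypothetical, and the message layout (time as an ISO 8601 string, sym as a string, bid and ask as numbers) is an assumption about what the example Producer emits.

```python
import json
from datetime import datetime


def decode_trade(message: bytes) -> dict:
    """Decode one JSON message and cast its fields, loosely mirroring "PSff"$x."""
    record = json.loads(message)
    return {
        "time": datetime.fromisoformat(record["time"]),  # assumed ISO 8601 string
        "sym": str(record["sym"]),
        "bid": float(record["bid"]),
        "ask": float(record["ask"]),
    }


def keep_cluster(rows, cluster=0):
    """Keep rows labelled with the given cluster and drop the label column."""
    columns = ("time", "sym", "bid", "ask")
    return [{c: row[c] for c in columns} for row in rows if row["cluster"] == cluster]
```

Here `keep_cluster` plays the role of the final filter: only rows that the model assigned to cluster 0 would go on to be written to the trades stream.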

Once kafka_assembly.yaml has been updated, apply it to your Insights installation:

kubectl apply -f kafka_assembly.yaml