
Streaming analytics

Streaming Analytics is a highly configurable, subscription-based framework for streaming data to clients in real time. It lets users visualize fast data and make decisions on it efficiently.

Streaming analytic

The main component of the framework is the streaming analytic itself: a parameterized function called by an end client to initiate a subscription. Once the subscription is set up, the client takes no further action other than listening for updates. Different parameters result in different data streams.

  • The client initiates a subscription with the analytic name and parameters
  • The request is routed to a target process in the backend
  • The analytic is called on that process and sets up global state
  • A confirmation is returned to the client
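The four steps above can be sketched in q as a hypothetical dispatcher. `.demo.subscribe`, `.demo.toySub` and `.demo.nextID` are illustrative names, not Kx Control APIs; in the real framework the QM performs the routing and the analytic runs on the target process.

```q
// Hypothetical dispatcher standing in for the QM routing (not a Kx Control API)
.demo.subscribe:{[analytic; params]
  id:(get analytic) params;                              // call the analytic; it sets up global state
  if[-7h<>type id; '"analytic must return a long ID"];   // the framework expects a long ID
  `status`id!(`ok; id) }                                 // confirmation returned to the client

// Toy analytic used only to exercise the flow: its global state is a counter
.demo.nextID:0j;
.demo.toySub:{[param] .demo.nextID+:1j; .demo.nextID };

// Client side: initiate a subscription with an analytic name and parameters
conf:.demo.subscribe[`.demo.toySub; enlist[`syms]!enlist `server1];
```

Here `conf` is a dictionary whose `id` entry is the first generated ID, 1.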

Example

The following example streaming analytic relates to server monitoring. It would typically be called on a CEP process consuming the monitoring stats.

  • It takes a dictionary with a single parameter, syms: a list of symbols
  • The symbols correspond to server names
  • A unique ID is generated for the subscription
  • The subscription is added to an internal table
  • The ID is returned to the framework
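// Define the streaming analytic; it returns a user-generated long ID
.monPublic.sub:{[param]
  syms:(), param`syms;                                          // ensure syms is a list, even for a single symbol
  .mon.streamingID+:1j;                                         // generate the next subscription ID
  `.mon.streamingSubs upsert `id`syms!(.mon.streamingID; syms);  // record the subscription state
  .mon.streamingID }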
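The analytic can be exercised outside the framework by calling it directly. This sketch copies the state table and ID counter defined in the next section so that it runs on its own, then subscribes with two different parameter dictionaries; `id1` and `id2` are illustrative variable names.

```q
// State table and ID counter as defined in the next section (dummy row fixes the types)
.mon.streamingSubs:([id:`u#enlist -1j] syms:enlist "s"$());
.mon.streamingID:0j;

// The streaming analytic from above
.monPublic.sub:{[param]
  syms:(), param`syms;
  .mon.streamingID+:1j;
  `.mon.streamingSubs upsert `id`syms!(.mon.streamingID; syms);
  .mon.streamingID };

// Different parameters, different subscriptions (and IDs)
id1:.monPublic.sub enlist[`syms]!enlist `server1`server2;
id2:.monPublic.sub enlist[`syms]!enlist `server3;   // single symbol, wrapped into a list by (),
```

The second call passes a single symbol; the `(),` prefix in the analytic turns it into a one-item list, so every row of `.mon.streamingSubs` holds a symbol list.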

Streaming data

Once the subscription state is set up, it is up to the underlying process to decide when to stream the data. This might be event-based, e.g. through an upd function, where incoming data triggers updates that are published to the streaming subscriptions. It could also be triggered on a timer, to throttle updates or aggregate them by time.

The .sa.pub API is used to publish data to a stream.
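The timer-driven alternative can be sketched as follows: the event handler only buffers incoming rows, and `.z.ts` publishes the latest value per sym once per tick. The `.sa.pub` definition is a stub so the block runs outside Kx Control. `.mon.cpuBuf`, `.mon.bufferCPU`, `.mon.flushCPU`, `.demo.sent` and the fixed subscription ID `1j` are hypothetical; a real process would route `latest` through its per-subscription filtering instead of a single ID.

```q
// Stub standing in for the Kx Control .sa.pub so the sketch runs on its own
.demo.sent:();
.sa.pub:{[uid; t] .demo.sent,:enlist (uid; t); };

// Hypothetical buffer of CPU rows received since the last timer tick
.mon.cpuBuf:([] sym:`symbol$(); cpu:`float$());

// Event side: called from upd, buffers the rows instead of publishing them
.mon.bufferCPU:{[data] `.mon.cpuBuf insert data; };

// Timer side: publish the latest value per sym, then reset the buffer
.mon.flushCPU:{[]
  if[not count .mon.cpuBuf; :()];
  latest:0!select last cpu by sym from .mon.cpuBuf;
  .mon.cpuBuf:0#.mon.cpuBuf;
  .sa.pub[1j; latest]; };   // 1j: hypothetical subscription ID

.z.ts:{ .mon.flushCPU[] };
system "t 1000";            // run .z.ts every 1000 ms
```

Aggregating with `last ... by sym` caps each publish at one row per server, however many updates arrived during the interval.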

Example

The code below shows how a process would use the event-based approach to trigger streaming analytic updates from incoming data.

It assumes the upd or realTimeMsgFunct does something like:

upd:{[tab; data]
  // ..
  if[tab=`monCPU; .mon.pubStreaming[data]]; }

Code:

  • The streaming analytic takes a list of syms the client is interested in
  • Updates to the underlying CPU data are passed into the streaming logic
  • The syms filter is applied to the update for each active subscription
  • Data is published if any rows match that subscription
// Set up subscription ID and state table (with dummy row to ensure correct type)
.mon.streamingSubs:([id:`u#enlist -1j] syms:enlist "s"$())
.mon.streamingID:0j

// Check for streaming subscriptions and filter incoming data for each active one
.mon.pubStreaming:{[data]
  toRun:1_ 0!.mon.streamingSubs;
  if[not count toRun; :()];

  .mon.filter[data] each toRun; };

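// Filter data to the subscribed syms and publish any matching rows
.mon.filter:{[data; x]
  s:x`syms;
  w:();                                          // no where clause: an empty syms list matches every row
  if[count s; w:enlist(in; `sym; enlist s)];     // functional form of: where sym in s

  t:?[data; w; 0b; ()];                          // functional select of the matching rows
  if[count t; .sa.pub[x`id; t] ]; }              // publish only if any rows matched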
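The where clause that `.mon.filter` builds is the functional form of a qSQL `sym in` filter. This sketch, using made-up sample values, checks that both forms select the same rows, and that an empty where list returns every row, which is why a subscription with an empty syms list receives all servers.

```q
// Hypothetical sample update and subscription syms
data:([] sym:`server1`server2`server3; cpu:10.1 12.2 9.9);
s:`server1`server3;

// Where clause built the same way as in .mon.filter
w:enlist (in; `sym; enlist s);

// Functional select and the equivalent qSQL query
viaFunctional:?[data; w; 0b; ()];
viaQSQL:select from data where sym in s;

// An empty where list applies no filter, so every row is returned
allRows:?[data; (); 0b; ()];
```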

Unsubscribe and snapshot

There are two other important components of the streaming framework: unsubscribe and snapshot.

Unsubscribe is triggered when a client shuts down the subscription. It removes the subscription state from the target process.

Snapshots are most useful when a stream is keyed. In the example so far, CPU data is the subject of the streams, so the data is keyed by server name, i.e. the sym column (ignoring the fact that most servers are multi-core). If the client is only interested in the current state, only the latest value per sym is relevant. One approach is to send the current state of all servers with every update. However, this sends a larger dataset each time, resulting in more processing, rendering, etc.

Initial update:

sym      cpu
-------------
server1  10.1
server2  12.2
server3  9.9
..

Server2 updates:

sym      cpu
-------------
server1  10.1
server2  14.2
server3  9.9
..

The other approach is to send only what has changed, which is what the example has done so far. When an update arrives, the published data contains only the syms in the incoming data, and the client merges it into its existing state for the remaining syms. This approach is much more efficient, as it saves bandwidth, processing, etc.

Current state:

sym      cpu 
-------------
server1  10.1
server2  12.2
server3  9.9 
.. 

Server2 updates:

sym      cpu 
-------------
server2  14.2
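On the client side, the merge amounts to a keyed upsert. This sketch assumes a q client that keeps its current state keyed by sym (an assumption; clients are not necessarily q processes) and applies the server2 delta from the tables above.

```q
// Client-side state, keyed by sym (the "Current state" table above)
state:([sym:`server1`server2`server3] cpu:10.1 12.2 9.9);

// Delta published when server2 updates: only the changed row
delta:([] sym:enlist `server2; cpu:enlist 14.2);

// Merge: existing keys are updated in place, unseen keys would be appended
state:state upsert 1!delta;
```

The server1 and server3 rows are untouched; a sym not yet in the state would be appended as a new row.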

However, consider the case where a subscription is initiated after the system is already running. The client only sees the full universe once every sym has updated. To solve this, a snapshot analytic runs to provide the current view of the universe. In this case, the snapshot returns the current-state table from above.

Snapshots are also used when multiple clients share the same subscription. The framework combines these into a single database subscription, so late joiners need to run a snapshot to get up to speed.

The last line of the example below is required: it registers the association between the streaming analytic and its unsubscribe and snapshot equivalents.

Example

// remove the subscription for a particular ID
.mon.unsub:{[x] delete from `.mon.streamingSubs where id=x}

// run snapshot for a subscription
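.mon.snapshot:{[x]
  // look up the subscription; do nothing if the ID is unknown
  s:0!select from .mon.streamingSubs where id=x;
  if[not count s; :() ];
  // filter the current CPU state for this subscription and publish it
  data:0!select from monCPU;
  .mon.filter[data; first s];
 };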

.sa.registerfuncs[`.monPublic.sub; `.mon.unsub; `.mon.snapshot];
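The snapshot reads from monCPU, which the example assumes holds the latest state per server; its definition is not shown. One way to maintain such a table, sketched here with hypothetical `.demo.*` helpers, is to key it by sym and upsert only the last value per sym from each incoming batch. `.demo.snapRows` shows the rows a snapshot would then publish for a given syms list, mirroring the empty-list-means-all behavior of `.mon.filter`.

```q
// Hypothetical: monCPU keyed by sym, holding the latest CPU value per server
monCPU:([sym:`symbol$()] cpu:`float$());

// Hypothetical helper called from upd: keep only the last value per sym from each batch
.demo.updState:{[data] `monCPU upsert select last cpu by sym from data; };

// Hypothetical helper: rows a snapshot would publish for a syms list (empty list = all)
.demo.snapRows:{[s] $[count s; select from (0!monCPU) where sym in s; 0!monCPU] };
```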

API

Function                    Parameters                        Called by   Description
--------------------------  --------------------------------  ----------  -----------
(Streaming Analytic)        (defined by analytic signature)   QM          Actual name will match the analytic name. Must return a user-generated long ID.
(Unsubscription Analytic)   uID (long)                        QM          Called to remove a subscription.
(Snapshot Analytic)         uID (long)                        QM          Called when a subscription is initialized, to provide an initial dataset.
.sa.registerfuncs           subF (symbol), unsubF (symbol),   User code   Used to register the unsubscription and snapshot functions for a given streaming analytic.
                            snapF (symbol)
.sa.pub                     uID (long), data (table)          User code   Called to publish stream data. uID matches the ID returned from the streaming analytic.

Query manager

Streaming analytics are managed by the Query Manager (QM) process. Clients connect to the QM and register their subscriptions, and the QM manages the database connections, initialization, etc.

Its primary purpose is to manage overlapping subscriptions to reduce load on the database layer. It can also provide load balancing across connection groups, according to the group mode.

The main client interface to the QM is via the App Server.

Uniqueness

If multiple clients initialize identical subscriptions, only one subscription is created instead of one per client, and the results are broadcast to each client. Uniqueness is defined by the following components of a subscription:

  • Analytic name
  • Parameters
  • Target process
  • End-user
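The deduplication rule can be illustrated with a small registry keyed on the four components. This is not the QM's implementation: `.demo.registry`, `.demo.subKey` and `.demo.register` are hypothetical, and the key is simply the string form of the component tuple, so parameter dictionaries with the same entries in a different order would count as different subscriptions here.

```q
// Hypothetical registry: one entry per unique subscription, listing its clients
.demo.registry:(`symbol$())!();

// Key built from the four uniqueness components (string form of the tuple)
.demo.subKey:{[analytic; params; target; user] `$-3!(analytic; params; target; user) };

// Register a client; returns 1b when a new backend subscription would be needed
.demo.register:{[client; analytic; params; target; user]
  k:.demo.subKey[analytic; params; target; user];
  isNew:not k in key .demo.registry;
  .demo.registry[k]:$[isNew; (); .demo.registry k], client;   // broadcast list for this subscription
  isNew }
```

Two clients registering the same analytic, parameters, target and end-user share one entry; changing any component, such as the end-user, creates a new one.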

Standalone mode

The streaming analytics interface is available in all process instances by default. However, it can also be used outside Kx Control. Processes that want to use it should manually load the streamingapi.q file located in the ${DELTACONTROL_HOME}/shared/q folder.

The same API functions are still used to publish data and register subscription handlers. There are two additional points to note.

Disconnection handler

The interface overwrites the kdb+ .z.pc handler in order to manage disconnects from the QM.

  • If a handler already exists, the interface wraps it so the existing logic runs first, followed by .sa.disconnect.
  • Otherwise, .z.pc is set to .sa.disconnect.

The overwrite can be disabled by defining a global variable called .sa.zpc before loading streamingapi.q.

If the overwrite is disabled, call .sa.disconnect directly to clean up handles.
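A minimal sketch of the disabled-overwrite setup, assuming `.sa.disconnect` takes the closed handle (as it must when `.z.pc` is set to it directly). The documentation only says `.sa.zpc` must be defined, so the value `1b` is an assumption. `.app.onClose` and the `.demo.*` variables are hypothetical, and the stubbed `.sa.disconnect` exists only so the block runs outside Kx Control. The handler follows the same order the interface uses when it wraps an existing handler: application logic first, then `.sa.disconnect`.

```q
// Must be defined BEFORE streamingapi.q is loaded; the value 1b is an assumption
.sa.zpc:1b;

// Then load the API, e.g.:
// system "l ",getenv[`DELTACONTROL_HOME],"/shared/q/streamingapi.q";

// Own handler: application cleanup first, then the streaming API cleanup
.z.pc:{[h] .app.onClose h; .sa.disconnect h; };

// Hypothetical application cleanup for a dropped connection
.demo.appClosed:`int$();
.app.onClose:{[h] .demo.appClosed,:h; };

// Stub for running the sketch outside Kx Control only; omit once streamingapi.q is loaded
.demo.saClosed:`int$();
.sa.disconnect:{[h] .demo.saClosed,:h; };
```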

Debug logging

Debug logging can be enabled or disabled.

.sa.log.setDebug[1b]  // enable
.sa.log.setDebug[0b]  // disable