
Pipelines

A pipeline is a customizable data ingestion service. Open a pipeline workspace from the Document bar or Perspective menu to define a pipeline.

Nodes

Click-and-drag nodes from the left-hand entity tree into the central workspace.

  • Click a node in the Pipeline workspace to edit.

  • Right-click a node in the Pipeline workspace to remove.


Select from the following types of node:

  1. Readers
  2. Writers
  3. Functions
  4. Encoders
  5. Decoders
  6. Windows
  7. Transform
  8. Machine Learning

Connect the nodes and configure each node's parameters to build the pipeline. Create a database using the simple wizard and associate your pipeline with the assembly before deploying. Explore your data with the scratchpad.

Set up

  1. Access the UI

  2. From the Document menu bar or Perspective icon menu on the left, create a new Pipeline by clicking the + icon.

  3. Click-and-drag a node listed in the left-hand entity tree into the central workspace; for example, start with a Reader node.

    Alternatively, quick start with Import

  4. Build the pipeline by adding nodes to the workspace and joining them with the node anchors. Remove a node with a right-click; add nodes by click-and-drag from the entity tree or by right-clicking an empty part of the workspace. To change a connection, roll over it to display the move icon and drag to reconnect; remove a connection with a right-click. A successful pipeline requires at least a reader node and a writer node.


  5. Configure each of the pipeline nodes; click-select to display node parameters in the panel on the right.

    Example Pipeline

    Reader Node: Expression

    n:2000;                                   / number of rows to generate
    ([] date:n?(reverse .z.d-1+til 10);       / random dates from the previous 10 days
        instance:n?`inst1`inst2`inst3`inst4;  / random instance names
        sym:n?`USD`EUR`GBP`JPY;               / random currency symbols
        cnt:n?10)                             / random counts between 0 and 9
    

    Transform Node: Apply Schema

    Column Name   Column Type   Parse Strings
    date          Datetime      Checked
    instance      Symbol        Unchecked
    sym           Symbol        Unchecked
    cnt           Integer       Unchecked

    Alternatively, a table schema can be loaded from a database created using the wizard; select the database and schema table from their respective dropdowns to load the column definitions. Use the above schema for the database table.

    Check Parse Strings for the Datetime column when loading a schema from a database.

    Writer Node: Kx Insights Database

    Use the existing table name to write the data to. If you have not already done so, create a database.

    • Database: yourdatabasename
    • Table: table

    Schemas Require a Timestamp Partition Column

    Schemas require a timestamp data column. In addition, the table should be partitioned and sorted (interval, historic and/or real-time) by this timestamp column. This can be configured as part of the set up wizard, or with the essential and advanced properties of a schema.
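
    For illustration only, a minimal q sketch (not part of the pipeline configuration) using a hypothetical in-memory table t: a date column, such as the one generated by the Expression above, can be cast to the timestamp type the database partitions on.

    t:([] date:.z.d-1+til 10; cnt:10?10)  / hypothetical sample with a date column
    meta update date:"p"$date from t      / "p"$ casts date to timestamp; meta shows type p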

  6. Click Save to save the pipeline. A pipeline should be associated with an assembly prior to deployment.

    Database selection

    If not previously created, use the wizard to create a database assembly. The initial database will have a default table (and schema); additional tables can be added to the database schema.

  7. Deploy the assembly associated with the pipeline. When the assembly is ready to be queried it will show a green circle next to it.

  8. Data from a successfully deployed pipeline can then be explored; open an Explore workspace in the Document menu bar from the "+" menu, or from the perspective icon bar on the left. Select the assembly from the dropdown - it may be necessary to refresh the browser to populate the dropdown list. Open the SQL tab and, in the SQL editor, add the following query for the Expression example above:

    SQL query

    SELECT * FROM table

  9. Click Get Data to run the query and return the results.

Set up

To use Kafka and Postgres pipelines, the relevant charts must be installed, running, and accessible on the cluster. To install them, refer to the installation instructions.

Lower Case required

When configuring pipelines, use lower case for all names and references.

Free Tier

Free tier users of insights-demo can also create this custom pipeline, decoding data using a Protocol Buffer. The crime dataset is hosted on code.kx.com and stores crime events occurring in NYC, with details of the precinct where each event was recorded, the location coordinates of the call, dispatch times, and a description of the crime.

The data file is a protocol buffer file and so has an associated crime.proto file, which is required for the protocol buffer decoder node.

Protocol Buffer

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. The structure of the data is defined once; generated source code can then be used to write and read structured data to and from a variety of data streams in a variety of languages. Further documentation is available in Google's protocol buffers documentation.

Set Up

Step 1. From the Document menu bar or Perspective icon menu on the left, create a new Pipeline by clicking the + icon.

Step 2. Add a kdb+ Expression node from the list of Readers nodes. The reader takes advantage of the native REST interfacing functionality in KX Insights, .kurl.sync. Add to the code editor:

URL:"https://code.kx.com/kxiwalkthrough/data/crime.msg";  / protobuf-encoded crime data hosted on code.kx.com
resp:.kurl.sync[(URL;`GET;(::))];                         / synchronous HTTP GET request via .kurl
if[200 <> first resp; ' last resp];                       / signal an error unless the status code is 200
"\n" vs last resp                                         / split the response body into one line per message

Step 3. The output of last resp is in a binary format until it is decoded. Add a Protocol Buffer node from the list of Decoders nodes.

Message Name: crime
Message Definition, add to the editor:

syntax = "proto3";

message crime {
    uint64 event_id = 1;
    string incident_time = 2;
    uint64 nypd_precinct = 3;
    string borough = 4;
    string patrol_borough = 5;
    uint64 call_x_geo = 6;
    uint64 call_y_geo = 7;
    string radio_code = 8;
    string description = 9;
    string crime_in_progress = 10;
    string call_timestamp = 11;
    string dispatch_timestamp = 12;
    string arrival_timestamp = 13;
    string closing_timestamp = 14;
    double latitude = 15;
    double longitude = 16;
}

Include Fields: leave checked

Step 4. Define the schema: add an Apply Schema node from the list of Transform nodes. Load the schema for the crime table from the insights-demo database. Check Parse Strings for the incident_time and description columns.

Data Format: Table
Database: insights-demo
Table: crime

Column Name      Column Type   Parse Strings
event_id         Symbol        Unchecked
incident_time    Time          Checked
nypd_precinct    Symbol        Unchecked
borough          Symbol        Unchecked
patrol_borough   Symbol        Unchecked
call_x_geo       Long          Unchecked
call_y_geo       Long          Unchecked
radio_code       Symbol        Unchecked
description      String        Checked

Step 5. Define the writer: add a Kx Insights Database node from the list of Writers nodes.

Database: insights-demo
Table: crime

Step 6. Save the pipeline; e.g. crimepipe

Step 7. Open your insights-demo assembly from the Data entity tree menu. Add your crimepipe pipeline to the assembly, then save and deploy the insights-demo assembly. When the assembly is ready to be queried, it will show a green circle next to it.

Step 8. Open an Explore tab from the Document bar. From the dropdown menu, select the insights-demo assembly - it may be necessary to refresh your browser to view the assembly in the dropdown. Open the SQL tab and use the SQL query:

SELECT * FROM crime

Deploy

Execute the pipeline


Pipeline Name
Name of the pipeline. Pipeline names should be lowercase, alphanumeric with hyphens, and less than 40 characters.

Runtime

item                  description
Debug Mode            Enable facility to debug the pipeline.
Error Mode            Select between Off, Suspend or Dump Stack. Do not use Suspend in a production environment.
Protected Execution   Enable for greater granularity in error reporting; when enabled, operating performance may be impacted.
Log Format            Define the debug reporting format; select between JSON and Text.
Log Level             Select between Info, Trace, Debug, Warning, Error or Fatal.

Scaling

Minimum number of Workers
Maximum number of Workers
Define the minimum and maximum number of pipeline Workers that can be created to run this pipeline; the defaults are a minimum of 1 and a maximum of 10.

Maximum number of Worker Threads
The current maximum limit is 16.

Persistence

Configure Kubernetes persistence for different SP cluster components.

Controller

Kubernetes persistence configuration.

item                   description
Disabled               Persistence is enabled by default; click to disable.
Storage Class          Kubernetes storage class name; e.g. standard.
Storage Size           Size of the volume allocated to each controller; defaults to 20Gi.
Checkpoint Frequency   Checkpoint frequency in milliseconds; defaults to 5,000.

Storage Class

Ensure Storage Class is set to a valid class. This is defined in the cluster and may differ across cloud providers, e.g. GCP: standard, AWS: gp2, Azure: default. This information can be retrieved by running:

kubectl get storageclass

Worker

Kubernetes persistence configuration.

item                   description
Disabled               Persistence is enabled by default; click to disable.
Storage Class          Kubernetes storage class name; e.g. standard.
Storage Size           Size of the volume allocated to each Worker; defaults to 20Gi.
Checkpoint Frequency   Checkpoint frequency in milliseconds; defaults to 5,000.

Kubernetes

Label
Add a label and value to created resources.

Image Pull Secrets
Add a Kubernetes secret reference name; for example, kx-access.

Environment Variables
Add each variable name and its value to the Variable and Value fields.

For more information, see the Kubernetes configuration documentation.

Save

Save the pipeline.

Save As

Creates a copy of the pipeline.

Options

The Options icons provide the following functions:

  • Reset view
  • Automatically lay out the pipeline

Configuration

The right-hand panel contains details of the selected node.


Advanced Properties

Every created pipeline has Advanced settings.

Global Code

Global Code is executed before a pipeline is started; this code may be necessary for Stream Processor hooks, globals and named functions.
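
For illustration, a minimal sketch of Global Code, assuming hypothetical names (startTime, enrich) for a global and a named function that a downstream Function node might reference:

startTime:.z.p;                           / hypothetical global, set before the pipeline starts
enrich:{[t] update loadTime:.z.p from t}  / hypothetical named function a Function node could apply to each batch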