
Pipelines

A pipeline is a customizable data ingestion service. Open a pipeline workspace from the Document bar or Perspective menu to define a pipeline.

Nodes

Click-and-drag nodes from the left-hand entity tree into the central workspace.

Actions

When a node is added, right-click it to access a number of actions that can be performed on the node.

An example expression pipeline with reader, decoder, transformation and writer node; right-click for menu options.

  • Click a node in the Pipeline workspace to edit it; this updates the Pipeline settings panel with details of the node.

  • Right-click a node in the Pipeline workspace to rename, remove, or duplicate it. A duplicated node retains the contents of the original, and its name is appended with a numeric tag; -n.

Right-click menu for node editing.

Types

Select from the following types of node:

  1. Readers
  2. Writers
  3. Functions
  4. Encoders
  5. Decoders
  6. Windows
  7. Transform
  8. Machine Learning

Connect nodes together and configure each node's parameters to build the pipeline. Create a database using the simple wizard, then either associate your pipeline with an assembly before deploying, or deploy the completed pipeline to an active assembly. Explore your data with the scratchpad.

Set up

  1. Access the UI

  2. From the Document menu bar or the Perspective icon menu on the left, create a new Pipeline by clicking the pipeline icon.

  3. Click-and-drag a node listed in the left-hand entity tree into the central workspace; for example, start with a Reader node.

    Alternatively, quick start with Import

  4. Build the pipeline by adding nodes to the workspace and joining them with the node anchors. Nodes can be removed with a right-click, and added by click-and-drag from the entity tree or by right-clicking an empty part of the workspace. To change a connection, roll over it to display the move icon and drag to reconnect; remove a connection with a right-click. A successful pipeline requires at least a reader node and a writer node.

    Connect nodes by linking their connector edges together.

  5. Configure each of the pipeline nodes; click-select to display node parameters in the panel on the right.

    Example Pipeline

    Reader Node: Expression

    // generate 2000 rows of sample data
    n:2000;
    // random dates from the last 10 days, instance names, currency symbols and counts
    ([] date:n?(reverse .z.d-1+til 10);
    instance:n?`inst1`inst2`inst3`inst4;
    sym:n?`USD`EUR`GBP`JPY;
    cnt:n?10)
    

    Transform Node: Apply Schema

    Leave the Data Format as Any (required).

    Column Name Column Type Parse Strings
    date Timestamp Auto
    instance Symbol Auto
    sym Symbol Auto
    cnt Integer Auto

    Preferably, a table schema should be loaded from a database created using the wizard; select the database and schema table from their respective dropdowns. Use the above schema for the database table. The schema is defined as part of a database because, when configuring the writer node, a table must be selected from that database.

    Load a schema from a database.

    This will load the predefined schema:

    Predefined schema from database.

    Writer Node: Kx Insights Database

    Use the existing table name to write the data to. If not already done, create a database.

    • Database: yourdatabasename
    • Table: testtable
    • Deduplicate Stream: checked

    Completed expression pipeline after deployment.

    Schemas Require a Timestamp Partition Column

    All schemas require a timestamp data column. In addition, the table should be partitioned and sorted (interval, historic and/or real-time) by this timestamp column. This can be configured as part of the set up wizard, or with the essential and advanced properties of a schema.
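
    As a rough q sketch (illustrative only; the schema itself is normally defined through the wizard or the schema editor), a table with a timestamp partition column matching the example above might be declared as:

    // illustrative empty schema; the date column is the timestamp used for partitioning and sorting
    testtable:([] date:`timestamp$(); instance:`symbol$(); sym:`symbol$(); cnt:`int$())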

    Table naming

    Avoid using reserved SQL or q keywords when naming tables, as this can return an error; e.g. do not give a table in the schema the name table.

  6. Click Save to save the pipeline. A pipeline can be associated with an assembly prior to deployment, or deployed directly to an active assembly.

    Database selection

    If not previously created, use the wizard to create a database assembly. The initial database will have a default table (and schema); additional tables can be added to the database schema.

  7. Deploy the assembly associated with the pipeline. When the assembly is ready to be queried it shows a green circle with a tick next to it. The pipeline will appear under Pipelines of the Overview page, and show as Running.

    Test Deploy

    A test deploy can be done by clicking Test deploy before a full deploy; results will be displayed in the lower panel of the pipeline editor - click on a node to see the results at each step along the pipeline. Test deploys are automatically torn down after completion of the test.

    Results from a test deployment of an Expression pipeline.

    A running pipeline as shown in the Overview page.

  8. Data from a successfully deployed pipeline can then be queried; open a Query workspace from the "+" menu in the Document menu bar, or from the perspective icon bar on the left. Select the assembly from the dropdown - it may be necessary to refresh the browser to populate the dropdown list. Open the SQL tab, and in the SQL editor add the following for the aforementioned Expression example:

    SQL query

    SELECT * FROM testtable
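
    The results can also be explored with q in the scratchpad; a minimal sketch, assuming the query results have been assigned to an output variable t:

    // total count per currency symbol (illustrative)
    select totalCnt:sum cnt by sym from t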

  9. Click Get Data to run the query.

    Console output of SQL query.

  10. A running pipeline consumes memory and CPU resources. When done with a pipeline, tear it down to free up resources.

Set up

In order to use Kafka and Postgres pipelines, the relevant charts must be installed, running, and accessible on the cluster. To install them, refer to the installation instructions.

Lower Case required

When configuring pipelines, lower case should be used in all instances of names and references.

Take a look at some of the guided walkthrough pipeline examples.

Deploying, Testing, and Saving

Pipelines can be deployed from a draft status, although it's best to give a pipeline a name by first saving it.

Save

Save the pipeline.

Pipeline name
Give the pipeline a name. The pipeline name should be lowercase, only include letters and numbers, and exclude special characters other than -.
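
For illustration only, a small q helper (hypothetical, not part of the product) that checks whether a name follows these rules:

// allow only lowercase letters, digits and "-"
validName:{all x in "abcdefghijklmnopqrstuvwxyz0123456789-"}
validName["my-pipeline-1"]   / 1b
validName["My_Pipeline!"]    / 0b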

Duplicate

Creates a copy of the pipeline.

Pipeline name
Give the duplicate pipeline a name. The duplicate pipeline name should be lowercase, include only letters and numbers, exclude special characters other than -, and differ from the name of the original pipeline.

Deploy

Deployed pipelines run indefinitely. If a deployed pipeline errors, it remains deployed. A deployed pipeline can be torn down from the pipeline listing.

Pipeline deployment dialog with option for a test deploy and a full deploy.

Runtime

Debug Mode: Enable the facility to debug the pipeline.
Error Mode: Select between Off, Suspend or Dump Stack. Do not use Suspend in a production environment.
Protected Execution: Enable for greater granularity in error reporting. When enabled, operating performance may be impacted.
Log Format: Define the debug reporting format; select between JSON and Text.
Log Level: Select between Info, Trace, Debug, Warning, Error or Fatal.

Resources

Controller

CPU

Minimum CPU: Minimum amount of CPU for the pipeline; 1 is one full core, 0.5 is half of one core's time (>= 0.01 CPU).
Maximum CPU: Maximum amount of CPU for the pipeline; must be <= 4 CPU.

Memory

Minimum Memory: Minimum memory to allocate to the pipeline and always have available; >= 1 MB.
Maximum Memory: Maximum memory to allocate to the pipeline; once reached, an out-of-memory error is returned; <= 2000 MB.

Worker

CPU

Minimum CPU: Minimum amount of CPU for the pipeline; 1 is one full core, 0.5 is half of one core's time (>= 0.01 CPU).
Maximum CPU: Maximum amount of CPU for the pipeline.

Memory

Minimum Memory: Minimum memory to allocate to the pipeline and always have available; >= 1 MB.
Maximum Memory: Maximum memory to allocate to the pipeline; once reached, an out-of-memory error is returned.

Config

Minimum number of workers: Minimum number of workers that can be created to run the pipeline; >= 1.
Maximum number of workers: Maximum number of workers that can be created to run the pipeline; <= 10.
Maximum number of worker threads: Maximum number of worker threads; value between 1 and 16.

Persistence

Configure persistence for the Stream Processor controller and workers.

Controller

Kubernetes persistence configuration.

Disabled: Persistence is enabled by default; click to disable.
Storage Class: Kubernetes storage class name; e.g. standard.
Storage Size: Size of the volume allocated to each controller; defaults to 20Gi.
Checkpoint Frequency: Checkpoint frequency in milliseconds; defaults to 5,000.

Storage Class

Ensure Storage Class is set to a valid class. This is defined in the cluster and may differ across cloud providers; e.g. GCP: standard, AWS: gp2, Azure: default. This information can be retrieved by running:

kubectl get storageclass

Worker

Kubernetes persistence configuration.

Disabled: Persistence is enabled by default; click to disable.
Storage Class: Kubernetes storage class name; e.g. standard.
Storage Size: Size of the volume allocated to each worker; defaults to 20Gi.
Checkpoint Frequency: Checkpoint frequency in milliseconds; defaults to 5,000.

Kubernetes

Label
Add a label and value to created resources.
Image Pull Secrets
Add Kubernetes secret reference name; for example, kx-access

Environment Variables

Variable, Value
Add environment variables and their values.

For more information, refer to the Kubernetes configuration documentation.

Advanced

Worker Image
A registry URL that points to a custom Stream Processor worker image.
Controller Image
A registry URL that points to a custom Stream Processor controller image.
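
For example, a custom image reference might look like the following (hypothetical registry, repository and tag):

registry.example.com/stream-processor/worker:1.0.0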

Test Deploy

A test deploy will deploy the pipeline with data tracing enabled, and then tear it down automatically. The pipeline is torn down if it errors, when it receives the first batch of data, or after 30 seconds of waiting for data, whichever comes first. Any erroring nodes are highlighted, and the data output by each node can be seen by selecting that node.

Erroring nodes

When viewing data for a given node, the data shown is the data as it leaves that node. However, when a node errors, its input data is displayed.

Pipeline deployment dialog with option for a test deploy and a full deploy.

Options

Pipeline workspace options and configuration properties.

The workspace toolbar provides icons to Reset view and to Automatically layout pipeline.

Pipeline Settings

The right-hand panel contains details of the selected node.

Pipeline properties for a Kafka import.

Advanced Properties

Every created pipeline has Advanced settings.

Global Code

Global Code is executed before a pipeline is started; this code may be necessary for Stream Processor hooks, globals and named functions.
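
A minimal sketch of Global Code in q, assuming a downstream function node references these names (the variable and function names are illustrative):

// global value referenced by later nodes
threshold:5;
// named function a downstream function node could apply to each batch
filterCnt:{[t] select from t where cnt>threshold}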

Teardown

Running pipelines consume memory and CPU resources. You can tear down a pipeline to return consumed resources.

Teardown a pipeline from the list of running pipelines.

An option to Clean up resources after tearing down is also offered. This deletes data generated by the pipeline, freeing up additional resources.

When selecting Teardown, an option to Clean up resources after tearing down is available. When checked, pipeline data will be deleted as part of the teardown process.