Pipelines
The KX Insights Stream Processor is a stream processing service for transforming, validating, processing, enriching and analyzing real-time data in context. It is used for building data-driven analytics and closed loop control applications optimized for machine generated data from data sources, and delivering data and insights to downstream applications and persistent storage.
The Stream Processor allows stateful custom code to be executed over stream or batch data sources in a resilient manner, scaling up when data sources allow. This code can be used to perform a wide range of ingest, transform, process, enrich, and general analytics operations.
Stream processing
Stream processing is the practice of taking one or more actions on a series of data that originate from one or more systems that are continuously generating data. Actions can include a combination of ingestion (e.g. inserting the data into a downstream database), transformation (e.g. changing a string into a date), aggregations (e.g. sum, mean, standard deviation), analytics (e.g. predicting a future event based on data patterns), and enrichment (e.g. combining data with other data, whether historical or streaming, to create context or meaning).
These actions may be performed serially, in parallel, or both, depending on the use case and data source capabilities. This workflow of ingesting, processing, and outputting data is called a stream processing pipeline.
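The actions described above can be sketched in plain Python as a chain of small stages. This is only an illustration of the concept, not Stream Processor code: the function names, record fields, and reference data are all assumptions made for the example.

```python
from datetime import datetime
from statistics import mean

def read(records):
    """Ingestion: yield raw records one at a time, as a stream source would."""
    yield from records

def transform(stream):
    """Transformation: change the date string into a date object."""
    for rec in stream:
        yield {**rec, "date": datetime.strptime(rec["date"], "%Y-%m-%d").date()}

def enrich(stream, reference):
    """Enrichment: combine each record with reference data to add context."""
    for rec in stream:
        yield {**rec, "region": reference.get(rec["sym"], "unknown")}

def aggregate(stream):
    """Aggregation: mean of cnt per sym."""
    groups = {}
    for rec in stream:
        groups.setdefault(rec["sym"], []).append(rec["cnt"])
    return {sym: mean(vals) for sym, vals in groups.items()}

# Hypothetical input records and reference data.
raw = [
    {"date": "2024-01-02", "sym": "USD", "cnt": 4},
    {"date": "2024-01-02", "sym": "EUR", "cnt": 7},
    {"date": "2024-01-03", "sym": "USD", "cnt": 6},
]
regions = {"USD": "Americas", "EUR": "Europe"}

# Stages are applied serially here; each one consumes the previous stage's output.
enriched = list(enrich(transform(read(raw)), regions))
summary = aggregate(enriched)
print(summary)
```

Each stage only depends on the output of the stage before it, which is what allows a pipeline to be assembled from independent, reusable steps.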
This document describes how to use the stream processor in kdb Insights using the drag-and-drop, low code user interface.
Operators
Pipelines can be built by dragging operators from the left-hand entity tree and linking them together. Connect operators together and configure each operator's parameters to build the pipeline. Click an operator to configure it using the configuration tray on the right.
Operators are the building blocks of pipelines. Each operator acts on data in a stream, and each has a specific function. To begin, drag and drop operators from the left-hand entity tree onto the canvas.
Operators and nodes
The terms operator and node are used interchangeably within this document and within the user interface.
Types
Select from the following types of operator:
- Readers read data from an external or internal data source.
- Writers send data to an internal or external data sink.
- Functions provide building blocks for transforming data by mapping, filtering or merging streams.
- Encoders map data to serialized data formats that can be shared out of the system with writers.
- Decoders deserialize incoming data into a kdb+ representation for manipulation.
- Windows group data into time blocks for aggregating streams.
- Transform operators provide low code and no code transformations for manipulating data.
- Machine Learning operators can be used to build prediction models of data in a stream.
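To make the idea of a window operator more concrete, the sketch below groups timestamped values into fixed, non-overlapping time blocks and then aggregates each block. It is a plain Python illustration under assumed names (`tumbling_windows`, the sample events); it does not show the Stream Processor's own window operators or their options.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def tumbling_windows(events, period):
    """Group (timestamp, value) events into fixed, non-overlapping time blocks."""
    epoch = datetime(1970, 1, 1)
    buckets = defaultdict(list)
    for ts, value in events:
        # Floor each timestamp to the start of the window that contains it.
        offset = (ts - epoch) // period
        buckets[epoch + offset * period].append(value)
    return dict(sorted(buckets.items()))

# Hypothetical events: (timestamp, value) pairs.
events = [
    (datetime(2024, 1, 1, 9, 0, 1), 10),
    (datetime(2024, 1, 1, 9, 0, 4), 20),
    (datetime(2024, 1, 1, 9, 0, 7), 30),
]

windows = tumbling_windows(events, timedelta(seconds=5))

# Aggregate each window, here with a simple sum.
totals = {start: sum(values) for start, values in windows.items()}
for start, total in totals.items():
    print(start, total)
```

The first two events fall into the window starting at 09:00:00 and the third into the window starting at 09:00:05, so each window can be aggregated independently of the others.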
Actions
When an operator is added, its right-click menu offers a number of actions that can be performed on the operator.

An example expression pipeline with reader, decoder, transformation and writer operator; right-click for menu options.
- Click an operator in the pipeline canvas to edit it; this will update the pipeline settings with details of the operator.
- Right-click an operator in the pipeline canvas to rename, remove or duplicate it. A duplicated operator will retain the contents of the original, and will have its name appended with a numeric tag, -n.

Right-click menu for editing an operator.
Building a pipeline
- From the Document menu bar or Perspective icon menu on the left, create a new Pipeline by clicking the plus icon.
- Click-and-drag an operator listed in the left-hand entity tree into the central workspace; for example, start with a Reader operator. Alternatively, quick start with the Import wizard.
- Build the pipeline by adding operators to the workspace and joining them with the operator-anchors. Operators can be removed with a right-click, or added by click-and-drag from the entity tree or by a right-click in an empty part of the workspace. To change a connection, roll over it to display the move icon and drag to reconnect; remove a connection with a right-click. A successful pipeline requires a reader and a writer operator as a minimum.
  Connecting operators by linking their connector edges together.
- Configure each of the pipeline operators; click an operator to display its parameters in the panel on the right.
  Example Pipeline
  Reader operator: Expression
  n:2000; ([] date:n?(reverse .z.d-1+til 10); instance:n?`inst1`inst2`inst3`inst4; sym:n?`USD`EUR`GBP`JPY; cnt:n?10)
  Transform operator: Apply Schema
  Leave the Data Format as Any (required).

  | Column Name | Column Type | Parse Strings |
  |---|---|---|
  | date | Timestamp | Auto |
  | instance | Symbol | Auto |
  | sym | Symbol | Auto |
  | cnt | Integer | Auto |

  Preferably, a table schema should be loaded from a database created using the database builder; select the database and schema table from their respective dropdowns. Use the above schema for the database table. The schema is defined as part of a database because the writer operator, configured next, requires a table to be selected from the database.
  Load a schema from a database.
  Predefined schema from database.
  Writer operator: Kx Insights Database
  Use the existing table name to write the data to. If not already done, create a database.
  - Database: yourdatabasename
  - Table: testtable
  - Deduplicate Stream: checked
  Completed expression pipeline after deployment.
  Schemas Require a Timestamp Partition Column
  All schemas require a timestamp data column. In addition, the table should be partitioned and sorted (interval, historic and/or real-time) by this timestamp column. This can be configured as part of the set up database, or with the essential and advanced properties of a schema.
- Click Save to save the pipeline. A pipeline can be associated with an assembly prior to deployment, or deployed directly to an active assembly.
  Database selection
  If not previously created, create a database before configuring. The initial database will have a default table (and schema); additional tables can be added to the database schema.
- Deploy the assembly associated with the pipeline. When the assembly is ready to be queried it will show a green circle with a tick next to it. The pipeline will appear under Pipelines of the Overview page, and show as Running.
  Test Deploy
  A test deploy can be run before a full deploy; results will be displayed in the lower panel of the pipeline editor. Click an operator to see the results at each step along the pipeline. Test deploys are automatically torn down after completion of the test. See test deploy for more details.
 Results from a test deployment of an Expression pipeline. 
 A running pipeline as shown in the Overview page.
- Data from a successfully deployed pipeline can then be queried; open a Query workspace from the "+" menu in the Document menu bar, or from the perspective icon bar on the left. Select the assembly from the dropdown; it may be necessary to refresh the browser to populate the dropdown list. Open the SQL tab and, for the Expression example above, add the following in the SQL editor:
  SQL query
  SELECT * FROM testtable
- Click Get Data.
 Console output of SQL query.
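For readers less familiar with q, the example pipeline can be approximated in plain Python to show what each operator contributes. This is a sketch under stated assumptions, not how kdb Insights executes the pipeline: the Python `random` module stands in for the q expression, simple casts stand in for Apply Schema, and an in-memory SQLite table stands in for the database table `testtable`.

```python
import random
import sqlite3
from datetime import date, datetime, timedelta

random.seed(0)  # fixed seed so the sketch is repeatable
n = 2000
today = date.today()

# Reader (Expression): n random rows, mirroring the q expression in the example.
# Dates are drawn from the ten days before today, oldest first.
days = [today - timedelta(days=k) for k in range(10, 0, -1)]
rows = [
    {
        "date": random.choice(days),
        "instance": random.choice(["inst1", "inst2", "inst3", "inst4"]),
        "sym": random.choice(["USD", "EUR", "GBP", "JPY"]),
        "cnt": random.randrange(10),  # integers 0..9, like n?10
    }
    for _ in range(n)
]

# Transform (Apply Schema): cast each column to the type listed in the schema.
schema = {
    "date": lambda d: datetime.combine(d, datetime.min.time()),  # Timestamp
    "instance": str,  # Symbol
    "sym": str,       # Symbol
    "cnt": int,       # Integer
}
typed = [{col: cast(row[col]) for col, cast in schema.items()} for row in rows]

# Writer: an in-memory SQLite table stands in for the database table.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE testtable (date TEXT, instance TEXT, sym TEXT, cnt INTEGER)")
db.executemany(
    "INSERT INTO testtable VALUES (?, ?, ?, ?)",
    [(r["date"].isoformat(), r["instance"], r["sym"], r["cnt"]) for r in typed],
)

# Query: the same SQL as in the final steps of the walkthrough.
result = db.execute("SELECT * FROM testtable").fetchall()
print(len(result), result[0])
```

The query returns all 2000 generated rows, each with a timestamp column, which matches the requirement that every schema carries a timestamp data column.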
Set up
In order to use Kafka and Postgres pipelines, relevant charts must be installed, running, and accessible on the cluster.
Lower Case required
When configuring pipelines, lower case should be used in all instances of names and references.
Take a look at some of the guided walkthrough pipeline examples.
Pipeline status
The status of a pipeline is available on the overview screen in the "Running pipelines" table.

From this table, you can see the state of the pipeline, stop the pipeline or inspect the pipeline. Click the name of the pipeline to open the pipeline document.

To view the logs for a pipeline, click the "View Diagnostics" button.

To tear down a pipeline, click the "Teardown" button.
Deploying, Testing, and Saving
Pipelines can be deployed from a draft status, although it's best to give a pipeline a name by first saving it.
Save
Save the pipeline.
- Pipeline name
- Give the pipeline a name.  The pipeline name should be lowercase, only include letters and numbers, and exclude special characters other than -.
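The naming rule can be checked before saving, for instance in a script that prepares pipeline names. The sketch below is one reading of the rule as a regular expression; `is_valid_pipeline_name` is a hypothetical helper, not part of kdb Insights, and the UI may enforce further constraints that are not described here.

```python
import re

# Lowercase letters, digits, and hyphens only (an assumed reading of the rule).
PIPELINE_NAME = re.compile(r"[a-z0-9-]+")

def is_valid_pipeline_name(name: str) -> bool:
    """Return True if the whole name uses only the permitted characters."""
    return PIPELINE_NAME.fullmatch(name) is not None

print(is_valid_pipeline_name("kafka-ingest-1"))  # True
print(is_valid_pipeline_name("Kafka_Ingest"))    # False
```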
Duplicate
Creates a copy of the pipeline.
- Pipeline name
- Give the duplicate pipeline a name. The name should differ from that of the original pipeline, be lowercase, include only letters and numbers, and exclude special characters other than -.
Deploy
Deployed pipelines are deployed indefinitely. If a deployed pipeline errors, it will remain deployed. A deployed pipeline can be torn down from the pipeline listing.

Pipeline deployment dialog with option for a test deploy and a full deploy.
Refer to the deployment options for the full set of deploy options. Note that it is recommended to run a test deploy before running a pipeline normally.
Options

Pipeline canvas options and configuration properties.
| icon | function | 
|---|---|
|  | Reset view | 
|  | Automatically layout pipeline | 
Pipeline Settings
The right-hand panel contains details of the selected operator.

Pipeline properties for a Kafka import.
Advanced Properties
Every created pipeline has Advanced settings. Currently advanced properties are reserved for global code.
Global Code
Global Code is executed before a pipeline is started; this code may be necessary for Stream Processor hooks, global variables and named functions.
Python code within pipeline UI
When developing Stream Processor pipelines within the UI, logic can in limited circumstances be written in Python. This functionality is not supported for all nodes or areas where q code is supported, but in many circumstances it allows users to develop analytic logic in Python rather than q. The following sections outline requirements and caveats to bear in mind when developing such nodes.
Supported Python code locations
Python code can be used at present in two locations within the stream processor UI.
- Within Global code
- Within a selection of function nodes
Currently, Python nodes within the UI are only supported for the following node types
Writing Python code within the pipeline UI
When defining the code which is to be used for a given node you need to take the following into account:
- The parsing logic used to determine the function used within a Python node works by retrieving the first function defined within the code block. The entire code block will be evaluated, but it is assumed that the first function defined is the one to be used in execution.
- Only Python functions are supported at this time for definition of the execution function logic. You should not attempt to use lambdas or functions within classes to define the execution logic for a node. Additionally, while functions and variables defined within the global code block can be used within individual nodes, they cannot be referenced by name as the definition of the function to be used.
- All data entering and exiting an executed Python node block is received and emitted as a PyKX object. The PyKX documentation outlines how these objects can be converted to common Python types and includes notes on performance considerations which should be accounted for when developing code which interacts with PyKX objects.
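The "first function defined" rule can be illustrated with Python's standard `ast` module. This is a sketch of the rule as stated above, not the Stream Processor's actual parsing logic; `first_function_name` and the sample code block are hypothetical, and treating "first function" as the first top-level `def` is an assumption.

```python
import ast

def first_function_name(code: str):
    """Return the name of the first top-level `def` in a code block, or None."""
    for node in ast.parse(code).body:
        if isinstance(node, ast.FunctionDef):
            return node.name
    return None

# Hypothetical node code: a variable, then two function definitions.
node_code = '''
threshold = 3

def keep_large(data):
    return data > threshold

def unused_helper(data):
    return data
'''

print(first_function_name(node_code))           # keep_large
print(first_function_name("f = lambda x: x"))   # None
```

Under this reading, `keep_large` would be the function used for the node, `unused_helper` would be evaluated but not used as the node's entry point, and a lambda assigned to a name would not be picked up at all.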
The following provides an example of map and filter node logic which could be used in a pipeline:
import pykx as kx

# Map node: compute a three-point moving average for a table
def map_func(data):
    return kx.q.mavg(3, data)

# Filter node: expected to return a list of booleans denoting the
# positions within the data which are to be passed to the next node
def filter_func(data):
    return kx.q.like(data[0], b"trades")