Readers

Read data from an external or internal data source into the Stream Processor

Reading data allows for both streaming and batch data to be ingested into kdb Insights Enterprise.

See APIs for more details

A q interface can be used to build pipelines programmatically. See the q API for API details.

A Python interface is included alongside the q interface and can be used if PyKX is enabled. See the Python API for API details.

The pipeline builder uses a drag-and-drop interface to link together operations within a pipeline. For details on how to wire together a transformation, see the building a pipeline guide.

Callback

Receives data from a callback from the local process or over IPC

Callback reader properties

See APIs for more details

q API: .qsp.read.fromCallback Python API: kxi.sp.read.from_callback

Required Parameters:

name description default
Callback The name of the callback function where data will be received

Example: Publishing from a tickerplant

A tickerplant is a message bus that typically sends data to subscribers in the form of (table; data). This example subscribes to a tickerplant that publishes a trades table and a quotes table, using an independent callback reader for each.

First, drag out a callback reader and set the callback name to trade.

Callback reader with trade callback

Second, drag out a callback reader and set the callback name to quote.

Callback reader with quote callback

Finally, set up some global code that establishes a connection to the tickerplant and dispatches each message to the appropriate callback function based on the table name.

Callback reader global code
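
The global code for this example appears only as a screenshot, so the dispatch step is sketched below in Python purely as an illustration. It assumes messages arrive as (table, data) pairs named after the callbacks; the trade and quote functions and the received store are hypothetical stand-ins for the two callback readers and are not part of the Stream Processor API.

```python
# Hypothetical stand-ins for the two callback readers. In a real pipeline
# these would be the callback names configured on the reader nodes.
received = {"trade": [], "quote": []}

def trade(data):
    received["trade"].append(data)

def quote(data):
    received["quote"].append(data)

# Map each table name published by the tickerplant to its callback.
CALLBACKS = {"trade": trade, "quote": quote}

def dispatch(message):
    """Route a (table, data) message to the callback for its table."""
    table, data = message
    callback = CALLBACKS.get(table)
    if callback is None:
        return False  # no reader is registered for this table
    callback(data)
    return True

dispatch(("trade", {"sym": "USD", "price": 1.1}))
dispatch(("quote", {"sym": "EUR", "bid": 0.9, "ask": 1.0}))
```

Keeping the table-to-callback mapping in one dictionary means a new table only needs a new entry rather than another branch in the dispatch logic.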

Expression

Runs a kdb+ expression

Expression reader properties

Python expression properties

See APIs for more details

q API: .qsp.read.fromExpr Python API: kxi.sp.read.from_expr

The expression reader executes a snippet of kdb+ code and passes the result into the pipeline. This operator is mostly used for testing pipelines with mock data.

Required Parameters:

name description default
Expression A snippet of kdb+ code or a Python function to be evaluated. The last statement in this code snippet will be treated as the data source.

Example: Generating data

The code snippet below will generate a sample table with 2000 rows for testing.

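q:

/ Number of rows to generate
n:2000;
/ The final expression is the table passed into the pipeline
([] date:n?(reverse .z.d-1+til 10);
  instance:n?`inst1`inst2`inst3`inst4;
  sym:n?`USD`EUR`GBP`JPY;
  cnt:n?10)

Python:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def gen():
    # Number of rows to generate
    n = 2000
    # The ten dates leading up to (and excluding) today, oldest first
    dates = [datetime.today().date() - timedelta(days=10 - i) for i in range(10)]
    # Each column is sampled at random with replacement
    return pd.DataFrame({'date': np.random.choice(dates, n),
                         'instance': np.random.choice(["inst1", "inst2", "inst3", "inst4"], n),
                         'sym': np.random.choice(["EUR", "USD", "GBP", "JPY"], n),
                         'cnt': np.random.randint(0, 10, n)})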

Google Cloud Storage

Reads an object from Google Cloud Storage

Google cloud storage properties

See APIs for more details

q API: .qsp.read.fromGoogleStorage Python API: kxi.sp.read.from_google_storage

Required Parameters:

name description default
Path Click the "+" button to add one or more paths to read from a Google cloud storage bucket

Optional Parameters:

name description default
Project ID Google Cloud Storage Project ID, if applicable.
File Mode The file read mode. Setting the file mode to Binary will read content as a byte vector. When reading a file as text, data will be read as strings and be split on newlines. Each string vector represents a single line. Note that reading as Binary will work for both text and binary data, but reading as Text will only work for text data. Binary
Offset A byte or character offset to start reading at. 0
Chunking Splits file into smaller batches. Auto will read file size and determine if it should be chunked. Auto
Chunk Size File size of chunks when chunking is enabled. 1MB
Use Authentication Enable Kubernetes secret authentication. No
Kubernetes Secret* The name of a Kubernetes secret to authenticate with Google Cloud Storage.

* If Use Authentication is enabled.
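
The File Mode, Offset and Chunk Size options can be pictured with a small Python sketch. This is an illustration of the behaviour described in the table, not the reader's implementation: the function names are hypothetical, text is assumed to be UTF-8, and 1MB is assumed to mean 1024 * 1024 bytes.

```python
def read_content(raw: bytes, mode: str = "binary", offset: int = 0):
    """Return content in the shape each File Mode is described as producing."""
    if mode == "binary":
        # A byte vector; works for both text and binary data.
        return raw[offset:]
    if mode == "text":
        # Assumes UTF-8; decoding raises an error on non-text bytes.
        text = raw.decode("utf-8")
        # One string per line, split on newlines.
        return text[offset:].splitlines()
    raise ValueError(f"unknown file mode: {mode!r}")

def chunk(raw: bytes, chunk_size: int = 1024 * 1024):
    """Split a byte vector into batches of at most chunk_size bytes."""
    return [raw[i:i + chunk_size] for i in range(0, len(raw), chunk_size)]

lines = read_content(b"a,1\nb,2\n", mode="text")
batches = chunk(b"abcdefg", chunk_size=3)
```

The same three options appear on the Microsoft Azure Storage and Amazon S3 readers.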

Google Cloud Storage Authentication

Google Cloud Storage Authentication uses kurl for credential validation.

Kurl

Environment variable authentication

To set up authentication using an environment variable, set GOOGLE_STORAGE_TOKEN to the access token printed by Google's gcloud CLI

gcloud auth print-access-token

Kubernetes secrets for authentication

A Kubernetes secret may be used to authenticate files. This secret needs to be created in the same namespace as the kdb Insights Enterprise install. The name of that secret is then used in the Kubernetes Secret field when configuring the reader.

To create a Kubernetes secret with Google Cloud credentials:

kubectl create secret generic SECRET_NAME DATA

SECRET_NAME is the name of the secret used in the reader. DATA is the data to add to the secret; for example, a path to a directory containing one or more configuration files using the --from-file or --from-env-file flag, or key-value pairs each specified using --from-literal flags.

I want to learn more about Kubernetes Secrets

HTTP

Requests data from an HTTP(S) endpoint

HTTP reader properties

See APIs for more details

q API: .qsp.read.fromHTTP Python API: kxi.sp.read.from_http

Required Parameters:

name description default
URL The URL to send the request to. The URL must include the protocol (http or https), the domain name (example.com) and port if necessary (example.com:8080) as well as the request path. For example: http://code.kx.com/q/ref
Method The method to use for sending the HTTP request. This must be one of GET, POST, PUT, HEAD or DELETE. GET

Optional Parameters:

name description default
Body The payload of the HTTP request. The body of the request is any content that needs to be included when sending the request to the server.
Header A map of header fields to send as part of the request.
On Response After a response, allow the response to be preprocessed or to trigger another request. Returning :: will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special 'response' key can be used in the return dictionary to change the payload of the response. If the response key is set to ::, no data is pushed into the pipeline.
Follow Redirects When checked, any redirects will automatically be followed up to the Max Redirects value. Yes
Max Redirects The maximum number of redirects to follow before reporting an error. 5
Max Retry Attempts The maximum number of times to retry a request that fails due to a timeout. 10
Timeout The duration in milliseconds to wait for a request to complete before reporting an error. 5000
Tenant The request tenant to use for providing request authentication details.
Insecure When checked, any unverified server SSL/TLS certificates will be considered trustworthy No
Binary When checked, payload will be returned as binary data, otherwise payload is treated as text data. No
Sync When checked, a synchronous request will block the process until the request is completed. Default is an asynchronous request. No
Reject Errors When checked, a non-successful response will generate an error and stop the pipeline. Yes
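
The constraints on the URL and Method parameters can be checked before a pipeline is deployed. The sketch below uses only the Python standard library; check_request is a hypothetical helper written for this illustration, not part of the reader API.

```python
from urllib.parse import urlsplit

# The methods listed for the Method parameter.
ALLOWED_METHODS = {"GET", "POST", "PUT", "HEAD", "DELETE"}

def check_request(url: str, method: str = "GET"):
    """Validate a URL and method, returning (scheme, host, port, path)."""
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https"):
        raise ValueError("URL must include the protocol (http or https)")
    if not parts.hostname:
        raise ValueError("URL must include a domain name")
    if method.upper() not in ALLOWED_METHODS:
        raise ValueError(f"unsupported method: {method!r}")
    # port is None when the URL relies on the protocol's default port
    return parts.scheme, parts.hostname, parts.port, parts.path

parsed = check_request("http://code.kx.com/q/ref")
```

A URL written without the protocol, such as example.com:8080/data, is rejected by this check.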

Kafka

Consume data from a Kafka topic

Kafka reader properties

See APIs for more details

q API: .qsp.read.fromKafka Python API: kxi.sp.read.from_kafka

Kafka is a scalable streaming message queue for processing streaming data. See this tutorial for details about getting set up with a Kafka broker and ingesting data.

Required Parameters:

name description default
Broker Location of the Kafka broker as host:port. If multiple brokers are available, they can be entered as a comma-separated list.
Topic The name of the Kafka topic to subscribe to.

Optional Parameters:

name description default
Offset Read data from the Start of the topic (all data) or from the End (new data only). End
Use TLS Enable TLS for encrypting the Kafka connection. When selected, certificate details must be provided with a Kubernetes TLS Secret No
Kubernetes Secret* The name of a Kubernetes secret that is already available in the cluster and contains your TLS certificates.
Certificate Password* TLS certificate password, if required.
Use Schema Registry Use the schema registry to automatically decode data in a Kafka stream for JSON and Protocol Buffer payloads. When enabled, will automatically deserialize data based on the latest schema. No
Registry** Kafka Schema Registry URL
As List** Set to true for Kafka Schema Registry messages to omit field names when decoding Protocol Buffer schemas, and instead return only the list of values. No

* If TLS enabled. ** If Schema Registry enabled.
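
The Broker format can be sanity-checked with a few lines of Python. This is a minimal sketch; parse_brokers and the broker host names are hypothetical and exist only for this illustration.

```python
def parse_brokers(brokers: str):
    """Split a comma-separated broker list into (host, port) pairs."""
    result = []
    for entry in brokers.split(","):
        # Split on the last colon so the port is always the final field.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        result.append((host, int(port)))
    return result

pairs = parse_brokers("kafka-0:9092, kafka-1:9092")
```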

See this guide for more details on setting up TLS Secrets

MQTT

Subscribe to an MQTT topic

MQTT reader properties

See APIs for more details

q API: .qsp.read.fromMQTT Python API: kxi.sp.read.from_mqtt

Read messages from an MQTT broker. MQTT is a lightweight, publish-subscribe, machine-to-machine network protocol for message queuing. It is designed for connections with remote locations that have devices with resource constraints or limited network bandwidth.

Required Parameters:

name description default
Broker Location of the MQTT broker as protocol://host:port.
Topic The name of the MQTT topic to subscribe to.

Optional Parameters:

name description default
Username Username for the MQTT broker connection
Use TLS Enable TLS for encrypting the MQTT connection. When selected, certificate details must be provided with a Kubernetes TLS Secret No
Kubernetes Secret* The name of a Kubernetes secret that is already available in the cluster and contains your TLS certificates.
Certificate Password* TLS certificate password, if required.

* If TLS enabled.

See this guide for more details on setting up TLS Secrets

Microsoft Azure Storage

Reads an object from Microsoft Azure Storage

Microsoft Azure Storage reader properties

See APIs for more details

q API: .qsp.read.fromAzureStorage Python API: kxi.sp.read.from_azure_storage

Required Parameters:

name description default
Path Click the "+" button to add one or more paths to read from an Azure cloud storage bucket
Account The name of the Azure Storage Account hosting the data $AZURE_STORAGE_ACCOUNT

Optional Parameters:

name description default
File Mode The file read mode. Setting the file mode to Binary will read content as a byte vector. When reading a file as text, data will be read as strings and be split on newlines. Each string vector represents a single line. Note that reading as Binary will work for both text and binary data, but reading as Text will only work for text data. Binary
Offset A byte or character offset to start reading at. 0
Chunking Splits file into smaller batches. Auto will read file size and determine if it should be chunked. Auto
Chunk Size File size of chunks when chunking is enabled. 1MB
Use Authentication Enable Kubernetes secret authentication. No
Kubernetes Secret* The name of a Kubernetes secret to authenticate with Azure Cloud Storage.

* If Use Authentication is enabled.

Microsoft Azure Authentication

Microsoft Azure Storage Authentication uses kurl for credential validation.

Kurl

Environment variable Authentication

To set up authentication with an environment variable, set AZURE_STORAGE_ACCOUNT to the name of the storage account to read from, and AZURE_STORAGE_SHARED_KEY to one of the keys of the account. Additional details on shared key usage are available here.

Shared Keys

Kubernetes secrets for authentication

A Kubernetes secret may be used to authenticate files. This secret needs to be created in the same namespace as the kdb Insights Enterprise install. The name of that secret is then used in the Kubernetes Secret field when configuring the reader.

To create a Kubernetes secret with Azure credentials:

kubectl create secret generic --from-literal=token="$AZURE_STORAGE_TOKEN" ms-creds

I want to learn more about Kubernetes Secrets.

KX Insights Stream

Reads data from a KX Insights Stream

KX Insights Stream reader properties

See APIs for more details

q API: .qsp.read.fromStream Python API: kxi.sp.read.from_stream

Insights streams

Optional Parameters:

name description default
Table Filter incoming messages to show only messages for the specified table. If no filter is provided, all messages in the stream are consumed

To read data from a stream, an assembly must be provided. This is done at pipeline deploy time: click the Deploy button and select the Assembly Integration tab. On this page, set the assembly name and topic name.

Deploy dialog assembly integration

Amazon S3

Reads an object from Amazon S3

Amazon S3 reader properties

See APIs for more details

q API: .qsp.read.fromAmazonS3 Python API: kxi.sp.read.from_amazon_s3

Required Parameters:

name description default
Path Click the "+" button to add one or more paths to read from an Amazon S3 cloud storage bucket

Optional Parameters:

name description default
Region The AWS region of the bucket to authenticate against us-east-1
File Mode The file read mode. Setting the file mode to Binary will read content as a byte vector. When reading a file as text, data will be read as strings and be split on newlines. Each string vector represents a single line. Note that reading as Binary will work for both text and binary data, but reading as Text will only work for text data. Binary
Offset A byte or character offset to start reading at. 0
Chunking Splits file into smaller batches. Auto will read file size and determine if it should be chunked. Auto
Chunk Size File size of chunks when chunking is enabled. 1MB
Use Authentication Enable Kubernetes secret authentication. No
Kubernetes Secret* The name of a Kubernetes secret to authenticate with Amazon S3.

* If Use Authentication is enabled.

Amazon S3 Authentication

To access private buckets or files, a Kubernetes secret needs to be created that contains valid AWS credentials. This secret needs to be created in the same namespace as the kdb Insights Enterprise install. The name of that secret is then used in the Kubernetes Secret field when configuring the reader.

To create a Kubernetes secret containing AWS credentials:

kubectl create secret generic --from-file=credentials=<path/to/.aws/credentials> <secret-name>

Where <path/to/.aws/credentials> is the path to an AWS credentials file, and <secret-name> is the name of the Kubernetes secret to create.

Note that this only needs to be done once, and each subsequent usage of the Amazon S3 reader can re-use the same Kubernetes secret.
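
Before creating the secret, it can help to confirm that the credentials file contains the expected keys. The sketch below relies on the standard AWS credentials file layout (an INI file whose profile sections hold aws_access_key_id and aws_secret_access_key). The helper name, the choice of the default profile and the placeholder values are assumptions made for this illustration.

```python
import configparser

# Keys that a profile in an AWS credentials file normally contains.
REQUIRED_KEYS = ("aws_access_key_id", "aws_secret_access_key")

def missing_credential_keys(text: str, profile: str = "default"):
    """Return the required keys that are absent from the given profile."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    if not parser.has_section(profile):
        return list(REQUIRED_KEYS)
    return [key for key in REQUIRED_KEYS if not parser.has_option(profile, key)]

# Placeholder values only; never commit real credentials.
sample = """[default]
aws_access_key_id = EXAMPLEKEYID
aws_secret_access_key = EXAMPLESECRET
"""
missing = missing_credential_keys(sample)
```

An empty list means the profile has both keys; any names returned are the ones to add before running the kubectl command above.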

PostgreSQL

Execute a query on a PostgreSQL database

PostgreSQL reader properties

See APIs for more details

q API: .qsp.read.fromPostgres Python API: kxi.sp.read.from_postgres

Required Parameters:

name description default
Server The hostname of the database to issue a query against.
Port The port to use for connecting to the database server.
Database The name of the database to issue a query against in the target server.
Query An SQL query to issue against the target database for pulling data into the pipeline.

Optional Parameters:

name description default
Username The user account to use when establishing a connection to a PostgreSQL server. Note that if the username is provided, a password prompt will appear during deploy time.

SQL Server

Execute a query on a SQL Server database

SQL Server reader properties

See APIs for more details

q API: .qsp.read.fromSQLServer Python API: kxi.sp.read.from_sql_server

Required Parameters:

name description default
Server The hostname of the database to issue a query against.
Port The port to use for connecting to the database server.
Database The name of the database to issue a query against in the target server.
Query An SQL query to issue against the target database for pulling data into the pipeline.

Optional Parameters:

name description default
Username The user account to use when establishing a connection to a SQL Server instance. Note that if the username is provided, a password prompt will appear during deploy time.