Readers
Read data from an external or internal data source into the Stream Processor.
Reading data allows for both streaming and batch data to be ingested into kdb Insights Enterprise.
See APIs for more details
A q interface can be used to build pipelines programmatically. See the q API for API details.
A Python interface is included alongside the q interface and can be used if PyKX is enabled. See the Python API for API details.
The pipeline builder uses a drag-and-drop interface to link together operations within a pipeline. For details on how to wire together a transformation, see the building a pipeline guide.
Callback
Receives data from a callback from the local process or over IPC
See APIs for more details
q API: .qsp.read.fromCallback
Python API: kxi.sp.read.from_callback
Required Parameters:
name | description | default |
---|---|---|
Callback | The name of the callback function where data will be received |
Example: Publishing from a tickerplant
A tickerplant is a message bus that typically sends data to subscribers in the form of (table; data). In this example, we will subscribe to a tickerplant with a trades table and a quotes table, using an independent callback reader for each.
First, drag out a callback reader and set the callback name to trade.
Second, drag out a callback reader and set the callback name to quote.
Finally, set up some global code for establishing a connection to the tickerplant and dispatching to the appropriate callback function based on the table name.
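The dispatch step can be sketched in plain Python as follows. This is a minimal illustration of routing a (table; data) message by table name, not the product's global code: `make_dispatcher`, the stand-in handlers and the sample rows are hypothetical. In a deployed pipeline the handlers would be the callback readers named trade and quote, and the tickerplant connection would be established separately.

```python
from typing import Callable, Dict, List

Handler = Callable[[list], None]


def make_dispatcher(callbacks: Dict[str, Handler]) -> Callable[[str, list], bool]:
    """Return an `upd`-style function that routes data by table name."""

    def upd(table: str, data: list) -> bool:
        handler = callbacks.get(table)
        if handler is None:
            # No reader is registered for this table, so the message is ignored.
            return False
        handler(data)
        return True

    return upd


# Hypothetical stand-ins for the two callback readers: each just records batches.
trade_batches: List[list] = []
quote_batches: List[list] = []
upd = make_dispatcher({"trade": trade_batches.append, "quote": quote_batches.append})

# Each tickerplant message is handled as a (table, data) pair.
upd("trade", [("AAPL", 101.5, 200)])
upd("quote", [("AAPL", 101.4, 101.6)])
```

Keeping the routing table in one dictionary means a third table can be supported by adding one entry rather than another branch.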
Expression
Runs a kdb+ expression
See APIs for more details
q API: .qsp.read.fromExpr
Python API: kxi.sp.read.from_expr
The expression reader executes a snippet of kdb+ code and passes the result into the pipeline. This operator is mostly used for testing pipelines with mock data.
Required Parameters:
name | description | default |
---|---|---|
Expression | A snippet of kdb+ code or a Python function to be evaluated. The last statement in this code snippet will be treated as the data source. |
Example: Generating data
The code snippet below will generate a sample table with 2000 rows for testing.
// q: generate a sample table of n random rows
n:2000;
([] date:n?(reverse .z.d-1+til 10);
    instance:n?`inst1`inst2`inst3`inst4;
    sym:n?`USD`EUR`GBP`JPY;
    cnt:n?10)
# Python: generate the same kind of sample table as a pandas DataFrame
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def gen():
    n = 2000
    # The ten days before today, oldest first
    dates = [datetime.today().date() - timedelta(10-i,0,0) for i in range(10)]
    return pd.DataFrame({'date' : np.random.choice(dates,n),
                         'instance' : np.random.choice(["inst1","inst2","inst3","inst4"],n),
                         'sym' : np.random.choice(["EUR","USD","GBP","JPY"],n),
                         'cnt' : np.random.randint(0,10,n)})
Google Cloud Storage
Reads an object from Google Cloud Storage
See APIs for more details
q API: .qsp.read.fromGoogleStorage
Python API: kxi.sp.read.from_google_storage
Required Parameters:
name | description | default |
---|---|---|
Path | Click the "+" button to add one or more paths to read from a Google cloud storage bucket |
Optional Parameters:
name | description | default |
---|---|---|
Project ID | Google Cloud Storage Project ID, if applicable. | |
File Mode | The file read mode. Setting the file mode to Binary will read content as a byte vector. When reading a file as Text, data will be read as strings and be split on newlines. Each string vector represents a single line. Note that reading as Binary will work for both text and binary data, but reading as Text will only work for text data. | Binary |
Offset | A byte or character offset to start reading at. | 0 |
Chunking | Splits file into smaller batches. Auto will read file size and determine if it should be chunked. | Auto |
Chunk Size | File size of chunks when chunking is enabled. | 1MB |
Use Authentication | Enable Kubernetes secret authentication. | No |
Kubernetes Secret* | The name of a Kubernetes secret to authenticate with Google Cloud Storage. |
* If Use Authentication is enabled.
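The File Mode, Offset and Chunk Size options interact, so a small model of their behavior may help. The sketch below is plain Python operating on in-memory bytes and is not the reader's implementation: `read_batches` is a hypothetical helper, and it assumes the offset counts bytes in Binary mode and characters in Text mode, and that text batches are never split in the middle of a line.

```python
from typing import Iterator, List, Union


def read_batches(content: bytes, mode: str = "binary", offset: int = 0,
                 chunk_size: int = 1024 * 1024) -> Iterator[Union[bytes, List[str]]]:
    """Yield batches of `content`, starting at `offset`.

    binary: each batch is a bytes object of at most `chunk_size` bytes.
    text:   each batch is a list of lines totalling roughly `chunk_size` bytes.
    """
    if mode not in ("binary", "text"):
        raise ValueError("mode must be 'binary' or 'text'")
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")

    if mode == "binary":
        body = content[offset:]
        for start in range(0, len(body), chunk_size):
            yield body[start:start + chunk_size]
        return

    # Text mode: decode, skip `offset` characters, then split on newlines.
    lines = content.decode("utf-8")[offset:].splitlines()
    batch: List[str] = []
    size = 0
    for line in lines:
        line_size = len(line.encode("utf-8")) + 1  # +1 for the newline
        if batch and size + line_size > chunk_size:
            yield batch
            batch, size = [], 0
        batch.append(line)
        size += line_size
    if batch:
        yield batch


sample = b"a,b\n1,2\n3,4\n"
binary_batches = list(read_batches(sample, "binary", chunk_size=5))
text_batches = list(read_batches(sample, "text", offset=4, chunk_size=4))
```

With these inputs the binary batches can end in the middle of a row, while the text batches always hold whole lines, which is why Text mode is only suitable for text data.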
Google Cloud Storage Authentication
Google Cloud Storage Authentication uses kurl for credential validation.
Environment variable authentication
To set up authentication using an environment variable, set GOOGLE_STORAGE_TOKEN to the access token printed by Google's gcloud CLI:
gcloud auth print-access-token
Kubernetes secrets for authentication
A Kubernetes secret may be used to authenticate files. This secret needs to be created in the same namespace as the kdb Insights Enterprise install. The name of that secret is then used in the Kubernetes Secret field when configuring the reader.
To create a Kubernetes secret with Google Cloud credentials:
kubectl create secret generic SECRET_NAME DATA
SECRET_NAME is the name of the secret used in the reader. DATA is the data to add to the secret; for example, a path to a directory containing one or more configuration files using the --from-file or --from-env-file flag, or key-value pairs each specified using --from-literal flags.
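As an illustration of the key-value form, the sketch below assembles the command as an argument list in Python. The helper `kubectl_secret_command`, the secret name `gcs-creds`, the key `token` and the namespace `insights` are all hypothetical; check which keys your deployment expects before creating a real secret.

```python
import shlex
from typing import Dict, List, Optional


def kubectl_secret_command(secret_name: str,
                           literals: Dict[str, str],
                           namespace: Optional[str] = None) -> List[str]:
    """Build the argument list for `kubectl create secret generic`."""
    if not secret_name:
        raise ValueError("a secret name is required")
    if not literals:
        raise ValueError("at least one key-value pair is required")
    cmd = ["kubectl", "create", "secret", "generic", secret_name]
    if namespace:
        # The secret must live in the same namespace as the install.
        cmd += ["--namespace", namespace]
    # One --from-literal flag per key-value pair.
    cmd += [f"--from-literal={key}={value}" for key, value in literals.items()]
    return cmd


# Hypothetical names and a placeholder value, for illustration only.
command = kubectl_secret_command("gcs-creds", {"token": "<access-token>"},
                                 namespace="insights")
print(shlex.join(command))
```

Building a list rather than a single string avoids shell quoting problems if the command is later passed to `subprocess.run(command, check=True)`.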
I want to learn more about Kubernetes Secrets
HTTP
Requests data from an HTTP(S) endpoint
See APIs for more details
q API: .qsp.read.fromHTTP
Python API: kxi.sp.read.from_http
Required Parameters:
name | description | default |
---|---|---|
URL | The URL to send the request to. The URL must include the protocol (http or https), the domain name (example.com) and port if necessary (example.com:8080) as well as the request path. For example: http://code.kx.com/q/ref | |
Method | The method to use for sending the HTTP request. This must be one of GET, POST, PUT, HEAD or DELETE. | GET |
Optional Parameters:
name | description | default |
---|---|---|
Body | The payload of the HTTP request. The body of the request is any content that needs to be included when sending the request to the server. | |
Header | A map of header fields to send as part of the request. | |
On Response | After a response, allow the response to be preprocessed or to trigger another request. Returning :: will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special 'response' key can be used in the return dictionary to change the payload of the response. If the response key is set to ::, no data is pushed into the pipeline. | |
Follow Redirects | When checked, any redirects will automatically be followed up to the Max Redirects value. | Yes |
Max Redirects | The maximum number of redirects to follow before reporting an error. | 5 |
Max Retry Attempts | The maximum number of times to retry a request that fails due to a timeout. | 10 |
Timeout | The duration in milliseconds to wait for a request to complete before reporting an error. | 5000 |
Tenant | The request tenant to use for providing request authentication details. | |
Insecure | When checked, any unverified server SSL/TLS certificates will be considered trustworthy | No |
Binary | When checked, payload will be returned as binary data, otherwise payload is treated as text data. | No |
Sync | When checked, a synchronous request will block the process until the request is completed. Default is an asynchronous request. | No |
Reject Errors | When checked, a non-successful response will generate an error and stop the pipeline. | Yes |
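The URL and Method requirements above can be checked before a pipeline is deployed. The following sketch uses only the Python standard library and is independent of the reader itself; `check_request` is a hypothetical helper, and the rules it enforces are taken from the parameter descriptions in the tables above.

```python
from typing import Dict, Optional, Union
from urllib.parse import urlsplit

ALLOWED_METHODS = {"GET", "POST", "PUT", "HEAD", "DELETE"}


def check_request(url: str, method: str = "GET") -> Dict[str, Union[str, Optional[int]]]:
    """Validate a URL and method, returning the parsed components."""
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https"):
        raise ValueError("URL must start with http:// or https://")
    if not parts.hostname:
        raise ValueError("URL must include a domain name")
    method = method.upper()
    if method not in ALLOWED_METHODS:
        raise ValueError(f"unsupported method: {method}")
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,          # None when the URL has no explicit port
        "path": parts.path or "/",
        "method": method,
    }


plain = check_request("http://code.kx.com/q/ref")
with_port = check_request("https://example.com:8080/data", method="post")
```

A URL without a protocol, such as `example.com/q/ref`, is rejected because `urlsplit` cannot identify a scheme or host in it.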
Kafka
Consume data from a Kafka topic
See APIs for more details
q API: .qsp.read.fromKafka
Python API: kxi.sp.read.from_kafka
Kafka is a scalable streaming message queue for processing streaming data. See this tutorial for details about getting set up with a Kafka broker and ingesting data.
Required Parameters:
name | description | default |
---|---|---|
Broker | Location of the Kafka broker as host:port. If multiple brokers are available, they can be entered as a comma-separated list. | |
Topic | The name of the Kafka topic to subscribe to. |
Optional Parameters:
name | description | default |
---|---|---|
Offset | Read data from the Start of the topic (all data) or from the End (new data only). | End |
Use TLS | Enable TLS for encrypting the Kafka connection. When selected, certificate details must be provided with a Kubernetes TLS Secret | No |
Kubernetes Secret* | The name of a Kubernetes secret that is already available in the cluster and contains your TLS certificates. | |
Certificate Password* | TLS certificate password, if required. | |
Use Schema Registry | Use the schema registry to automatically decode data in a Kafka stream for JSON and Protocol Buffer payloads. When enabled, will automatically deserialize data based on the latest schema. | No |
Registry** | Kafka Schema Registry URL | |
As List** | Set to true for Kafka Schema Registry messages to omit field names when decoding Protocol Buffer schemas, and instead return only the list of values. | No |
* If Use TLS is enabled.
** If Use Schema Registry is enabled.
See this guide for more details on setting up TLS Secrets
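To make the Broker format concrete, the sketch below parses a comma-separated host:port list in plain Python. It only illustrates the expected input shape; `parse_brokers` and the host names are hypothetical, and the reader performs its own parsing.

```python
from typing import List, Tuple


def parse_brokers(brokers: str) -> List[Tuple[str, int]]:
    """Split a comma-separated broker list into (host, port) pairs."""
    pairs: List[Tuple[str, int]] = []
    for entry in brokers.split(","):
        entry = entry.strip()
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


# Hypothetical broker addresses.
single = parse_brokers("kafka:9092")
cluster = parse_brokers("kafka-0:9092, kafka-1:9092, kafka-2:9093")
```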
MQTT
Subscribe to an MQTT topic
See APIs for more details
q API: .qsp.read.fromMQTT
Python API: kxi.sp.read.from_mqtt
Read messages from an MQTT broker. MQTT is a lightweight, publish-subscribe, machine-to-machine network protocol for message queuing. It is designed for connections with remote locations that have devices with resource constraints or limited network bandwidth.
Required Parameters:
name | description | default |
---|---|---|
Broker | Location of the MQTT broker as protocol://host:port. | |
Topic | The name of the MQTT topic to subscribe to. |
Optional Parameters:
name | description | default |
---|---|---|
Username | Username for the MQTT broker connection | |
Use TLS | Enable TLS for encrypting the MQTT connection. When selected, certificate details must be provided with a Kubernetes TLS Secret | No |
Kubernetes Secret* | The name of a Kubernetes secret that is already available in the cluster and contains your TLS certificates. | |
Certificate Password* | TLS certificate password, if required. |
* If Use TLS is enabled.
See this guide for more details on setting up TLS Secrets
Microsoft Azure Storage
Reads an object from Microsoft Azure Storage
See APIs for more details
q API: .qsp.read.fromAzureStorage
Python API: kxi.sp.read.from_azure_storage
Required Parameters:
name | description | default |
---|---|---|
Path | Click the "+" button to add one or more paths to read from an Azure cloud storage bucket | |
Account | The name of the Azure Storage Account hosting the data | $AZURE_STORAGE_ACCOUNT |
Optional Parameters:
name | description | default |
---|---|---|
File Mode | The file read mode. Setting the file mode to Binary will read content as a byte vector. When reading a file as Text, data will be read as strings and be split on newlines. Each string vector represents a single line. Note that reading as Binary will work for both text and binary data, but reading as Text will only work for text data. | Binary |
Offset | A byte or character offset to start reading at. | 0 |
Chunking | Splits file into smaller batches. Auto will read file size and determine if it should be chunked. | Auto |
Chunk Size | File size of chunks when chunking is enabled. | 1MB |
Use Authentication | Enable Kubernetes secret authentication. | No |
Kubernetes Secret* | The name of a Kubernetes secret to authenticate with Azure Cloud Storage. |
* If Use Authentication is enabled.
Microsoft Azure Authentication
Microsoft Azure Storage Authentication uses kurl for credential validation.
Environment variable authentication
To set up authentication with an environment variable, set AZURE_STORAGE_ACCOUNT to the name of the storage account to read from, and AZURE_STORAGE_SHARED_KEY to one of the keys of the account. Additional details on shared key usage are available here.
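A quick pre-flight check can confirm that both variables are present before a pipeline is started. The sketch below is a hypothetical helper in plain Python, not part of the product; it only inspects the environment and never contacts Azure.

```python
import os
from typing import List, Mapping, Optional

REQUIRED_VARS = ("AZURE_STORAGE_ACCOUNT", "AZURE_STORAGE_SHARED_KEY")


def missing_azure_vars(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Check the current process environment and report anything missing.
missing = missing_azure_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```

Passing a mapping explicitly, for example `missing_azure_vars({"AZURE_STORAGE_ACCOUNT": "myaccount"})`, makes the check easy to exercise without touching the real environment; `myaccount` is a placeholder.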
Kubernetes secrets for authentication
A Kubernetes secret may be used to authenticate files. This secret needs to be created in the same namespace as the kdb Insights Enterprise install. The name of that secret is then used in the Kubernetes Secret field when configuring the reader.
To create a Kubernetes secret with Azure credentials:
kubectl create secret generic --from-literal=token="$AZURE_STORAGE_TOKEN" ms-creds
I want to learn more about Kubernetes Secrets.
kdb Insights Stream
Reads data from a kdb Insights Stream
See APIs for more details
q API: .qsp.read.fromStream
Python API: kxi.sp.read.from_stream
Insights streams
Optional Parameters:
name | description | default |
---|---|---|
Table | Filter incoming messages to show only messages for the specified table. If no filter is provided, all messages in the stream are consumed |
To read data from a stream, an assembly must be provided. This is done at pipeline deploy time: click the Deploy button and select the Assembly Integration tab. On this page, set the assembly name and topic name.
Amazon S3
Reads an object from Amazon S3
See APIs for more details
q API: .qsp.read.fromAmazonS3
Python API: kxi.sp.read.from_amazon_s3
Required Parameters:
name | description | default |
---|---|---|
Path | Click the "+" button to add one or more paths to read from an Amazon S3 cloud storage bucket |
Optional Parameters:
name | description | default |
---|---|---|
Region | The AWS region of the bucket to authenticate against | us-east-1 |
File Mode | The file read mode. Setting the file mode to Binary will read content as a byte vector. When reading a file as Text, data will be read as strings and be split on newlines. Each string vector represents a single line. Note that reading as Binary will work for both text and binary data, but reading as Text will only work for text data. | Binary |
Offset | A byte or character offset to start reading at. | 0 |
Chunking | Splits file into smaller batches. Auto will read file size and determine if it should be chunked. | Auto |
Chunk Size | File size of chunks when chunking is enabled. | 1MB |
Use Authentication | Enable Kubernetes secret authentication. | No |
Kubernetes Secret* | The name of a Kubernetes secret to authenticate with Amazon S3. |
* If Use Authentication is enabled.
Amazon S3 Authentication
To access private buckets or files, a Kubernetes secret needs to be created that contains valid AWS credentials. This secret needs to be created in the same namespace as the kdb Insights Enterprise install. The name of that secret is then used in the Kubernetes Secret field when configuring the reader.
To create a Kubernetes secret containing AWS credentials:
kubectl create secret generic --from-file=credentials=<path/to/.aws/credentials> <secret-name>
Where <path/to/.aws/credentials> is the path to an AWS credentials file, and <secret-name> is the name of the Kubernetes secret to create.
Note that this only needs to be done once, and each subsequent usage of the Amazon S3 reader can re-use the same Kubernetes secret.
PostgreSQL
Execute a query on a PostgreSQL database
See APIs for more details
q API: .qsp.read.fromPostgres
Python API: kxi.sp.read.from_postgres
Required Parameters:
name | description | default |
---|---|---|
Server | The hostname of the database to issue a query against. | |
Port | The port to use for connecting to the database server. | |
Database | The name of the database to issue a query against in the target server. | |
Query | An SQL query to issue against the target database for pulling data into the pipeline. |
Optional Parameters:
name | description | default |
---|---|---|
Username | The user account to use when establishing a connection to a PostgreSQL server. Note that if the username is provided, a password prompt will appear during deploy time. |
SQL Server
Execute a query on a SQL Server database
See APIs for more details
q API: .qsp.read.fromSQLServer
Python API: kxi.sp.read.from_sql_server
Required Parameters:
name | description | default |
---|---|---|
Server | The hostname of the database to issue a query against. | |
Port | The port to use for connecting to the database server. | |
Database | The name of the database to issue a query against in the target server. | |
Query | An SQL query to issue against the target database for pulling data into the pipeline. |
Optional Parameters:
name | description | default |
---|---|---|
Username | The user account to use when establishing a connection to a SQL Server instance. Note that if the username is provided, a password prompt will appear during deploy time. |