Skip to content

Stream Processor q API

.qsp.

Configuring operators use modify behavior of an operator

General push publish data to all downstream operators run install and run a pipeline teardown tear down a pipeline configPath return the path of mounted configuration files getPartitions return the current assigned partitions getPartitionCount return the count of all partitions setTrace enables trace logging clearTrace clears trace logging and resets logging level

Lifecycle finish call finish on an operator finishTask mark a task as finished onError set the onError event handler onCheckpoint set the onCheckpoint handler onOperatorCheckpoint set the onCheckpoint event handler onOperatorPostCheckpoint set the onPostCheckpoint event handler onOperatorRecover set the onRecover event handler onPostCheckpoint set the onPostCheckpoint handler onRecover set the onRecover handler onSetup set the onSetup event handler onStart set the onStart event handler onFinish set the onFinish event handler onTeardown set the onTeardown event handler registerTask register a task for an operator

Operators accumulate aggregates a stream into an accumulator apply apply a function to incoming batches in the stream filter filter some or all elements from a batch map apply a function to data passing through the operator merge merge two data streams reduce aggregate partial windows split split the current stream sql execute an SQL query on tables in a stream union unite two streams

Readers read.fromAmazonS3 reads data from Amazon Web Services S3 buckets read.fromAzureStorage reads data from Azure Blob Storage read.fromCallback define callback in the q global namespace read.fromDatabase query an Insights database read.fromExpr evaluate expression or function into the pipeline read.fromFile read file contents into pipeline read.fromKafka consume data from a Kafka topic read.fromGoogleStorage reads data from Google Cloud Storage read.fromPostgres execute a query against a PostgreSQL database read.fromSQLServer execute a query against a SQL Server database read.fromStream read data using a KX Insights stream

Decoders decode.csv parse CSV data to a table decode.json parse JSON data decode.protobuf parse Protocol Buffer messages

Encoders encode.json encode data in JSON format encode.protobuf encode Protocol Buffer messages

State get cached state of an operator set store state of an operator

Stats (Beta) sma calculate a simple moving average ema calculate an exponential moving average twa calculate a time weighted average

Transform transform.replaceInfinity replaces infinite values with min/max values transform.replaceNull replaces null values with the median value transform.timeSplit decomposes time columns into subdivisions of mins/hours etc. transform.schema transforms data to match a provided schema

Windows window.count aggregate stream into evenly sized windows window.global aggregate stream using a custom trigger function window.sliding aggregate stream in potentially overlapping windows window.timer aggregate stream by processing time window.tumbling aggregate stream into non-overlapping windows

Writers write.toConsole write to the console write.toKafka publish data on a Kafka topic write.toProcess write data to a kdb+ process write.toStream write data using a KX Insights stream write.toVariable write to a local variable

Machine Learning ml.dropConstant drops constant columns from incoming data ml.featureHasher hashes feature names into sparse matrices ml.freshCreate turns batches of data into features based on aggregated statistics ml.labelEncode encodes symbolic data into numerical values ml.linearRegression fit a linear regression model to batches of data ml.logClassifier fits a logistic classification model on batches of data using stochastic gradient descent ml.minMaxScaler min-max scale a supplied dataset ml.oneHot replaces symbolic values with numerical vector representations ml.registry.fit fits a model to batches of data, saving a model to a registry ml.registry.predict predicts a target variable using a trained model from the registry ml.registry.update trains a model incrementally, returning predictions for all records ml.score evaluates a model's predictions ml.sequentialKMeans fits a sequential k means model on batches of data ml.standardize standardize a supplied dataset

Operator syntax

Pipeline API operators are designed to be chained, as in the examples, in a form that will be familiar to users of libraries such as jQuery.

APIs are executed from left to right (or top to bottom when newlines are added) and are designed to be composed for human readability. For example, in the following case, data would be read from Kafka, transformed through a JSON decoder, windowed and written to an Insights stream.

.qsp.run
    .qsp.read.fromKafka[`trades]
    .qsp.decode.json[]
    .qsp.window.tumbling[00:00:05; `time]
    .qsp.write.toStream[]

Implicit last argument

Each .qsp operator returns an object representing a ‘node’ configuration; and takes a node configuration as its last argument. That last argument is left implicit in the API documentation: each operator therefore has a rank higher than documented.

Pipeline API operators are invoked as projections on their implicit last arguments and therefore must be applied using bracket notation only, never prefix.

Implicit penultimate argument

Many Pipeline API operators are variadic; most take a penultimate argument of custom configuration options.

If .qsp.foo is a Pipeline API operator that takes arguments x and y and optionally cco a dictionary of custom configuration options, its true signatures are

.qsp.foo[x;y;node]
.qsp.foo[x;y;cco;node]

To limit duplication in the documentation, neither the cco nor the node arguments are documented. The signature would be documented as

.qsp.foo[x;y]

An important consequence is that

.qsp.foo[x;y]
.qsp.foo[x;y;cco]

are both unary projections.

Order of evaluation

Successive Pipeline API operators modify the node configuration and pass it along the chain for eventual evaluation, reversing the apparent evaluation order.

In

prd
  {x where x>5}
  til 8

q evaluates first til 8, then the lambda, then prd, but in

.qsp.run
  .qsp.read.fromExpr["refData"]
  .qsp.write.toConsole[]

because actual evaluation is handled by .qsp.run, the table refData is read before its contents are written to the console.