
Stream Processor q API

.qsp.

Configuring operators

use    modify behavior of an operator

General

push                publish data to all downstream operators
run                 install and run a pipeline
teardown            tear down a pipeline
configPath          return the path of mounted configuration files
getPartitions       return the current assigned partitions
getPartitionCount   return the count of all partitions
setTrace            enables trace logging
clearTrace          clears trace logging and resets logging level
enableDataTracing   captures data flowing in a pipeline
disableDataTracing  disables data tracing in a pipeline
clearDataTrace      clears the current trace data cache
getDataTrace        returns a point-in-time data trace capture

Lifecycle

finish                    call finish on an operator
finishTask                mark a task as finished
onError                   set the onError event handler
onCheckpoint              set the onCheckpoint handler
onOperatorCheckpoint      set the onCheckpoint event handler
onOperatorPostCheckpoint  set the onPostCheckpoint event handler
onOperatorRecover         set the onRecover event handler
onPostCheckpoint          set the onPostCheckpoint handler
onRecover                 set the onRecover handler
onSetup                   set the onSetup event handler
onStart                   set the onStart event handler
onFinish                  set the onFinish event handler
onTeardown                set the onTeardown event handler
registerTask              register a task for an operator
subscribe                 add a subscriber for an event
unsubscribe               remove a subscriber or all subscribers

Operators

accumulate       aggregates a stream into an accumulator
apply            apply a function to incoming batches in the stream
filter           filter some or all elements from a batch
map              apply a function to data passing through the operator
merge            merge two data streams
parallel (Beta)  applies multiple functions in parallel over a stream
reduce           aggregate partial windows
rolling (Beta)   applies a moving-window function to a stream
split            split the current stream
sql              execute an SQL query on tables in a stream
union            unite two streams

Readers

read.fromAmazonS3       reads data from Amazon Web Services S3 buckets
read.fromAzureStorage   reads data from Azure Blob Storage
read.fromCallback       define callback in the q global namespace
read.fromExpr           evaluate expression or function into the pipeline
read.fromFile           read file contents into pipeline
read.fromKafka          consume data from a Kafka topic
read.fromGoogleStorage  reads data from Google Cloud Storage
read.fromPostgres       execute a query against a PostgreSQL database
read.fromSQLServer      execute a query against a SQL Server database
read.fromStream         read data using a KX Insights stream

Decoders

decode.csv           parse CSV data to a table
decode.arrow (Beta)  decode Arrow streams
decode.gzip (Beta)   decode gzipped data
decode.json          parse JSON data
decode.protobuf      parse Protocol Buffer messages

Encoders

encode.arrow (Beta)  encode a stream as Arrow data
encode.csv           encode tables as CSV data
encode.json          encode data in JSON format
encode.protobuf      encode Protocol Buffer messages

State

get    cached state of an operator
set    store state of an operator

Stats (Beta)

sma    calculate a simple moving average
ema    calculate an exponential moving average
twa    calculate a time weighted average

Transform

transform.replaceInfinity  replaces infinite values with min/max values
transform.replaceNull      replaces null values with the median value
transform.timeSplit        decomposes time columns into subdivisions of mins/hours etc.
transform.fill (Beta)      fill in null values in a table
transform.schema           transforms data to match a provided schema

Windows

window.count     aggregate stream into evenly sized windows
window.global    aggregate stream using a custom trigger function
window.sliding   aggregate stream in potentially overlapping windows
window.timer     aggregate stream by processing time
window.tumbling  aggregate stream into non-overlapping windows

Writers

write.toAmazonS3    write to an object in Amazon S3
write.toConsole     write to the console
write.toDatabase    write to KX Insights Database
write.toKafka       publish data on a Kafka topic
write.toKDB (Beta)  write data to an on-disk partitioned table
write.toProcess     write data to a kdb+ process
write.toStream      write data using a KX Insights stream
write.toVariable    write to a local variable

Machine Learning

Fresh

ml.freshCreate    turns batches of data into features based on aggregated statistics

Classification

ml.adaBoostClassifier             fits an AdaBoost classification model
ml.decisionTreeClassifier         fits a decision tree classification model
ml.gaussianNB                     fits a Gaussian naive Bayes model
ml.kNeighborsClassifier           fits a k-nearest neighbors classification model
ml.logClassifier                  fits a logistic classification model using stochastic gradient descent
ml.quadraticDiscriminantAnalysis  fits a quadratic discriminant analysis model
ml.randomForestClassifier         fits a random forest classification model

Clustering

ml.affinityPropagation  fits an affinity propagation clustering model
ml.birch                fits a BIRCH clustering model
ml.cure                 fits a CURE clustering model
ml.dbscan               fits a DBSCAN clustering model
ml.sequentialKMeans     fits a sequential k-means model

Regression

ml.adaBoostRegressor          fits an AdaBoost regression model
ml.gradientBoostingRegressor  fits a gradient boosting regression model
ml.kNeighborsRegressor        fits a k-nearest neighbors regression model
ml.lasso                      fits a lasso-linear regression model
ml.linearRegression           fits a linear regression model
ml.randomForestRegressor      fits a random forest regression model

Metrics

ml.score    evaluates a model's predictions

Preprocessing

ml.dropConstant   drops constant columns from incoming data
ml.featureHasher  encodes categorical data as numeric vectors
ml.labelEncode    encodes symbolic data into numerical values
ml.minMaxScaler   min-max scale a supplied dataset
ml.oneHot         replaces symbolic values with numerical vector representations
ml.standardize    standardizes a supplied dataset

Registry

ml.registry.fit      fits a model to batches of data, saving a model to a registry
ml.registry.predict  predicts a target variable using a trained model from the registry
ml.registry.update   trains a model incrementally, returning predictions for all records

Operator syntax

Pipeline API operators are designed to be chained, as in the examples, in a form that will be familiar to users of libraries such as jQuery.

APIs are executed from left to right (or top to bottom when newlines are added) and are designed to be composed for human readability. For example, in the following case, data would be read from Kafka, transformed through a JSON decoder, windowed and written to an Insights stream.

.qsp.run
    .qsp.read.fromKafka[`trades]
    .qsp.decode.json[]
    .qsp.window.tumbling[00:00:05; `time]
    .qsp.write.toStream[]

Implicit last argument

Each .qsp operator returns an object representing a ‘node’ configuration and takes a node configuration as its last argument. That last argument is left implicit in the API documentation, so each operator has a rank one higher than documented.

Pipeline API operators are invoked as projections on their implicit last arguments and therefore must be applied using bracket notation only, never prefix.
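The effect of an implicit last argument can be reproduced in plain q. The following is a minimal sketch only: foo and bar are hypothetical functions standing in for operators, they are not part of .qsp, and the real operators may well be implemented differently. The last line hints at one plausible reason for the bracket-only rule, which is an inference rather than something stated here.

```q
/ foo is hypothetical: documented as foo[x;y], true rank 3
foo:{[x;y;node] `x`y`node!(x;y;node)}

p:foo[1;2]          / bracket application leaves a projection awaiting node
type p              / 104h, the type of a projection
p `downstream       / supplying node completes the call

/ bar is hypothetical: documented as bar[f], true rank 2
bar:{[f;node] (f;node)}
bar[neg] `sink      / bracket form: bar receives neg, then `sink as node
/ bar neg `sink     / prefix form: q would evaluate neg `sink first
```

Because q evaluates right to left, the prefix form in the final comment would apply neg to the node before bar ever sees it.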

Implicit penultimate argument

Many Pipeline API operators are variadic; most take a penultimate argument of custom configuration options.

If .qsp.foo is a Pipeline API operator that takes arguments x and y and, optionally, a dictionary cco of custom configuration options, its true signatures are

.qsp.foo[x;y;node]
.qsp.foo[x;y;cco;node]

To limit duplication in the documentation, neither the cco nor the node arguments are documented. The signature would be documented as

.qsp.foo[x;y]

An important consequence is that

.qsp.foo[x;y]
.qsp.foo[x;y;cco]

are both unary projections.
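A q lambda has a fixed rank, so an operator that accepts an optional cco needs some extra machinery. One way to get the behavior described above is sketched here in plain q. The names impl, foo and opts are hypothetical, and this is not claimed to be how .qsp does it: composing with enlist delivers the bracketed arguments as a list, a default empty cco is filled in when only two are given, and the result is a projection that awaits node alone.

```q
/ impl and foo are hypothetical stand-ins, not the .qsp implementation
impl:{[x;y;cco;node] `x`y`cco`node!(x;y;cco;node)}

/ composing with enlist lets foo accept two or three arguments
foo:('[{[a] impl . $[2=count a; a,enlist ()!(); a]}; enlist])

opts:`a`b!1 2                   / arbitrary example options dictionary
type foo[1;2]                   / 104h: projection awaiting node
type foo[1;2;opts]              / 104h: also awaiting node only
foo[1;2;opts] `downstream       / supplying node completes the call
```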

Order of evaluation

Successive Pipeline API operators modify the node configuration and pass it along the chain for eventual evaluation, reversing the apparent evaluation order.

In

prd
  {x where x>5}
  til 8

q evaluates first til 8, then the lambda, then prd, but in

.qsp.run
  .qsp.read.fromExpr["refData"]
  .qsp.write.toConsole[]

because actual evaluation is handled by .qsp.run, the table refData is read before its contents are written to the console.
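The two orders can be made visible with a toy model in plain q, written as a script. Everything below is a hypothetical stand-in (events, note, toVar, fromExpr, run and the sample refData), not the .qsp implementation: each mock operator records when it builds its node, each step records when it processes data, and the writer is simplified so that it takes no downstream node.

```q
/ All names below are hypothetical stand-ins, not the .qsp implementation
events:`symbol$()
note:{events::events,x}          / append an event name to the global log

refData:([] sym:`a`b; px:1 2f)

/ writer: builds a one-step node (simplified: takes no downstream node)
toVar:{[name] note `buildWrite; enlist {[n;x] note `runWrite; n set x; x}[name]}

/ reader: takes the downstream node last and prepends its own step
fromExpr:{[expr;node] note `buildRead; enlist[{[e;x] note `runRead; value e}[expr]],node}

/ run: only now does data flow, from the first step to the last
run:{[node] {y x}/[();node]}

run
  fromExpr["refData"]
  toVar[`out]

/ events is now `buildWrite`buildRead`runRead`runWrite
```

q builds the writer's node first because it evaluates right to left, but the data is read before it is written because run walks the assembled steps from first to last.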