Stream Processor q API
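All functions listed below are in the .qsp namespace; for example, run is .qsp.run and ml.score is .qsp.ml.score.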
Configuring operators
  use   modify behavior of an operator
General
  push                publish data to all downstream operators
  run                 install and run a pipeline
  teardown            tear down a pipeline
  configPath          return the path of mounted configuration files
  getPartitions       return the currently assigned partitions
  getPartitionCount   return the count of all partitions
  setTrace            enable trace logging
  clearTrace          clear trace logging and reset the logging level
Lifecycle
  finish                     call finish on an operator
  finishTask                 mark a task as finished
  onError                    set the onError event handler
  onCheckpoint               set the onCheckpoint handler
  onOperatorCheckpoint       set the onCheckpoint event handler
  onOperatorPostCheckpoint   set the onPostCheckpoint event handler
  onOperatorRecover          set the onRecover event handler
  onPostCheckpoint           set the onPostCheckpoint handler
  onRecover                  set the onRecover handler
  onSetup                    set the onSetup event handler
  onStart                    set the onStart event handler
  onFinish                   set the onFinish event handler
  onTeardown                 set the onTeardown event handler
  registerTask               register a task for an operator
Operators
  accumulate   aggregates a stream into an accumulator
  apply        apply a function to incoming batches in the stream
  filter       filter some or all elements from a batch
  map          apply a function to data passing through the operator
  merge        merge two data streams
  reduce       aggregate partial windows
  split        split the current stream
  sql          execute an SQL query on tables in a stream
  union        unite two streams
Readers
  read.fromAmazonS3        reads data from Amazon Web Services S3 buckets
  read.fromAzureStorage    reads data from Azure Blob Storage
  read.fromCallback        define callback in the q global namespace
  read.fromDatabase        query an Insights database
  read.fromExpr            evaluate expression or function into the pipeline
  read.fromFile            read file contents into pipeline
  read.fromKafka           consume data from a Kafka topic
  read.fromGoogleStorage   reads data from Google Cloud Storage
  read.fromPostgres        execute a query against a PostgreSQL database
  read.fromSQLServer       execute a query against a SQL Server database
  read.fromStream          read data using a KX Insights stream
Decoders
  decode.csv        parse CSV data to a table
  decode.json       parse JSON data
  decode.protobuf   parse Protocol Buffer messages
Encoders
  encode.json       encode data in JSON format
  encode.protobuf   encode Protocol Buffer messages
State
  get   get the cached state of an operator
  set   store the state of an operator
Stats (Beta)
  sma   calculate a simple moving average
  ema   calculate an exponential moving average
  twa   calculate a time weighted average
Transform
  transform.replaceInfinity   replaces infinite values with min/max values
  transform.replaceNull       replaces null values with the median value
  transform.timeSplit         decomposes time columns into subdivisions (minutes, hours, etc.)
  transform.schema            transforms data to match a provided schema
Windows
  window.count      aggregate stream into evenly sized windows
  window.global     aggregate stream using a custom trigger function
  window.sliding    aggregate stream in potentially overlapping windows
  window.timer      aggregate stream by processing time
  window.tumbling   aggregate stream into non-overlapping windows
Writers
  write.toConsole    write to the console
  write.toKafka      publish data on a Kafka topic
  write.toProcess    write data to a kdb+ process
  write.toStream     write data using a KX Insights stream
  write.toVariable   write to a local variable
Machine Learning
Fresh
  ml.freshCreate   turns batches of data into features based on aggregated statistics
Classification
  ml.adaBoostClassifier              fits an AdaBoost classification model
  ml.decisionTreeClassifier          fits a decision tree classification model
  ml.gaussianNB                      fits a Gaussian naive Bayes model
  ml.kNeighborsClassifier            fits a k-nearest neighbors classification model
  ml.logClassifier                   fits a logistic classification model using stochastic gradient descent
  ml.quadraticDiscriminantAnalysis   fits a quadratic discriminant analysis model
  ml.randomForestClassifier          fits a random forest classification model
Clustering
  ml.affinityPropagation   fits an affinity propagation clustering model
  ml.birch                 fits a BIRCH clustering model
  ml.cure                  fits a CURE clustering model
  ml.dbscan                fits a DBSCAN clustering model
  ml.sequentialKMeans      fits a sequential k-means model
Regression
  ml.adaBoostRegressor          fits an AdaBoost regression model
  ml.gradientBoostingRegressor  fits a gradient boosting regression model
  ml.kNeighborsRegressor        fits a k-nearest neighbors regression model
  ml.lasso                      fits a lasso-linear regression model
  ml.linearRegression           fits a linear regression model
  ml.randomForestRegressor      fits a random forest regression model
Metrics
  ml.score   evaluates a model's predictions
Preprocessing
  ml.dropConstant    drops constant columns from incoming data
  ml.featureHasher   encodes categorical data as numeric vectors
  ml.labelEncode     encodes symbolic data into numerical values
  ml.minMaxScaler    min-max scale a supplied dataset
  ml.oneHot          replaces symbolic values with numerical vector representations
  ml.standardize     standardizes a supplied dataset
Registry
  ml.registry.fit       fits a model to batches of data, saving a model to a registry
  ml.registry.predict   predicts a target variable using a trained model from the registry
  ml.registry.update    trains a model incrementally, returning predictions for all records
Operator syntax
Pipeline API operators are designed to be chained, as in the examples, in a form that will be familiar to users of libraries such as jQuery.
Operators are read from left to right (or top to bottom when the chain is split across lines), and data flows through them in that order; the chain is designed to be composed for human readability. For example, in the following pipeline, data is read from Kafka, decoded as JSON, windowed, and written to an Insights stream.
.qsp.run
  .qsp.read.fromKafka[`trades]
  .qsp.decode.json[]
  .qsp.window.tumbling[00:00:05; `time]
  .qsp.write.toStream[]
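The chaining style can be imitated in plain q to see how it works. The sketch below is a toy model, not the .qsp implementation: the .demo namespace and its operators (fromExpr, filter, map, toVariable, run) are hypothetical stand-ins that only borrow the names of the real operators. Each takes its documented argument in brackets plus a downstream node as its last argument, and run pushes a batch through the stages from left to right.

```q
/ Toy model only: the .demo names are hypothetical and are NOT the .qsp implementation.
/ A node is a general list of stage functions, in data-flow order.
/ The rightmost operator in a chain arrives as an unapplied projection (type 104h), so close it off with (::).
.demo.chain:{[stage;node] if[104h=type node; node:node[::]]; $[node~(::); enlist stage; enlist[stage],node]}

/ Each operator: documented argument first, downstream node last.
.demo.fromExpr:{[expr;node] .demo.chain[{[e;x] value e}[expr]; node]}     / source: evaluate a q expression string
.demo.filter:{[f;node] .demo.chain[{[f;x] x where f x}[f]; node]}        / keep the items where f returns 1b
.demo.map:{[f;node] .demo.chain[f; node]}                                / apply f to the whole batch
.demo.toVariable:{[name;node] .demo.chain[{[n;x] n set x}[name]; node]}  / sink: assign the batch to a global

/ Push data through the stages from left to right, starting from the generic null.
.demo.run:{[node] {y x}/[::;node]}

.demo.run .demo.fromExpr["til 8"] .demo.filter[{x>5}] .demo.map[{prd x}] .demo.toVariable[`result]
result   / 42
```

Because every operator is written as a bracketed projection, q can apply each one to the node built by the operators to its right, which is what lets the chain read like a sequence of steps.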
Implicit last argument
Each .qsp operator returns an object representing a ‘node’ configuration, and takes a node configuration as its last argument. That last argument is left implicit in the API documentation, so each operator has a rank one higher than documented.
Pipeline API operators are invoked as projections on their implicit last arguments and therefore must be applied using bracket notation only, never prefix.
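A minimal plain-q illustration of the implicit last argument, using a hypothetical operator op that would be documented as op[x] but whose true signature is op[x;node]. Supplying only the documented argument in brackets yields a projection that still waits for its node.

```q
/ Hypothetical operator: documented as op[x], true signature op[x;node].
op:{[x;node] `arg`node!(x;node)}

count (value op) 1     / 2: one more parameter than the documented rank
p:op[10]               / bracket call with the documented argument only
type p                 / 104h: a projection awaiting its node
p[`someNode]           / `arg`node!(10;`someNode)
```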
Implicit penultimate argument
Many Pipeline API operators are variadic; most take a penultimate argument of custom configuration options.
If .qsp.foo is a Pipeline API operator that takes arguments x and y, and optionally cco, a dictionary of custom configuration options, its true signatures are
.qsp.foo[x;y;node]
.qsp.foo[x;y;cco;node]
To limit duplication in the documentation, neither the cco nor the node argument is documented. The signature would be documented as
.qsp.foo[x;y]
An important consequence is that
.qsp.foo[x;y]
.qsp.foo[x;y;cco]
are both unary projections.
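Plain q lambdas have a fixed rank, so one way to imitate both signatures in a sketch is to compose a dispatcher with enlist, which accepts a variable number of arguments. This is an assumption made for illustration, not a description of how .qsp is built; foo, fooImpl and the option key opt are hypothetical. Both bracket forms return unary projections waiting for the node.

```q
/ Hypothetical implementation with all four arguments explicit.
fooImpl:{[x;y;cco;node] `x`y`cco`node!(x;y;cco;node)}

/ '[f;enlist] collects however many arguments are supplied into one list for f.
/ With 2 arguments, default cco to an empty dictionary; with 3, use the one given.
foo:('[{[a] $[3=count a; fooImpl[a 0;a 1;a 2]; fooImpl[a 0;a 1;()!()]]}; enlist])

type foo[1;2]                          / 104h: unary projection awaiting node
type foo[1;2;(enlist`opt)!enlist 10]   / 104h: also a unary projection
foo[1;2][`node1]                       / `x`y`cco`node!(1;2;()!();`node1)
```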
Order of evaluation
Each successive Pipeline API operator modifies the node configuration and passes it along the chain; the pipeline is evaluated only once .qsp.run receives the complete configuration. This reverses the apparent evaluation order.
In

prd
  {x where x>5}
  til 8

q evaluates til 8 first, then the lambda, then prd, but in
.qsp.run
  .qsp.read.fromExpr["refData"]
  .qsp.write.toConsole[]
because actual evaluation is handled by .qsp.run, the table refData is read before its contents are written to the console.
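The reversal can be traced in plain q. In this hypothetical toy model (op, built and pipeline are illustrative names, not part of .qsp), each operator records the moment q evaluates it and prepends its stage name to the downstream node it receives. q builds the chain from right to left, yet the finished node lists the stages in data-flow order, which is the order a runner such as .qsp.run would process them.

```q
/ Eager evaluation: q applies til 8 first, then the lambda, then prd.
prd {x where x>5} til 8      / 42

/ Toy model: hypothetical names, not the .qsp implementation.
built:`symbol$()
/ Close off a trailing projection or missing downstream node, record the build step, prepend this stage.
op:{[name;node] node:$[node~(::); `symbol$(); 104h=type node; node[::]; node]; built::built,name; name,node}

pipeline:op[`read] op[`decode] op[`write]
built        / `write`decode`read: the order in which q built the chain
pipeline     / `read`decode`write: the order in which data would flow
```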