Writers
This page describes how to store data flowing out of a pipeline using writers.
Writers are a specialized type of operator that push data from a streaming pipeline to external data sources.
Amazon S3
Writes data to an object in an Amazon S3 bucket.
.qsp.write.toAmazonS3[path; isComplete; region]
.qsp.write.toAmazonS3[path; isComplete; region; .qsp.use (!) . flip (
(`tenant ; tenant);
(`credentials ; credentials);
(`onTeardown ; onTeardown))]
Parameters:
name | type | description | default |
---|---|---|---|
path | string or symbol or function | The path of an object to write to S3. A function can also be provided to dynamically set the file path for a given batch of data. | Required |
isComplete | :: or function | A binary function that accepts a metadata dictionary as its first argument and data as its second, returning a boolean that indicates whether the upload of the current file should be marked as complete. Marking an upload complete causes any further writes to the given file path to fail. When `::` is used, writes finish when the first non-partial batch is received for a given path. | Required |
region | string | The AWS region to authenticate against. | us-east-1 |
options:
name | type | description | default |
---|---|---|---|
tenant | string | The authentication tenant | |
domain | string | The name of your domain (private config) | "" |
credentials | string | Name of a credentials secret to mount. See the authentication section below for details | "" |
onTeardown | symbol | Indicates what to do with any in-flight uploads at the time of teardown. By default, all in-flight uploads are left pending (`none`), which is useful for pipelines that need to resume an upload between teardowns (for example, an upgrade). Alternatively, you can choose to `abort` or `complete` any pending uploads. | none |
For all common arguments, refer to configuring operators
This operator writes data to an object in an Amazon Web Services S3 bucket.
Teardown Behavior
By default, teardown will leave pending uploads queued so that when the pipeline is restarted the pending uploads are resumed. This behaviour is useful for allowing pipelines to stop and restart for regularly scheduled maintenance (i.e. version upgrades). When an upload remains in a pending state, it will consume resources in Amazon S3 as a partial file upload. By default, Amazon will expunge these partial uploads after 7 days.
If this behavior is undesired, the teardown behavior can be modified to either `abort` or `complete`.
If marked as `abort`, any pending uploads are terminated; if the upload is resumed, any already processed data is lost.
If marked as `complete`, any pending uploads are pushed to S3. However, if the pipeline were to resume, it would start overwriting the previous object with a new object. It is recommended to only use the `complete` behavior when a `path` function is provided that ensures a new file path is used when resuming.
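For example, a `path` function can derive a fresh object name for each batch, paired with `onTeardown` set to `complete` (a minimal sketch; the bucket name, path scheme, and unary path function are illustrative assumptions):

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toAmazonS3[
    {[data] "s3://mybucket/data-", string[.z.d], ".csv"};  / illustrative: a new file path per day
    ::;
    "us-west-1";
    .qsp.use ``onTeardown!(::;`complete)]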
Examples
Uploading a local file:
Permissions
When running this example inside a `kxi-sp-worker` image, you must first run `system "cd /tmp"` to avoid encountering permissions errors.
t:([] date: .z.d; name: 1000?10?`3; id: 1000?0ng; units: 1000?100f; price: 1000?1000f);
`:data.csv 0: csv 0: t;
.qsp.run
.qsp.read.fromFile[`:data.csv]
.qsp.write.toAmazonS3[`:s3://mybucket/file.csv; ::; "us-west-1"]
Uploading a stream:
The second argument to `write.toAmazonS3` indicates that data should not be uploaded
until an empty batch is received in the stream.
.qsp.run
.qsp.read.fromCallback[`pub]
.qsp.write.toAmazonS3[`:s3://mybucket/file.txt; {[md;data] 0 ~ count data}; "us-west-1"]
pub "Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
pub "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
pub "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat."
At this point, the chunks have been buffered and are waiting to be converted to
an S3 object. Flush the file and write it to S3.
pub ()
sp.write.to_amazon_s3('s3://mybucket/file.csv', region='us-east-1')
Parameters:
name | type | description | default |
---|---|---|---|
path | string or symbol or function | The path to write objects to in S3, or a function to change the path for each message processed in the stream. If a function is provided, it must return a string giving the file path to write the current batch of data to. By default the function receives the message data as its argument, but parameters can be configured to include metadata. | Required |
is_complete | :: or function | A binary function that accepts a metadata dictionary as the first argument and data as its second. This function is invoked on each message that is processed and must return a boolean indicating if the current file processing is complete. | Required |
region | string | The AWS region to authenticate against. | us-east-1 |
options:
name | type | description | default |
---|---|---|---|
tenant | string | The authorization tenant. | "" |
domain | string | A custom Amazon S3 domain. | "" |
credentials | string | The secret name for the Amazon S3 credentials. Refer to the authentication section below for more information. | "" |
on_teardown | symbol | An AmazonS3Teardown behavior indicating what to do with any partial uploads that are incomplete when a teardown occurs. | none |
Returns:
A to_amazon_s3
writer, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.encode.csv(',')
| sp.write.to_amazon_s3('s3://mybucket/file.csv', region='us-east-1'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
name,id,quantity
a,1,4
b,2,5
c,3,6
Authentication
The S3 writer uses kurl
for credential discovery.
See credential discovery
in the AWS tab for more information.
Environment variable authentication:
To setup authentication using environment variables, set AWS_ACCESS_KEY_ID
and
AWS_SECRET_ACCESS_KEY
.
For Docker based configurations, set the variables in the worker image configuration.
docker-compose
:
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.13.0
environment:
AWS_ACCESS_KEY_ID: "abcd"
AWS_SECRET_ACCESS_KEY: "iamasecret"
Docker credentials file for authentication:
To load a custom configuration file into a Docker deployment, mount the credentials file
directory into the container and set KXI_SP_CONFIG_PATH
to point to the configuration
directory.
version: "3.3"
services:
worker:
image: portal.dl.kx.com/kxi-sp-worker:1.13.0
volumes:
- $HOME/.aws/:/config/awscreds
environment:
KXI_SP_CONFIG_PATH: "/config"
Next, add the secret name to the S3 writer configuration.
.qsp.write.toAmazonS3[`:s3://bucket/hello; ::; "us-east-1"; .qsp.use``credentials!``awscreds]
Deploying the pipeline now reads the credentials from the AWS credentials file. Note that the credentials file must be named `credentials`.
For Kubernetes deployments, environment variables can be passed via the REST request sent when launching a job.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "s3-writer",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
env : { AWS_ACCESS_KEY_ID: "abcd", AWS_SECRET_ACCESS_KEY: "iamasecret" }
}' | jq -asR .)"
Kubernetes secrets for authentication:
When using a Kubernetes deployment, Kubernetes secrets can be used to install credentials into the worker.
First, create a secret using an AWS credentials file. Take care to ensure the secret is created in the correct namespace.
kubectl create secret generic --from-file credentials=$HOME/.aws/credentials awscreds
Note that the secret must be formatted so that the variable names are lower case and there is a space on either side of the equals sign, like so:
[default]
aws_access_key_id = abcd1234
aws_secret_access_key = abcd1234
Next, add the secret name to the S3 writer configuration.
.qsp.write.toAmazonS3[`:s3://bucket/hello; ::; "us-west-1"; .qsp.use``credentials!``awscreds]
Lastly, when deploying the worker, add a secret to the Kubernetes configuration in the request.
curl -X POST http://localhost:5000/pipeline/create -d \
"$(jq -n --arg spec "$(cat spec.q)" \
'{
name : "s3-writer",
type : "spec",
config : { content: $spec },
settings : { minWorkers: "1", maxWorkers: "10" },
kubeConfig : { secrets : ["awscreds"] }
}' | jq -asR .)"
Console
Writes to the console.
.qsp.write.toConsole[]
.qsp.write.toConsole[prefix]
.qsp.write.toConsole[prefix; .qsp.use (!) . flip (
(`split ; split);
(`timestamp; timestamp);
(`qlog ; qlog))]
Parameters:
name | type | description | default |
---|---|---|---|
prefix | string | A prefix for output messages. | "" |
options:
name | type | description | default |
---|---|---|---|
split | boolean | Controls how vectors are printed (see below). | 0b |
timestamp | symbol | Either local, utc, an empty symbol (backtick) for no timestamp, or . to use UTC time when qlog is false and no timestamp when it is true. | `. |
qlog | boolean | Prints all console logs to a QLog stream. | 0b |
For all common arguments, refer to configuring operators
This operator adds a console writer to the current stream.
A console writer outputs content to standard out on the current process prefixed with the event timestamp.
Splitting output:
By default, all vectors (lists of the same type) are printed on a single line. By setting the
split
option to 1b
, vectors will be printed on separate lines. Lists of mixed types are
always printed on separate lines.
Examples:
Basic writer:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole[];
Callback takes lists of messages, so enlist the single message
publish enlist "hi!"
2021.03.08D19:19:08.171805000 | "hi!"
Write with prefix:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole["INFO: "];
publish enlist "hi!"
INFO: 2021.03.08D19:19:08.171805000 | "hi!"
Write with split vectors:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole[.qsp.use ``split!(::;1b)];
publish enlist 1 2 3 4
2021.09.30D14:01:50.359012847 | 1
2021.09.30D14:01:50.359012847 | 2
2021.09.30D14:01:50.359012847 | 3
2021.09.30D14:01:50.359012847 | 4
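Write without a timestamp (a sketch, assuming the empty-symbol timestamp value listed above suppresses the timestamp prefix):

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toConsole[.qsp.use ``timestamp!(::;`)];

publish enlist "hi!"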
Write output to QLog format
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toConsole[.qsp.use ``qlog!(::;1b)]
publish ("hello"; "world")
publish "==="
publish ([] date: .z.p+til 2; open: 2?100f; high: 2?100f; low:2?100f; close: 2?100f)
2021-10-04 11:35:33.189 [console_f08427d1] INFO "hello"
2021-10-04 11:35:33.189 [console_f08427d1] INFO "world"
2021-10-04 11:35:33.189 [console_f08427d1] INFO "==="
2021-10-04 11:36:12.193 [console_f08427d1] INFO date open high low close
2021-10-04 11:36:12.193 [console_f08427d1] INFO -----------------------------------------------------------------
2021-10-04 11:36:12.194 [console_f08427d1] INFO 2021.10.04D11:36:12.193211900 74.84699 20.68577 85.28164 46.82838
2021-10-04 11:36:12.194 [console_f08427d1] INFO 2021.10.04D11:36:12.193211901 97.91903 36.07874 69.44221 4.251175
sp.write.to_console()
Parameters:
name | type | description | default |
---|---|---|---|
prefix | string | A prefix for each line of output. | "" |
options:
name | type | description | default |
---|---|---|---|
split | boolean | Whether a vector should be printed on a single line, or split across multiple lines with one element per line. | 0b |
timestamp | symbol | A ConsoleTimestamp enum value, or the string equivalent. | `. |
qlog | boolean | Whether the output should be through qlog. | 0b |
Returns:
A to_console
writer, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_console(timestamp = 'none'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
name id quantity
----------------
a 1 4
b 2 5
c 3 6
Kafka
Publishes data on a Kafka topic.
.qsp.write.toKafka[topic]
.qsp.write.toKafka[topic; brokers]
.qsp.write.toKafka[topic; brokers; .qsp.use (!) . flip (
(`retries ; retries);
(`retryWait ; retryWait);
(`options ; options);
(`topicConfig ; topicConfig);
(`registry ; registry);
(`subject ; subject);
(`autoRegister; autoRegister);
(`schemaType ; schemaType))]
Parameters:
name | type | description | default |
---|---|---|---|
topic | symbol | A Kafka topic name to publish on. | ` |
brokers | string or string[] | One or more brokers as host:port. | "localhost:9092" |
options:
name | type | description | default |
---|---|---|---|
retries | long | Max retry attempts for Kafka API calls. | 10 |
retryWait | timespan | Period to wait between retry attempts. | 0D00:00:02 |
options | dictionary | A dictionary of Kafka producer configuration options | ()!() |
topicConfig | dictionary | A dictionary of Kafka topic configuration options. | ()!() |
registry | string | Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload encoding. | "" |
subject | string | A Kafka subject to read schemas from and publish schemas to. If none is provided, a default subject based on the topic name is used. | "" |
autoRegister | boolean | Controls whether or not to generate and publish schemas automatically. | 0b |
schemaType | string | Schema type to generate, one of: "JSON", "PROTOBUF", "AVRO" | "JSON" |
For all common arguments, refer to configuring operators
This operator acts as a Kafka producer, pushing data to a Kafka topic.
A Kafka producer publishes data to a Kafka broker, which can then be consumed by any downstream listeners. All data published to Kafka must be encoded as strings or serialized as bytes. If data reaching the Kafka publish point is not already encoded, it is converted to its q IPC serialized representation. Data can also be linked to a Kafka Schema Registry to use automatic type conversion.
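For example, table or dictionary data can be serialized upstream of the writer so that strings reach the Kafka publish point. A minimal sketch using q's built-in JSON serializer (the topic and callback names are illustrative):

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.map[.j.j]                 / serialize each batch to a JSON string
  .qsp.write.toKafka[`trades]

publish ([] sym: `AAPL`KX; price: 170.5 42.1)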
Besides the above keys, all available Kafka producer
configuration options
are supported using the options
dictionary.
This includes properties such as socket.timeout.ms
, fetch.message.max.bytes
, etc.
To configure the Kafka topic, you should use the topic configuration options.
AVRO is not supported
Encoding messages with AVRO using the Kafka Schema Registry integration is currently not supported.
Reserved keys
metadata.broker.list
and producer.id
are reserved values and are maintained
directly by the Kafka writer. The Kafka writer will error if these options are used.
Localhost broker:
Read data from a callback called 'publish' and write it to the Kafka topic
'words' using the default broker at 'localhost:9092'.
.qsp.run .qsp.read.fromCallback[`publish] .qsp.write.toKafka[`words]
Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
publish "Hello"
publish "World"
Multiple brokers:
Read data from a callback called 'publish' and write it to the Kafka topic
'words' using multiple Kafka brokers.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[`words; ("localhost:1234"; "localhost:1235")]
Both 'Hello' and 'World' are sent to the Kafka topic as independent events.
publish "Hello"
publish "World"
Advanced producer configuration:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[.qsp.use (!) . flip (
(`topic ; `words);
(`options ; (!) . flip (
(`socket.timeout.ms ; 60000);
(`socket.send.buffer.bytes ; 4194304);
(`max.in.flight ; 100000)
));
(`topicConfig ; (!) . flip (
(`compression.type; `gzip);
(`delivery.timeout.ms; `100000)
))
)];
Publish 'Hello' and 'World' as a single batch publish
publish ("Hello"; "World")
Kafka Schema Registry with existing schema:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[.qsp.use (!) . flip (
(`topic ; `words);
(`brokers ; "localhost:9092");
(`registry ; "http://localhost:8081"))];
Kafka Schema Registry with generated schema:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[.qsp.use (!) . flip (
(`topic ; `words);
(`brokers ; "localhost:9092");
(`registry ; "http://localhost:8081");
(`autoRegister ; 1b);
(`schemaType ; "JSON"))];
Kafka Schema Registry with custom subject:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKafka[.qsp.use (!) . flip (
(`topic ; `words);
(`brokers ; "localhost:9092");
(`registry ; "http://localhost:8081");
(`subject ; "custom-subject"))];
Writing to explicit partitions
Future versions of the Kafka writer will allow writing to explicit partitions/keys.
However, the current Kafka writer will write to unassigned partitions without using an explicit key.
sp.write.to_kafka('topicName')
Reserved keys
metadata.broker.list
and producer.id
are reserved values and are maintained directly by
the Kafka writer. The Kafka writer will error if these options are used.
Kafka configuration options
All available Kafka producer
configuration options
are supported using the options
dictionary.
This includes properties such as socket.timeout.ms
, fetch.message.max.bytes
, etc.
To configure the Kafka topic, you should use the topic configuration options.
Parameters:
name | type | description | default |
---|---|---|---|
topic | symbol | The name of a topic. | ` |
brokers | string or string[] | Brokers identified as a 'host:port' string, or a list of 'host:port' strings. | "localhost:9092" |
options:
name | type | description | default |
---|---|---|---|
retries | long | Maximum number of retries that will be attempted for Kafka API calls. | 10 |
retry_wait | timespan | How long to wait between retry attempts. | 0D00:00:02 |
options | dictionary | Dictionary of Kafka publisher configuration options | ()!() |
topic_config | dictionary | Dictionary of Kafka topic configuration options. | ()!() |
registry | string | Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for automatic payload encoding. | "" |
subject | string | A Kafka subject to read schemas from and publish schemas to. If none is provided, a default subject based on the topic name is used. | "" |
autoRegister | boolean | Controls whether or not to generate and publish schemas automatically. | 0b |
schemaType | string | Schema type to generate, one of: "JSON", "PROTOBUF", "AVRO". | "JSON" |
Returns:
A to_kafka
writer, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_kafka('topicName'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
"name,id,quantity"
"a,1,4"
"b,2,5"
"c,3,6"
Advanced Kafka configuration
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_kafka('topicName', options = {
'socket.timeout.ms': 60000,
'socket.send.buffer.bytes': 4194304,
'max.in.flight': 100000
},
topic_config = {
'compression.type': 'gzip',
'delivery.timeout.ms': 100000
}
))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
"name,id,quantity"
"a,1,4"
"b,2,5"
"c,3,6"
kdb Insights Database
Writes data to a kdb Insights Database.
Multiple database writers
A single pipeline with multiple database writers using direct write is not currently supported. This will be resolved in a future release.
v2
.qsp.v2.write.toDatabase[table; .qsp.use (!) . flip (
(`database ; database);
(`deduplicate; deduplicate);
(`directWrite; directWrite);
(`overwrite ; overwrite))]
.qsp.v2.write.toDatabase[table; .qsp.use (!) . flip (
(`target ; target);
(`deduplicate; deduplicate);
(`directWrite; directWrite);
(`overwrite ; overwrite))]
Parameters:
name | type | description | required | default |
---|---|---|---|---|
table | symbol | Table in kdb Insights Database. | Y | |
options:
name | type | description | required | default |
---|---|---|---|---|
database | symbol or string | kdb Insights Database name. The service endpoint is created by appending '-sm:10001' to the database name. | Required if target is null or empty. | |
target | symbol or string | The target name. Fully qualified service address in the format {host}:{port}. | Required if database is null or empty. | |
retries | long | Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost. | N | 60 |
retryWait | timespan | How long to wait between retry attempts. | N | 0D00:00:03 |
deduplicate | boolean | If the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recovery events. | N | 1b |
directWrite | boolean | Write data directly to the database instead of streaming. Direct table writing is useful for large backfill ingestion where data is mostly historical. When using direct write, the system will use significantly less resources but the data will not be available for query until the entire ingest is complete. At present direct write is only intended for writing to partitioned tables. Attempts to write other table types will error. | N | 0b |
overwrite | boolean | When using direct write, indicates if data should overwrite existing date partitions. If disabled, data will be merged with existing data. | N | 0b |
For all common arguments, refer to configuring operators
Examples
Use streaming mode to write to a specified database.
The following examples assume a package called taq
has been deployed with a matching database name.
The database name resolves to taq-sm:10001
to match the runtime service.
quotes: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P-1D; `test; 99.9; 100i; 100.1; 200i)
.qsp.run
.qsp.read.fromExpr["quotes"]
.qsp.v2.write.toDatabase[`quotes; `taq];
Write directly to the database using the specified target.
Set `target` to the fully qualified name of the target service.
Set the `directWrite` option to `1b` to write the `quotes` data directly to the database.
quotes: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P-1D; `test; 99.9; 100i; 100.1; 200i)
.qsp.run
.qsp.read.fromExpr["quotes"]
.qsp.v2.write.toDatabase[`quotes; `taq-sm:10001; .qsp.use enlist[`directWrite]!enlist 1b];
Using the database and target parameters
These parameters specify the endpoint for the running database.
This is used to discover required information. The database
parameter is usually used in
kdb Insights Enterprise and generally matches the package name being deployed.
It derives an endpoint from the package name, e.g. a package called demo
would derive an endpoint
of demo-sm:10001
which should match the running database. The target
parameter is generally
used for kdb Insights SDK deployments and should be the fully qualified endpoint of the
database service, e.g. demo-sm:10001
.
For more information on using the database writer in an end-to-end deployment, see either the kdb Insights SDK example or the Enterprise example.
Operator versions
This operator supports two versions: v1 (default) and v2. You can switch between versions using the api_version parameter.
Unbounded input
The v2 writer provides much improved functionality when dealing with unbounded data streams. However, it requires you to manually trigger writedowns. For more detailed information, see the Database Ingest page.
sp.write.to_database(table='trade', database='taq', api_version=2)
name | type | description | required | default |
---|---|---|---|---|
table | CharString | Table in kdb Insights Database. | Y | None |
database | CharString | kdb Insights Database name. The service address is created by appending '-sm:10001' to the database name. | Required if target is None | None |
target | CharString | Fully qualified service address in the format {host}:{port}. | Required if database is None | None |
retries | pykx.LongAtom, pykx.IntAtom, int | Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost. | N | None |
retry_wait | Timedelta | How long to wait between retry attempts. | N | None |
deduplicate | pykx.BooleanAtom, bool | If the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recovery events. | N | None |
directWrite | pykx.BooleanAtom, bool | Write data directly to the database instead of streaming. Direct table writing is useful for large backfill ingestion where data is mostly historical. When using direct write, the system will use significantly less resources but the data will not be available for query until the entire ingest is complete. At present direct write is only intended for writing to partitioned tables. Attempts to write other table types will error. | N | None |
overwrite | pykx.BooleanAtom, bool | When using direct write, indicates if data should overwrite existing date partitions. If disabled, data will be merged with existing data. | N | False |
api_version | int | Used to distinguish version of operator. | Y | None |
Returns:
A to_database
writer, which can be joined to other operators or pipelines.
Examples:
Write to database using v2 and database
parameter.
This example assumes a package called taq
has been deployed with a matching database name.
The database name resolves to taq-sm:10001
to match the deployed service endpoint.
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> kx.q('quotes:flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P-1D; `test; 99.9; 100i; 100.1; 200i)')
>>> sp.run(sp.read.from_expr('quotes')
| sp.write.to_database(table='trade', database='taq', api_version=2))
Write to database using v2 API and target.
Substitute service-sm:10001
with the fully qualified name of the target service.
Set directWrite=True
to write directly to the database.
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> kx.q('quotes:flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P-1D; `test; 99.9; 100i; 100.1; 200i)')
>>> sp.run(sp.read.from_expr('quotes')
| sp.write.to_database(table='tableName', target='service-sm:10001', directWrite=True, api_version=2))
Trigger a write-down using v2 and sp.trigger_write
.
>>> from kxi import sp
>>> import pykx as kx
>>> kx.q('quotes:flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P-1D; `test; 99.9; 100i; 100.1; 200i)')
>>> sp.run(sp.read.from_expr('quotes')
| sp.write.to_database(table='tableName', target='service-sm:10001', directWrite=True, api_version=2, name='dbWriter'))
>>> sp.trigger_write(["dbWriter"])
For more information on using the database writer in an end-to-end deployment, see either the kdb Insights SDK example or the Enterprise example.
v1
.qsp.write.toDatabase[table; assembly]
.qsp.write.toDatabase[table; assembly; .qsp.use (!) . flip (
(`retries ; retries);
(`retryWait ; retryWait);
(`timeout ; timeout);
(`deduplicate; deduplicate);
(`directWrite; directWrite);
(`overwrite ; overwrite);
(`retries ; retries);
(`retryWait ; retryWait);
(`mountName ; mountName);
(`statusTable; statusTable))]
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol | A table name. Note that, at present, the table name must be a symbol when using directWrite. | Required |
assembly | symbol or string | The assembly name | Required, unless the pipeline has been deployed as part of an assembly, in which case the pipeline will default to writing to that assembly. |
options:
name | type | description | default |
---|---|---|---|
retries | long | Number of times to retry the connection to the storage manager. | 60 |
retryWait | timespan | Time to wait between storage manager connection attempts. | 0D00:00:03 |
timeout | int | Timeout value | 0Ni (no timeout) |
deduplicate | boolean | If the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recovery events. | 1b |
directWrite | boolean | Bypass RDB & IDB and write to HDB directly for historical data. Direct table writing is useful for large backfill ingestion where data is mostly historical. When using direct write, the system will use significantly less resources but the data will not be available for query until the entire ingest is complete. At present direct write is only intended for writing to partitioned tables. Attempts to write other table types will error. | 0b |
overwrite | boolean | When using direct write, indicates if data should overwrite existing date partitions. If disabled, data will be merged with existing data. | 1b |
mountName | symbol | Name of mount. | hdb |
statusTable | symbol | If using direct write with an unbounded stream, this is the name of a table to store session statuses. | ` |
For all common arguments, refer to configuring operators
This operator writes data using kdb Insights Reliable Transport streams, except when doing a direct write.
Automatic Assembly Discovery
If the assembly
parameter is not set, and the pipeline is deployed as part of an assembly,
the assembly will be automatically set to the assembly containing the pipeline.
kdb Insights Enterprise only
The .qsp.write.toDatabase
writer operates only within kdb Insights Enterprise deployments of
the Stream Processor at this time. This functionality does not operate within the kdb Insights
Stream Processor Microservice. It is dependent on the ability to query deployed assemblies within
Kubernetes.
directWrite should not be used with a reader using the watch option
When using the directWrite option, it cannot be paired with a reader using the watch option, since directWrite relies on a definitive finish point, which is not guaranteed when using a watcher.
Basic example publishing streaming data:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toDatabase[`quotes; `assembly];
quotes: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P; `test; 99.9; 100i; 100.1; 200i)
publish quotes
Using directWrite
for historical data:
quotesOld: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P-1D; `test; 99.9; 100i; 100.1; 200i)
quotesNew: flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P; `test2; 99.9; 100i; 100.1; 200i)
quotes: quotesOld, quotesNew;
.qsp.run
.qsp.read.fromExpr["quotes"]
.qsp.write.toDatabase[`quotes; `assembly; .qsp.use enlist[`directWrite]!enlist 1b];
// quotesOld will be written to hdb directly bypassing RDB & IDB.
// quotesNew will be treated as stream data as it will be newer than the HDB purview.
Direct write purviews
When using direct write, data that is older than the purview of the HDB will go directly to disk but will not be available for query until the pipeline completes. All other data will be streamed to the database and be ready for query immediately.
Direct write compatibility
The direct write functionality is intended for batch processing only. As such, when using direct write, it should be paired with a reader operator that has a finish point. Examples include reading from a file, cloud storage, a database, or an expression. Reading from continuous streaming sources is not supported.
Some reader operators support partitioning and will automatically scale up the number of workers. Using this with direct write results in undefined behavior, so set the maxWorkers setting to 1 to ensure no scaling.
Direct write overwrite
When using direct write, overwrite is enabled by default. This is to preserve the old behavior of having each date partition in the ingest data overwrite the existing date partition. In a future release, this will be disabled by default making merge the default option. If you intend to continue to use the overwrite behavior, it is recommended that you set it explicitly in your pipeline configuration.
statusTable
statusTable
is required when using direct write in an unbounded stream.
When using direct write in an unbounded stream, data which has been received but not yet written down is organized into a session. Triggering a writedown will write an entire session to the database.
If batches come in with no kx_DWSessionID field in the metadata, they will all be staged as a single session, with an automatically generated session ID, prefixed with "kx_default_", that gets updated after each writedown.
If batches come in with the kx_DWSessionID
field in the metadata set to a symbol,
they will be staged under this session ID and writedown messages must include the
kx_DWSessionID
field in the metadata, indicating which session ID to write down.
Without a finish event to trigger the writedown, writedowns must be triggered manually by
sending a message with the kx_DWWritedown
field in its metadata set to 1b.
Warnings:
- When kx_DWWritedown is present, the data for that batch is ignored.
- Session IDs cannot be reused. Reusing a session ID will result in those sessions being discarded.
- The session ID symbol must only contain characters valid in a Linux file name.
- The overwrite option must be false when using statusTable.
- Keyed streams are unsupported and will be treated as unkeyed streams.
The status of completed sessions will be written to the status table, which must be defined, and must have the schema ([] sessionID: string; isComplete: boolean). Once the direct write is complete, the status table will be updated to include that session ID, with isComplete set to 1b.
The statusTable
field can only be used with direct write.
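A minimal sketch of an unbounded direct-write pipeline (the assembly name, status table name, session ID, and the empty-batch writedown convention are all illustrative assumptions): an apply operator tags each batch with a session ID and marks an empty batch as the writedown trigger.

/ Status table recording completed sessions; schema as described above.
dwStatus: ([] sessionID: (); isComplete: `boolean$());

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.apply[{[op;md;data]
    md[`kx_DWSessionID]: `session1;                / stage batches under one session ID
    if[0 ~ count data; md[`kx_DWWritedown]: 1b];   / an empty batch triggers the writedown
    .qsp.push[op;md;data]
  }]
  .qsp.write.toDatabase[`quotes; `assembly;
    .qsp.use `directWrite`overwrite`statusTable!(1b;0b;`dwStatus)];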
Operator versions
This operator supports two versions: v1 (default) and v2. You can switch between versions using the api_version parameter.
sp.write.to_database('tableName', 'assembly')
Parameters:
name | type | description | required | default |
---|---|---|---|---|
table | CharString | Table in kdb Insights Database. | Y | None |
assembly | CharString | Assembly defining kdb Insights Database. When a pipeline is deployed within an assembly this parameter will default to the name of that assembly. | Y | None |
retries | pykx.LongAtom, pykx.IntAtom, int | Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost. | N | None |
retry_wait | Timedelta | How long to wait between retry attempts. | N | None |
timeout | pykx.IntAtom, int | Timeout value. | N | None |
deduplicate | pykx.BooleanAtom, bool | If the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recovery events. | N | None |
directWrite | pykx.BooleanAtom, bool | Write data directly to the database instead of streaming. Direct table writing is useful for large backfill ingestion where data is mostly historical. When using direct write, the system will use significantly less resources but the data will not be available for query until the entire ingest is complete. At present direct write is only intended for writing to partitioned tables. Attempts to write other table types will error. | N | None |
overwrite | pykx.BooleanAtom, bool | When using direct write, indicates if data should overwrite existing date partitions. If disabled, data will be merged with existing data. | N | True |
mountName | CharString | Name of mount. | N | None |
statusTable | CharString | Required when using direct write with an unbounded stream, this is the name of a table to store session statuses. | N | None |
api_version | int | Used to distinguish version of operator. | N | None |
Returns:
A to_database
writer, which can be joined to other operators or pipelines.
Examples:
Write to database using v1 streaming mode.
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> kx.q('quotes:flip `time`ticker`bid`bidSize`ask`askSize!enlist each (.z.P-1D; `test; 99.9; 100i; 100.1; 200i)')
>>> sp.run(sp.read.from_expr('quotes')
| sp.write.to_database('tableName', 'assembly'))
kdb Insights Stream
Writes data using a kdb Insights Stream.
.qsp.write.toStream[table]
.qsp.write.toStream[table; stream]
.qsp.write.toStream[table; stream; .qsp.use (!) . flip (
(`prefix ; prefix);
(`deduplicate; deduplicate))]
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol or string | A table name. | Required |
stream | symbol or string | The outbound stream | $RT_PUB_TOPIC |
options:
name | type | description | default |
---|---|---|---|
prefix | string | Stream prefix for URL mapping | $RT_TOPIC_PREFIX |
deduplicate | boolean | If the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recovery events. | 1b |
For all common arguments, refer to configuring operators
This operator writes data using the kdb Insights Stream.
Note
Writing to a kdb Insights Stream facilitates two principal use cases: chaining pipelines together and writing to a database. If you explicitly intend to write to a database, however, it is strongly suggested that you use the Database Writer, as it includes functionality for validating incoming schemas.
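A minimal sketch (assuming the pipeline runs in a deployment where $RT_PUB_TOPIC and $RT_TOPIC_PREFIX are set): publish rows for the trade table to the default outbound stream.

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toStream[`trade]

publish ([] time: 2#.z.p; sym: `AAPL`KX; price: 2?100f; size: 2?1000)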
sp.write.to_stream('eqOrder')
Stream versus Writer
This functionality writes directly to a reliable transport stream. If you wish to persist data to a database, consider using the to_database writer.
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol or string | Name of the table to filter the stream on. By default, no filtering is performed. | Required |
stream | symbol or string | Name of stream to publish to. By default, the stream specified by the $RT_PUB_TOPIC environment variable is used. | $RT_PUB_TOPIC |
options:
name | type | description | default |
---|---|---|---|
prefix | string | Prefix to add to the hostname for the RT cluster. By default, the prefix given by the $RT_TOPIC_PREFIX environment variable is used. | $RT_TOPIC_PREFIX |
assembly (DEPRECATED) | | kdb Insights assembly to write to. By default, no assembly is used. | |
insights (DEPRECATED) | | Whether the stream being published to uses Insights message formats. | |
deduplicate | boolean | Whether the outbound stream should drop duplicate messages that may have been created during a failure event. If enabled, the pipeline must produce deterministic data. If $KXI_ALLOW_NONDETERMINISM is set, this value will be forced to be false, potentially resulting in duplicate data after failure recovery events. | 1b |
Returns:
A to_stream
writer, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_stream('tableName'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
name id quantity
----------------
a 1 4
b 2 5
c 3 6
KDB
(Beta Feature) Writes data to an on-disk partition table.
Beta Features
To enable beta features, set the environment variable KXI_SP_BETA_FEATURES
to true
. See Beta Feature Usage Terms.
.qsp.write.toKDB[path;prtnCol;table]
Parameters:
name | type | description | default |
---|---|---|---|
path | symbol | The database path. | Required |
prtnCol | symbol | The name of the partition column. | Required |
table | symbol | The name of the table. | Required |
For all common arguments, refer to configuring operators
This operator writes an on-disk partition table.
Writing keyed columns
The plugin supports keyed tables but writes to disk as an unkeyed table.
Stream data into a table:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toKDB[`db;`date;`myTable];
publish ([] date: 2022.01.01 2022.01.02 2022.01.03 2022.01.04 2022.01.05; open: 1 2 3 4 5; close: 6 7 8 9 10);
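The same applies when the stream carries a keyed table; a sketch of the behavior noted above (the table and column names mirror the previous example):

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toKDB[`db;`date;`myTable];

/ The key is accepted but the on-disk table is stored unkeyed.
publish `date xkey ([] date: 2022.01.01 2022.01.02 2022.01.03; open: 1 2 3; close: 6 7 8);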
sp.write.to_kdb('database', 'date', 'trade')
Parameters:
name | type | description | default |
---|---|---|---|
path | symbol | The database path. | Required |
prtn_col | symbol | The name of the partition column. | Required |
table | symbol | The name of the table. | Required |
Returns:
A to_kdb
writer, that writes an on-disk partition table.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_kdb('database', 'id', 'tableName'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
"name,id,quantity"
"a,1,4"
"b,2,5"
"c,3,6"
Process
Writes data to a kdb+ process.
.qsp.write.toProcess[handle]
.qsp.write.toProcess[.qsp.use (!) . flip (
(`mode ; mode);
(`target ; target);
(`async ; async);
(`queueLength; queueLength);
(`queueSize ; queueSize);
(`spread ; spread);
(`handle ; handle);
(`retries ; retries);
(`retryWait ; retryWait))]
Parameters:
name | type | description | default |
---|---|---|---|
handle | symbol | Handle of destination process to write to. | ` |
options:
name | type | description | default |
---|---|---|---|
mode | symbol | `function (call) or `table (upsert). | `function |
target | symbol | Target function name or table name. | "" |
async | boolean | Whether writing should be async. | 1b |
queueLength | long | Max async message queue length before flush. | 0Wj |
queueSize | long | Max number of bytes to buffer before flushing. | 1MB |
spread | boolean | Treat the pipeline data as a list of arguments in function mode (cannot be set if params is also set). | 0b |
params | symbol[] | List of any parameters that should appear before the message data in function mode. | () |
retries | long | Number of times to retry the connection to the process. | 60 |
retryWait | timespan | Time to wait between process connection attempts. | 0D00:00:03 |
For all common arguments, refer to configuring operators
This operator writes data to another kdb+ process using IPC.
Dropped connections
During a retry loop, on connection loss and on startup, all processing on the worker is halted until the connection to the output process is re-established or the max number of retries is reached.
Publish to table:
q -p 1234
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toProcess[.qsp.use `handle`mode`target!`::1234`table`output]
q)publish ([] til 2)
q)`::1234 `output
x
-
0
1
Publish to function:
q -p 1234 <<< "upd: {x set y}"
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.map[{ (`table; x) }]
.qsp.write.toProcess[.qsp.use `handle`target`spread!(`::1234; `upd; 1b)]
q)publish ([] til 2)
q)`::1234 `table
x
-
0
1
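Publish to a function with fixed leading parameters (a sketch; the upd function and the `trades parameter are illustrative). With params set, the writer calls the target as upd[`trades; data] for each batch:

q -p 1234 <<< "upd: {x set y}"

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toProcess[.qsp.use `handle`target`params!(`::1234; `upd; enlist `trades)]

q)publish ([] til 2)
q)`::1234 `trades
x
-
0
1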
Re-connection to output process:
# Quit the process after some time to force a reconnect
q -p 1234
Set number of retries to 2 and wait time between retries to 2 seconds
.qsp.run .qsp.read.fromCallback[`publish]
.qsp.write.toProcess[.qsp.use (!) . flip (
(`handle ; `::1234);
(`mode ; `table);
(`target ; `destTable);
(`retries ; 2);
(`retryWait; 0D00:00:02) ]
WARN [] SPWRITE Connection lost to IPC output process, attempting to reconnect, writer=ipc-91f585df
INFO [] CONN Closing connection, h=9
INFO [] SPWRITE Connection re-established, handle=::1234
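Buffering asynchronous writes (a sketch; the thresholds are illustrative). Per the queueLength and queueSize options above, the async message queue is flushed once 100 messages or roughly 64kB have been buffered:

.qsp.run
  .qsp.read.fromCallback[`publish]
  .qsp.write.toProcess[.qsp.use `handle`mode`target`queueLength`queueSize!(`::1234; `table; `output; 100; 65536)]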
sp.write.to_process('::8001', 'eqExecution')
Connection
During a retry loop (on connection loss and on startup), all processing on the Worker is halted until the connection to the output process is re-established or the maximum number of retries is reached.
Parameters:
name | type | description | default |
---|---|---|---|
handle | symbol | The kdb+ connection handle in the form ':host:port' . Note the leading colon. The host can be omitted to use localhost, e.g. '::5000' . |
` |
options:
name | type | description | default |
---|---|---|---|
target | symbol | The name of the function to be called, or the table to be upserted, as defined in the remote kdb+ process. | "" |
mode | symbol | Whether the target is a function to be called, or a table to be upserted. | `function |
sync | boolean | Whether writing should be synchronous or asynchronous. | 1b |
queue_length | long | Maximum number of async messages permitted in the queue before flushing. | 0Wj |
queue_size | long | Maximum number of bytes to buffer before flushing the queue. Defaults to one mebibyte. | 1MB |
spread | boolean | Whether the pipeline data should be treated as a list of arguments for the target function. This option can only be set if in function mode, and cannot be set if params is set. |
0b |
params | symbol[] | A list of parameters that should appear before the message data in function mode. | () |
retries | long | Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost. | 60 |
retry_wait | timespan | How long to wait between retry attempts. | 0D00:00:03 |
Returns:
A to_process
writer, which can be joined to other operators or pipelines.
Examples:
>>> from kxi import sp
>>> import pandas as pd
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
| sp.write.to_process('::1234', 'tableName'))
>>> data = pd.DataFrame({
'name': ['a', 'b', 'c'],
'id': [1, 2, 3],
'quantity': [4, 5, 6]
})
>>> kx.q('publish', data)
name id quantity
----------------
a 1 4
b 2 5
c 3 6
Subscriber
This node is used to publish data to websocket subscribers, such as kdb Insights Enterprise Views. It buffers data and publishes at a configurable frequency. Filters can be applied to focus on a specific subset of the data and optionally keys can be applied to see the last value by key. When a subscriber connects, they receive an initial snapshot with the current state of the subscription and then receive periodic updates as the node publishes them.
When configuring the operator you must decide whether it is keyed or not.
- Keyed: When configured with keyed columns, it stores and publishes the latest record for each unique key combination. For example, a subscriber wanting to see the latest stock price by symbol.
- Unkeyed: When configured without any key columns, the node publishes every row it receives and keeps a buffer of configurable size. This buffer is used for snapshots and stores a sliding window of the latest records received. For example, a subscriber to the last 100 trades for a symbol. Once the buffer size limit is reached, older records are dropped to make room for new ones.
For more information on using the Subscriber writer in an end-to-end deployment, see either the kdb Insights SDK example or the Enterprise example
.qsp.write.toSubscriber[table; keyCols]
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol | The name of the subscription. This is used to subscribe to data emitted from this node. | Required |
keyCol | symbol or symbol[] | A key column, or list of key columns. This can be an empty list for unkeyed subscriptions. | Required |
For all common arguments, refer to configuring operators
options:
name | type | description | default |
---|---|---|---|
publishFrequency | long | The interval at which the node sends data to its subscribers (milliseconds) | 500 |
cacheLimit | long | The max size (number of rows) for an unkeyed snapshot cache. This is not required in keyed mode. | 2000 |
Examples:
Keyed subscription
This example deploys a subscriber node keyed on the sym
and exch
column, and configured to publish every 500ms.
Three rows are received in quick succession for the same key combination. Each one overwrites the previous and if all occur
within the same timer period, then only a single row is published to subscribers. If the incoming records were across
timer periods, then a single row would be published for each period.
The .qsp.getSnapshotCache API is used to show the state of the snapshot cache. If a new client subscribed, this is what they receive as an initial snapshot.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toSubscriber[`data; `sym`exch; .qsp.use `name`publishFrequency!(`subscriber; 500)];
publish enlist`sym`exch`time`pos!(`FD;`LSE;2023.06.25D15:00:00.001;100);
publish enlist`sym`exch`time`pos!(`FD;`LSE;2023.06.25D15:00:00.002;100);
publish enlist`sym`exch`time`pos!(`FD;`LSE;2023.06.25D15:00:00.003;100);
.qsp.getSnapshotCache[`subscriber;enlist[`table]!enlist[`data]]
sym exch| time pos
--------| ---------------------------------
FD LSE | 2023.06.25D15:00:00.003000000 100
Unkeyed subscription
This example deploys an unkeyed subscriber node configured to publish every 500ms. Eight rows are received in quick succession and all eight are published to any subscribers. However, the snapshot cache only stores the last 5 rows, so a new subscriber only receives those last 5 in its snapshot.
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toSubscriber[`data; "s"$(); .qsp.use `name`publishFrequency`cacheLimit!(`subscriber; 500; 5)];
publish flip `time`sym`exch`pos!(2023.06.25D15:00:00. + 1000000 * til 8;`FD;`LSE;100);
.qsp.getSnapshotCache[`subscriber;enlist[`table]!enlist[`data]]
time sym exch pos
------------------------------------------
2023.06.25D15:00:00.003000000 FD LSE 100
2023.06.25D15:00:00.004000000 FD LSE 100
2023.06.25D15:00:00.005000000 FD LSE 100
2023.06.25D15:00:00.006000000 FD LSE 100
2023.06.25D15:00:00.007000000 FD LSE 100
Updating the cache
Occasionally you might want to replace the data in the snapshot cache, either to empty it or replace outdated information.
By adding a cacheOperation
key to the message metadata, this signals to the subscriber node to replace its cache with the incoming message.
You can do this for both keyed and unkeyed nodes.
This example empties the cache when the day changes. It does this by tracking the current date
and when it changes, it adds the cacheOperation
key to the message metadata with a value of replace
.
It then publishes an empty table to the subscriber. To empty the cache you can also send a generic null (::)
.
Apply node
Use an apply to edit the metadata. The map node does not allow you to persist changes to the metadata.
DATE:.z.d;
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.apply[{[op;md;data]
if[DATE < .z.d;
DATE::.z.d;
md[`cacheOperation]:`replace;
data:0#data;
];
.qsp.push[op;md;data]
}]
.qsp.write.toSubscriber[`data; "s"$()]
The previous example cleared the cache; however, you may want to replace the cache with a specific dataset. This can be done by feeding the subscriber node from multiple readers. The example below creates a pipeline with two callback readers, joined together with a union, that both feed a subscriber node.
- The first (configured with the
replace
callback) is connected to an apply node which sets thecacheOperation
metadata to clear the cache with any data it receives. - The second (configured with the
upd
callback) is joined with the first callback - Both are feeding the subscriber node on the
data
topic
Two rows are published to the regular publish callback and the snapshot cache contains them both. Then the replace function is called and the snapshot cache shows only that latest row.
Set non-determinism environment variable
To use the union operator, you must set the KXI_ALLOW_NONDETERMINISM environment variable to "true".
This can be done in code or in the pipeline specification.
setenv[`KXI_ALLOW_NONDETERMINISM; "true"]
rep:.qsp.read.fromCallback[`replace]
.qsp.apply[{[op;md;data]
md[`cacheOperation]:`replace;
.qsp.push[op;md;data]
}]
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.union[rep]
.qsp.write.toSubscriber[`data; "s"$(); .qsp.use enlist[`name]!enlist `subscriber]
publish enlist`sym`exch`time`pos!(`FD;`LSE;2023.06.25D15:00:00.001;100);
publish enlist`sym`exch`time`pos!(`FD;`LSE;2023.06.25D15:00:00.002;100);
.qsp.getSnapshotCache[`subscriber;enlist[`table]!enlist[`data]]
sym exch time pos
------------------------------------------
FD LSE 2023.06.25D15:00:00.001000000 100
FD LSE 2023.06.25D15:00:00.002000000 100
replace enlist`sym`exch`time`pos!(`FD;`LSE;2023.06.25D15:00:00.003;100);
.qsp.getSnapshotCache[`subscriber;enlist[`table]!enlist[`data]]
sym exch time pos
------------------------------------------
FD LSE 2023.06.25D15:00:00.003000000 100
Other readers
For simplicity this example used two callback readers but other readers can be used to achieve the same behaviour depending on your data sources. For example, you might use an upload to manually upload your desired replace dataset. Alternatively you could replace the snapshot cache using a payload from the database reader after EOD.
sp.write.to_subscriber('data', 'sym')
Parameters:
name | type | description | default |
---|---|---|---|
table | symbol | Name of the subscription. Used to subscribe to data emitted from this node. | Required |
key_col | symbol or symbol[] | A key column, or list of key columns. Can be an empty list for unkeyed subscriptions. | Required |
For all common arguments, refer to configuring operators
options:
name | type | description | default |
---|---|---|---|
publishFrequency | long | The interval at which the node sends data to its subscribers (milliseconds) | 500 |
cache_limit | int | The max size (number of rows) for an unkeyed snapshot cache. This is not required in keyed mode | 2000 |
Examples:
Keyed subscription
This example deploys a subscriber node keyed on the sym
and exch
column, and configured to publish every 500ms.
Three rows are received in quick succession for the same key combination. Each one overwrites the previous and if all occur
within the same timer period, then only a single row is published to subscribers. If the incoming records were across
timer periods, then a single row would be published for each period.
The get_snapshot_cache API is used to show the state of the snapshot cache. If a new client subscribed, this is what they receive as an initial snapshot.
import pykx as kx
from kxi import sp
sp.run(sp.read.from_callback('publish')
| sp.write.to_subscriber('data', ['sym', 'exch'], name='subscriber', publishFrequency=500))
kx.q('publish', kx.Dictionary({'sym': 'FD', 'exch': 'LSE', 'time': kx.q('2023.06.25D15:00:00.001'), 'pos': 100}))
kx.q('publish', kx.Dictionary({'sym': 'FD', 'exch': 'LSE', 'time': kx.q('2023.06.25D15:00:00.002'), 'pos': 100}))
kx.q('publish', kx.Dictionary({'sym': 'FD', 'exch': 'LSE', 'time': kx.q('2023.06.25D15:00:00.003'), 'pos': 100}))
sp.get_snapshot_cache('subscriber', {'table': 'data'})
pykx.KeyedTable(pykx.q('
sym exch| time pos
--------| ---------------------------------
FD LSE | 2023.06.25D15:00:00.003000000 100
'))
Unkeyed subscription
The example below deploys an unkeyed subscriber node configured to publish every 500ms. Eight rows are received in quick succession and all eight are published to any subscribers. However, the snapshot cache only stores the last 5 rows, so a new subscriber will only receive those last 5 in its snapshot.
import pykx as kx
from kxi import sp
sp.run(sp.read.from_callback('publish')
| sp.write.to_subscriber('data', [], name='subscriber', publishFrequency=500, cache_limit=5))
kx.q('publish', kx.q('flip `time`sym`exch`pos!(2023.06.25D15:00:00. + 1000000 * til 8;`FD;`LSE;100)'))
sp.get_snapshot_cache('subscriber', {'table': 'data'})
pykx.Table(pykx.q('
time sym exch pos
------------------------------------------
2023.06.25D15:00:00.003000000 FD LSE 100
2023.06.25D15:00:00.004000000 FD LSE 100
2023.06.25D15:00:00.005000000 FD LSE 100
2023.06.25D15:00:00.006000000 FD LSE 100
2023.06.25D15:00:00.007000000 FD LSE 100
'))
Updating the cache
Occasionally you might want to replace the data in the snapshot cache, either to empty it or replace outdated information.
By adding a cacheOperation
key to the message metadata, this signals to the subscriber node to replace its cache with the incoming message.
You can do this for both keyed and unkeyed nodes.
The example below empties the cache when the day changes. It does this by tracking the current date
and when it changes, it adds the cacheOperation
key to the message metadata with a value of replace
.
It then publishes an empty table to the subscriber. To empty the cache you can also send a generic null (::)
.
Apply node
Use an apply to edit the metadata. The map node does not allow you to persist changes to the metadata.
import pykx as kx
from kxi import sp
import datetime as dt
DATE = dt.date.today()
def updates(op, md, data):
global DATE
if DATE < dt.date.today():
DATE = dt.date.today()
md['cacheOperation'] = 'replace'
data = kx.q('{0#x}', data)
sp.push(op, md, data)
sp.run(sp.read.from_callback('publish')
| sp.apply(updates)
| sp.write.to_subscriber('data', []))
The previous example cleared the cache; however, you may want to replace the cache with a specific dataset. This can be done by feeding the subscriber node from multiple readers. The example below creates a pipeline with two callback readers, joined together with a union, that both feed a subscriber node.
- The first (configured with the
replace
callback) is connected to an apply node which sets thecacheOperation
metadata to clear the cache with any data it receives. - The second (configured with the
upd
callback) is joined with the first callback - Both are feeding the subscriber node on the
data
topic
Two rows are published to the regular publish callback and the snapshot cache contains them both. Then the replace function is called and the snapshot cache shows only that latest row.
import pykx as kx
from kxi import sp
import datetime as dt
kx.q('setenv', 'KXI_ALLOW_NONDETERMINISM', kx.CharVector("true"))
def apply(op, md, data):
md['cacheOperation'] = 'replace'
sp.push(op, md, data)
rep = (sp.read.from_callback('replace')
| sp.apply(apply))
sp.run(sp.read.from_callback('publish')
| sp.union(rep)
| sp.write.to_subscriber('data', [], name='subscriber'))
kx.q('publish', kx.Dictionary({'sym': 'FD', 'exch': 'LSE', 'time': kx.q('2023.06.25D15:00:00.001'), 'pos': 100}))
kx.q('publish', kx.Dictionary({'sym': 'FD', 'exch': 'LSE', 'time': kx.q('2023.06.25D15:00:00.002'), 'pos': 100}))
sp.get_snapshot_cache('subscriber', {'table': 'data'})
pykx.Table(pykx.q('
sym exch time pos
------------------------------------------
FD LSE 2023.06.25D15:00:00.001000000 100
FD LSE 2023.06.25D15:00:00.002000000 100
'))
kx.q('replace', kx.Dictionary({'sym': 'FD', 'exch': 'LSE', 'time': kx.q('2023.06.25D15:00:00.003'), 'pos': 100}))
sp.get_snapshot_cache('subscriber', {'table': 'data'})
pykx.Table(pykx.q('
sym exch time pos
------------------------------------------
FD LSE 2023.06.25D15:00:00.003000000 100
'))
Other readers
For simplicity this example used two callback readers but other readers can be used to achieve the same behaviour depending on your data sources. For example, you might use an upload to manually upload your desired replace dataset. Alternatively you could replace the snapshot cache using a payload from the database reader after EOD.
Variable
Writes data to a variable in the local process.
This operator writes to a local variable, either overwriting, appending, or upserting to the variable depending on the selected mode. The variable will be created when the pipeline is initially run. If it already exists, it will be overwritten. The variable will continue to exist after the pipeline is torn down.
.qsp.write.toVariable[variable]
.qsp.write.toVariable[variable; mode]
Parameters:
name | type | description | default |
---|---|---|---|
variable | symbol | The name of the variable to write the output to. | Required |
mode | symbol | The writing behavior. Mode can be one of: append, which appends the stream output to the variable; overwrite, which sets the variable to the last output of the pipeline; or upsert, which upserts table data into the output variable. | append |
For all common arguments, refer to configuring operators
Append joins data
When using append
as the write mode, values will be joined, even if they do not
have matching types. If tables are being used in the stream, the result will also be
a table. If a matching schema is desired, consider using upsert
mode to force a type
error for table schema mismatches.
Join data in a single output:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toVariable[`output]
publish 1 2 3
publish "abc"
output
1
2
3
"abc"
Capture an output value:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toVariable[`output; `overwrite];
publish 1 2 3
publish 4 5 6
output
4 5 6
Stream data into a table:
.qsp.run
.qsp.read.fromCallback[`publish]
.qsp.write.toVariable[`output; `upsert];
publish ([] x: 1 2 3; y: "abc");
publish ([] x: 4 5; y: "de");
output
x y
---
1 a
2 b
3 c
4 d
5 e
sp.write.to_variable('results')
Parameters:
name | type | description | default |
---|---|---|---|
variable | symbol | The name of the q variable the data will be written to. | Required |
mode | symbol | How to set/update the specified q variable. | append |
Returns:
A to_variable
writer, which can be joined to other operators or pipelines.
Examples:
Stream data into a list by appending to it:
>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output'))
>>> kx.q('publish').each(range(-10, 0, 2))
>>> kx.q('output')
-10 -8 -6 -4 -2
>>> kx.q('publish').each(range(0, 11, 2))
>>> kx.q('output')
-10 -8 -6 -4 -2 0 2 4 6 8 10
Stream data into a table with upsert to get a type error if there's a schema mismatch:
>>> sp.run(sp.read.from_callback('publish') | sp.write.to_variable('output', 'upsert'))
>>> kx.q('publish each (([] x: 1 2 3; y: "abc");([] x: 4 5; y: "de"))')
>>> kx.q('output')
x y
---
1 a
2 b
3 c
4 d
5 e