Skip to content

kxi.sp.read

Stream Processor readers.

Readers are a specialized type of operator that allow users to feed data from different external data sources into a streaming pipeline. Each reader has its own start, stop, setup, and teardown functions to handle different streaming lifecycle events. Readers are assumed to be asynchronous and push data into a pipeline using sp.push.

Some readers, such as from_callback and from_kafka, have implemented a stream-partitioning interface. When using these readers, the partitions are distributed over multiple Workers, orchestrated by a Controller.

FileChunking Objects

class FileChunking(IntEnum)

Enum for file chunking options.

Chunking a file splits the file into smaller batches, and streams the batches through the pipeline.

These enum values can be provided as True or False for enabled and disabled respectively.

disabled

Do not split the file into chunks.

enabled

Split the file into chunks.

auto

Automatically determine the size of the target file, and if it is sufficiently large (more than a few megabytes) it will be read in chunks.

FileMode Objects

class FileMode(AutoNameEnum)

Enum for file mode options.

These enum values can be provided as enum member objects (e.g. FileMode.binary), or as strings matching the names of members (e.g. 'binary').

binary

Read the content of the file into a byte vector.

text

Read the content of the file into strings, and split on newlines.

ParquetMode Objects

class ParquetMode(AutoNameEnum)

Enum for parquet file mode options.

These enum values can be provided as enum member objects (e.g. ParquetMode.table), or as strings matching the names of members (e.g. 'table').

table

Read the content of the parquet file into a table.

lists

Read the content of the parquet file into list of arrays.

from_callback

@Reader
def from_callback(callback: Union[str, kx.SymbolAtom],
                  *,
                  partitions: List = (),
                  key: Optional[Union[str, kx.SymbolAtom]] = None,
                  replay: Union[bool, kx.BooleanAtom] = False) -> Reader

Read from a callback function defined in the global q namespace.

Notes:

To enable beta features, set the environment variable KXI_SP_BETA_FEATURES to true.

Arguments:

  • callback - The name of the callback function that will be defined in the global q namespace.
  • partitions - A list of partition identifiers to distribute over available workers.
  • key - Name of the field which contains the key of the published event, or None for unkeyed data.
  • replay - (Beta feature) If True, message replay is enabled for the Callback reader. On recovery, messages that arrived after the last checkpoint will be pushed to the pipeline.
  • external - Allow users to execute the callback function via an IPC connection.

Returns:

A from_callback reader, which can be joined to other operators or pipelines.

Notes:

This operator defines callback as a function in the q global namespace, causing data passed to the function (locally or over IPC) to enter the pipeline data flow.

Using Replay

If using replay, you must set $KXI_SP_EVENT_JOURNAL to "true", and set $KXI_SP_JOURNAL_DIR to the directory where you'd like the event journals to be stored. Event journals are files containing the messages published to the callback reader. Messages are replayed from the journals on recovery. If $KXI_SP_JOURNAL_DIR is not set, it defaults to being $KXI_SP_CHECKPOINT_DIR/journals.

Note that enabling replay adversely affects performance, and is disabled by default for this reason.

Python strings

PyKX automatically converts Python strings to q symbols which can cause issues in the case where your callback returns a string for use in a subsequent SP operation. If your data should be treated as a string, you should explicitly convert it using pykx.CharVector.

Examples:

Calling a callback multiple times, with a window to batch the data:

>>> import numpy as np
>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_callback('publish')
        | sp.window.timer(np.timedelta64(10, 's'), count_trigger=12)
        | sp.map(lambda x: x * x)
        | sp.write.to_console(timestamp='none'))
>>> kx.q('publish', range(4))
>>> kx.q('publish', range(4))
>>> kx.q('publish', range(10))
0 1 4 9 0 1 4 9 0 1 4 9 16 25 36 49 64 81

Callback with message replay:

>>> import os
>>> from kxi import sp
>>> import pykx as kx

>>> os.makedirs('/tmp/journals')
>>> os.environ['KXI_SP_JOURNAL_DIR'] = '/tmp/journals'
>>> pipeline = (sp.read.from_callback('publish', replay=True)
        | sp.write.to_variable('out'))
>>> sp.run(pipeline)
>>> kx.q('publish', 0)
>>> kx.q('publish', 1)
>>> kx.q('out')
0 1
>>> sp.teardown()
>>> sp.run(pipeline)
>>> kx.q('out')
0 1

Callback with a string as argument:

>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_callback('publish')
        | sp.decode.csv(schema={'name': 'string', 'id': 'int'})
        | sp.write.to_variable('out'))

>>> data = kx.CharVector("\n".join(
      ["name,id",
      "User1,1",
      "User2,2",
      "User3,3"]))

>>> kx.q('publish', data)
>>> kx.q('out')
name    id
----------
"User1" 1
"User2" 2
"User3" 3

Callback with a list of strings as argument, returning a list of symbols in q:

>>> import pykx as kx
>>> from kxi import sp
>>> sp.run(sp.read.from_callback('publish')
        | sp.write.to_variable('out'))
>>> kx.q('publish', ["abc", "def"])
>>> kx.q('out')
`abc`def

from_database

@Reader
def from_database(sql: Optional[CharString] = None,
                  uda: Optional[List[Union[CharString, DictSpec]]] = None,
                  table: Optional[CharString] = None,
                  start_ts: Optional[TimestampSpec] = None,
                  end_ts: Optional[TimestampSpec] = None,
                  filter: Optional[List[List[CharString]]] = None,
                  group_by: Optional[List[CharString]] = None,
                  agg: Optional[List[CharString]] = None,
                  fill: Optional[CharString] = None,
                  temporality: Optional[CharString] = None,
                  sort_cols: Optional[List[CharString]] = None,
                  labels: Optional[DictSpec] = None,
                  retries: Optional[Union[kx.LongAtom, int]] = None,
                  retry_wait: Optional[Timedelta] = None) -> Reader

Read data from a kdb Insights Database.

This operator is built on the kdb Insights query APIs. The default mode is to use the getData API to query for a specific table and time range.

It also supports the execution of SQL queries, however this is disabled by default. It can be enabled by following these steps for sql2.

For more information on supported functionality read the API docs.

Arguments:

  • sql - SQL query to execute on the database. Leave blank for getData requests or when utilizing UDAs.
  • uda - List of the UDA registered name and required UDA parameters. Omit for getData or query requests.
  • table - Table name in kdb Insights Database. Cannot be combined with above uda option.
  • start_ts - Inclusive start time of period of interest.
  • end_ts - Exclusive end time of period of interest.
  • filter - List of triadic lists of the form (function;column name;parameter).
  • group_by - List of columns to group aggregation result by.
  • agg - List of triples of aggregations or columns to select, e.g. [['c1', 'avg', 'valFloat'], ['c2', 'min', 'qual']].
  • fill - How to handle nulls in the data. Supported values are zero and forward.
  • temporality - Sets the range of data in view for each day within the query. Supports two types of temporality: snapshot which takes a continuous range of the data, and slice which grabs data between the time values of startTS and endTS parameters, for each date within the start and end timestamp.
  • sort_cols - Columns to sort result data on (ascending), e.g. ['sensorID', 'readTS'].
  • labels - Map of label selectors for specifying Assemblies.
  • retries - Maximum number of retries that will be attempted for establishing the initial connection, or reconnecting after the connection has been lost.
  • retry_wait - How long to wait between retry attempts, e.g. timedelta(milliseconds=2000) or (2, 's').

Returns:

A from_database reader, which can be joined to other operators or pipelines.

Notes:

Database Host

In an Insights Enterprise deployment, the database reader will use the deployment release name to locate the query service gateway. When using an Insights SDK configuration, or to override the host address of the database, set $KXI_SP_DATABASE_HOST. The hostname should be the full URL of the Service Gateway including the port to connect to.

Aggregation

The agg parameter can be used to select a subset of table columns in a getData query, e.g. ['valFloat','qual'].

Fills

The zero option fills numeric types with zeroes. The forward option fills nulls with previous, non-null entry. Using any value for this parameter when aggregating is ineffective if the agg parameter is a list of triples, e.g. [['c1', 'avg', 'valFloat'], ['c2', 'min', 'qual']].

Timestamps

The start_ts and end_ts parameters can be either a Python datetime object or a date-formatted string, e.g. 2022.07.29D00:00:00.000000000.

UDA parameter types

When using this operator with the UDA query mode, it's important to ensure the UDA parameters are correctly typed or they may not be handled correctly by the server. See the UDA example below.

>>> from datetime import datetime
>>> date_str = "2024-10-14 14:00:45"
>>> date_format = "%Y-%m-%d %H:%M:%S"
>>> start_ts = datetime.strptime(date_str, date_format)

getData API:

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_database( table='weather',
                                  start_ts='2022.07.28D00:00:00.000000000',
                                  end_ts='2022.07.29D00:00:00.000000000',
                                  agg=['timestamp', 'sensor', 'airtemp', 'name', 'borough', 'longitude', 'latitude'],
                                  retries=3,
                                  retry_wait=(2, 's'))
        | sp.write.to_variable('out'))
>>> kx.q('out')
timestamp                     sensor   airtemp  name                      borough       longitude latitude
----------------------------------------------------------------------------------------------------------
2022.07.28D00:00:00.000000000 Q-JC_04  75.05117 Fox Hills                 Staten Island -74.08174 40.61731
2022.07.28D00:00:00.000000000 Bk-BR_17 54.27767 Rosedale                  Queens        -73.73526 40.65982
2022.07.28D00:00:00.000000000 Bk-EF_09 54.828   Far Rockaway              Queens        -73.75498 40.60313
2022.07.28D00:00:00.000000000 Bk-EN_14 54.00883 Beechhurst                Queens        -73.80436 40.79278
2022.07.28D00:00:00.000000000 Bk-SH_08 54.364   St. George                Staten Island -74.07935 40.64498
2022.07.28D00:00:00.000000000 Bx-EC_01 52.84533 Stapleton                 Staten Island -74.0779  40.62693
2022.07.28D00:00:00.000000000 Bx-EC_02 50.88533 Rosebank                  Staten Island -74.06981 40.6153
2022.07.28D00:00:00.000000000 Bx-EC_03 52.4225  West Brighton             Staten Island -74.10718 40.63188
2022.07.28D00:00:00.000000000 Bx-EC_04 52.45167 Grymes Hill               Staten Island -74.08725 40.62418
2022.07.28D00:00:00.000000000 Bx-EC_05 51.70583 Todt Hill                 Staten Island -74.11133 40.59707

Advanced getData API:

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_database( table='weather',
                            start_ts='2022.07.28D00:00:00.000000000',
                            end_ts='2022.07.29D00:00:00.000000000',
                            filter=[['<=', 'airtemp', 60], ['within', 'longitude', [-75, -72]]],
                            fill='zero',
                            temporality='slice',
                            sort_cols=['sensor', 'timestamp'],
                            agg=['timestamp', 'sensor', 'airtemp', 'name', 'borough', 'longitude', 'latitude'],
                            retries=3,
                            retry_wait=(2, 's'))
        | sp.write.to_variable('out'))
date       label_kxname  timestamp                     sensor   airtemp  name            borough longitude latitude
-------------------------------------------------------------------------------------------------------------------
2022.07.28 insights-demo 2022.07.28D01:00:00.000000000 Bk-BR_02 59.20733 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D02:00:00.000000000 Bk-BR_02 57.5775  Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D03:00:00.000000000 Bk-BR_02 56.317   Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D04:00:00.000000000 Bk-BR_02 55.57333 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D05:00:00.000000000 Bk-BR_02 54.50233 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D06:00:00.000000000 Bk-BR_02 53.62383 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D07:00:00.000000000 Bk-BR_02 53.74717 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D08:00:00.000000000 Bk-BR_02 55.60933 Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D09:00:00.000000000 Bk-BR_02 57.8435  Cambria Heights Queens  -73.73527 40.69277
2022.07.28 insights-demo 2022.07.28D10:00:00.000000000 Bk-BR_02 59.26417 Cambria Heights Queens  -73.73527 40.69277

SQL API:

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_database(sql="select * from weather",
                                 retries=3,
                                 retry_wait=timedelta(milliseconds=2000))
        | sp.write.to_variable('out'))
>>> kx.q('out')
timestamp                     sensor   airtemp  name                borough       longitude latitude
----------------------------------------------------------------------------------------------------
2022.07.28D07:00:00.000000000 Q-CH_19  46.256   Queens Village      Queens        -73.73871 40.71889
2022.07.28D06:00:00.000000000 Q-CH_19  46.7232  Queens Village      Queens        -73.73871 40.71889
2022.07.28D06:00:00.000000000 Q-CH_10  47.63167 Kew Gardens Hills   Queens        -73.82088 40.72258
2022.07.28D05:00:00.000000000 Q-CH_19  47.70583 Queens Village      Queens        -73.73871 40.71889
2022.07.28D07:00:00.000000000 Q-CH_10  47.88417 Kew Gardens Hills   Queens        -73.82088 40.72258
2022.07.28D00:00:00.000000000 Q-CH_29  48.04817 Madison             Brooklyn      -73.94842 40.60938
2022.07.28D00:00:00.000000000 Q-CH_31  48.47183 Bronxdale           Bronx         -73.86173 40.85272
2022.07.28D05:00:00.000000000 Q-CH_10  48.70867 Kew Gardens Hills   Queens        -73.82088 40.72258
2022.07.28D01:00:00.000000000 Q-CH_31  48.85617 Bronxdale           Bronx         -73.86173 40.85272

Utilize a registered UDA:

>>> from kxi import sp
>>> import pykx as kx

# explicitly type the UDA params to avoid issues on the server side
>>> args=['.example.multiplierAPI', {'table': kx.SymbolAtom('weather'),
        'column': kx.CharVector('airtemp'), 'multiplier': kx.FloatAtom(10.0)}]
>>> sp.run(sp.read.from_database(uda=args)
        | sp.write.to_variable('out'))
>>> kx.q('out')
original_col multiplied_col
---------------------------
46.256        462.56
46.7232       467.232
47.63167      476.3167
..

from_expr

@Reader
def from_expr(expr: Union[str, bytes, Callable]) -> Reader

Evaluate expression or function into the pipeline.

Arguments:

  • expr - An expression which produces data when evaluated. This can either be a string or a nullary Python function.

Returns:

A from_expr reader, which can be joined to other operators or pipelines.

Notes:

The expression reader supports two types of inputs: - string - executed as a q statement - Python function - executed in Python and returns an object which is converted to q. Python nullary functions (including lambdas) are supported, i.e. function arguments are not supported.

Python strings

PyKX automatically converts Python strings to q symbols which can cause issues in the case where your callable expression returns a string for use in a subsequent SP operation. If your data should be treated as a string, you should explicitly convert it using pykx.CharVector.

Read from a nullary Python function:

>>> from kxi import sp
>>> import pykx as kx

>>> def nullary_func():
        return range(0, 10)

>>> sp.run(sp.read.from_expr(nullary_func)
        | sp.write.to_variable('out'))
>>> kx.q('out')
0 1 2 3 4 5 6 7 8 9

Execute a lambda and return a q string:

>>> import pykx as kx
>>> from kxi import sp

>>> data = kx.CharVector("\n".join(
    ["name,id,quantity",
    "a,1,4",
    "b,2,5",
    "c,3,6"]))

>>> sp.run(sp.read.from_expr(lambda: data)
        | sp.decode.csv({'name': 'string', 'id': 'int', 'quantity': 'int'})
        | sp.write.to_variable('out'))
>>> kx.q('out')
name id quantity
----------------
,"a" 1  4
,"b" 2  5
,"c" 3  6

Read from from a q variable as a string:

>>> from kxi import sp
>>> sp.teardown()
>>> import pykx as kx
>>> kx.q('counter:10')
>>> sp.run(sp.read.from_expr('counter')
        | sp.write.to_variable('out'))
>>> kx.q('out')
,10

Read from a q expression as a string:

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_expr('til 10')
        | sp.write.to_variable('out'))
>>> kx.q('out')
0 1 2 3 4 5 6 7 8 9

Read from a q function as a string:

>>> from kxi import sp
>>> import pykx as kx
>>> sp.run(sp.read.from_expr('sumFunction: {[x; y] x + y} ; sumFunction[3; 5]')
    | sp.write.to_variable('out'))
>>> kx.q('out')
,8

from_file

@Reader
def from_file(path: Union[os.PathLike, List[os.PathLike]],
              mode: FileMode = FileMode.binary,
              *,
              offset: int = 0,
              chunking: FileChunking = FileChunking.auto,
              chunk_size: Union[int, CharString] = '1MB',
              watch: Union[bool, dict] = False) -> Reader

Read file contents into the pipeline.

Arguments:

  • path - A filepath or list of file paths. Glob patterns are supported.
  • mode - How the content of the file should be interpreted by the reader. This must be a member of the sp.read.FileMode enumeration.
  • offset - How many bytes into the file reading should begin.
  • chunking - If/how the file should be split into chunks.
  • chunk_size - The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.
  • watch - Either True, False, or a dictionary containing method (a string) and frequency(a timedelta). The only current method is "timer". The default frequency is timedelta(seconds=5). If watch is set to True, use the watch defaults.

Returns:

A from_file reader, which can be joined to other operators or pipelines.

Examples:

Local file watching:

>>> from kxi import sp

>>> file = 'numbers*.txt'
>>> sp.run(sp.read.from_file(file, watch=True)
        | sp.write.to_console(timestamp='none'))
,"0"
,"1"
,"2"
,"3"
,"4"

If next we upload numbersThree.txt we see:

,"5"
,"6"
,"7"
,"8"
,"9"

KafkaOffset Objects

class KafkaOffset(IntEnum)

Where to start consuming a Kafka partition.

beginning

Start consumption at the beginning of the partition.

end

Start consumption at the end of the partition.

from_kafka

@Reader
def from_kafka(topic: str,
               brokers: Union[str, List[str]] = 'localhost:9092',
               *,
               retries: int = 10,
               retry_wait: Timedelta = (1, 's'),
               poll_limit: int = 1000,
               offset: Optional[Dict[int, KafkaOffset]] = None,
               options: Optional[Dict[str, Any]] = None,
               registry: Optional[Union[CharString, str]] = '',
               as_list: Optional[bool] = False) -> Reader

Consume data from a Kafka topic.

Notes:

Maximum poll limit

The Kafka reader reads multiple messages in a single poll iteration. A global limit is imposed on the number of messages to read in a single cycle to avoid locking the process in a read loop only serving Kafka messages. By default this limit is set to 1000 messages. Setting the configuration option pollLimit will change this value. This limit is global so if multiple readers set this value, only one will be used.

Reserved keys

group.id, metadata.broker.list, and consumer.id are reserved values and are maintained directly by the Kafka reader. The Kafka reader will error if these options are used.

Running with TLS

It is recommended for all production deployments that TLS be enabled for encrypting data in transit. For more information about configuring TLS, refer to either the microservice tutorial or the platform tutorial depending on your deployment type.

Offset Committing and Group Permissions

sp.read.from_kafka will automatically commit offsets when reading from a kafka broker. This is essential for exactly once semantics and fault tolerance. The error "Local: Waiting for coordinator" will occur when attempting to commit offsets if you do not have sufficient group permissions. The broker should be configured so that the user has group Describe permissions. Since sp.read.from_kafka will generate a random group ID, ensure the user is permissioned for all group names on the broker.

  • **Note - Kafka configuration options**

Beside the above keys, all available Kafka consumer configuration options are supported using the options dictionary. This includes properties such as socket.timeout.ms, fetch.message.max.bytes, etc.

Arguments:

  • topic - The name of a topic.
  • brokers - Brokers identified a 'host:port' string, or a list of 'host:port' strings.
  • retries - Maximum number of retries that will be attempted for Kafka API calls.
  • retry_wait - How long to wait between retry attempts.
  • poll_limit - Maximum number of records to process in a single poll loop.
  • offset - Dictionary mapping from partition IDs to their offsets.
  • options - Dictionary of Kafka consumer options.
  • registry - Optional URL to a Kafka Schema Registry. When provided, Kafka Schema Registry mode is enabled, allowing for payload decoding.
  • as_list - Boolean value which, when set to True results in Kafka Schema Registry messages omitting field names when decoding Protocol Buffer schemas and returning values as a list.

Returns:

A from_kafka reader, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_kafka('numbers')
        | sp.write.to_variable('out'))
>>> kx.q('out')
Assuming the 'numbers' topic is set up at 'localhost:9092' to generate random integers between 0-10
4
7
3

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_kafka('numbers', options={
        'socket.timeout.ms': '60000',
        'socket.send.buffer.bytes': '4194304',
        'max.in.flight': '100000'})
        | sp.write.to_variable('out'))
>>> kx.q('out')
Assuming the 'numbers' topic is set up at 'localhost:9092' to generate random integers between 0-10
4
7
3

from_postgres

@Reader
def from_postgres(query: CharString,
                  database: Optional[CharString] = None,
                  server: Optional[CharString] = None,
                  port: Optional[CharString] = None,
                  username: Optional[CharString] = None,
                  password: Optional[CharString] = None) -> Reader

Execute query on a PostgreSQL database.

Any parameter (except for the query) can be omitted, in which case it will be sourced from an environment variable following the format $KXI_SP_POSTGRES_<param> where <param> is the parameter name in uppercase.

Notes:

The pipeline will be torn down after processing a Postgres query After the query has completed processing, the Postgres reader will signal a 'finish' command which will teardown the pipeline if there are no other pending requests.

Arguments:

  • query - Query to execute on the Postgres database.
  • database - Name of the database to connect to. Defaults to $KXI_SP_POSTGRES_DATABASE.
  • server - Address of the database to connect to. Defaults to $KXI_SP_POSTGRES_SERVER.
  • port - Port of the database. Defaults to $KXI_SP_POSTGRES_PORT.
  • username - Username to authenticate with. Defaults to $KXI_SP_POSTGRES_USERNAME.
  • password - Password to authenticate with. Defaults to $KXI_SP_POSTGRES_PASSWORD.

Returns:

A from_postgres reader, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_postgres('SELECT * FROM stocks')
        | sp.write.to_variable('out'))
>>> kx.q('out')
id  sym       market   name
----------------------------------------------------------------
1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."
2   "CIA"     "NYSE"   "Citizens, Inc."
3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment..."
4   "SPWR"    "NASDAQ" "SunPower Corporation"
5   "DVA"     "NYSE"   "DaVita Inc."
6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."
7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."

from_sqlserver

@Reader
def from_sqlserver(query: CharString,
                   database: Optional[CharString] = None,
                   server: Optional[CharString] = None,
                   port: Optional[Union[int, CharString]] = None,
                   username: Optional[CharString] = None,
                   password: Optional[CharString] = None) -> Reader

Execute query on a SQLServer database.

Any parameter (except for the query) can be omitted, in which case it will be sourced from an environment variable following the format $KXI_SP_SQLSERVER_<param> where <param> is the parameter name in uppercase.

Arguments:

  • query - Query to execute on the SQLServer database.
  • database - Name of the database to connect to. Defaults to $KXI_SP_SQLSERVER_DATABASE.
  • server - Address of the database to connect to. Defaults to $KXI_SP_SQLSERVER_SERVER.
  • port - Port of the database. Defaults to $KXI_SP_SQLSERVER_PORT.
  • username - Username to authenticate with. Defaults to $KXI_SP_SQLSERVER_USERNAME.
  • password - Password to authenticate with. Defaults to $KXI_SP_SQLSERVER_PASSWORD.

Returns:

A from_sqlserver reader, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_sqlserver('SELECT * FROM stocks')
        | sp.write.to_variable('out'))
>>> kx.q('out')
id  sym       market   name
----------------------------------------------------------------
1   "OXLC"    "NASDAQ" "Oxford Lane Capital Corp."
2   "CIA"     "NYSE"   "Citizens, Inc."
3   "PEI"     "NYSE"   "Pennsylvania Real Estate Investment..."
4   "SPWR"    "NASDAQ" "SunPower Corporation"
5   "DVA"     "NYSE"   "DaVita Inc."
6   "BHACW"   "NASDAQ" "Barington/Hilco Acquisition Corp."
7   "BCRX"    "NASDAQ" "BioCryst Pharmaceuticals, Inc."

from_stream

@Reader
def from_stream(table: Optional[str] = None,
                stream: Optional[str] = None,
                *,
                prefix: CharString = '',
                assembly: Optional[str] = None,
                insights: bool = True,
                index: int = 0,
                position: Optional[str] = None) -> Reader

Read data using a kdb Insights Stream.

Arguments:

  • table - Name of the table to filter the stream on. By default, no filtering is performed.
  • stream - Name of stream to subscribe to. By default, the stream specified by the $RT_SUB_TOPIC environment variable is used.
  • position - The position in the stream to replay from. Options are start or end of the stream. If no position is specified, defaults to start.
  • index DEPRECATED - The integer position in the stream to replay from. If both index and position are set the position parameter will take priority.
  • prefix DEPRECATED - Prefix to add to the hostname for RT cluster. By default, the prefix given by the $RT_TOPIC_PREFIX environment variable is used.
  • assembly DEPRECATED - The kdb Insights assembly to read from. By default, no assembly is used.
  • insights DEPRECATED - Whether the stream being subscribed to uses Insights message formats.

Returns:

A from_stream reader, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_stream('trace', 'north')
        | sp.write.to_variable('out'))
>>> kx.q('out')
date       val
--------------
2023.09.26 0
2023.09.26 1
2023.09.26 2
2023.09.26 3
2023.09.26 4
2023.09.26 5
2023.09.26 6
2023.09.26 7
2023.09.26 8
2023.09.26 9

from_amazon_s3

@Reader
def from_amazon_s3(path: Union[CharString, List[CharString]],
                   mode: FileMode = FileMode.binary,
                   *,
                   offset: int = 0,
                   chunking: FileChunking = FileChunking.auto,
                   chunk_size: Union[int, CharString] = '1MB',
                   tenant: CharString = '',
                   domain: CharString = '',
                   region: CharString = 'us-east-1',
                   credentials: CharString = '',
                   watch: Union[bool, dict] = False) -> Reader

Reads a file from Amazon S3.

Arguments:

  • path - The S3 URI of an object or multiple objects to read from S3. Glob patterns are supported.
  • mode - How the content of the file should be interpreted by the reader. This must be a member of the sp.read.FileMode enumeration.
  • offset - DEPRECATED How many bytes into the file reading should begin.
  • chunking - A FileChunking enum value, or string equivalent.
  • chunk_size - The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.
  • tenant - The authorization tenant.
  • domain - A custom Amazon S3 domain.
  • region - The AWS region to authenticate against.
  • credentials - The secret name for the Amazon S3 credentials. Refer to the authentication section of the .qsp.read.fromAmazonS3 documentation for more information.
  • watch - Either True, False, or a dictionary containing method (a string) and frequency(a timedelta). The only current method is "timer". The default frequency is timedelta(seconds=5). If watch is set to True, use the watch defaults.

Returns:

A from_amazon_s3 reader, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_amazon_s3(':s3://mybucket/numbers1.txt', mode='text',region='us-east-2')
        | sp.write.to_variable('out'))
>>> kx.q('out')
"556"
"465"
"63"
"106"
"46"

from_azure_storage

@Reader
def from_azure_storage(path: Union[CharString, List[CharString]],
                       mode: FileMode = FileMode.binary,
                       *,
                       offset: int = 0,
                       chunking: FileChunking = FileChunking.auto,
                       chunk_size: Union[int, CharString] = '1MB',
                       account: CharString = '',
                       tenant: CharString = '',
                       domain: CharString = '',
                       credentials: CharString = '',
                       watch: Union[bool, dict] = False) -> Reader

Reads a file from Azure Blob Storage.

Notes:

"ms:// URLs"

This operator requires URLs to be in the format ms://yourContainerName/path/to/file.csv, so the URL https://myStorageAccount.blob.core.windows.net/myContainerName/path/to/file.csv would be written as sp.read.fromAzureStorage["ms://myContainerName/path/to/file.csv"; "myStorageAccount"]

Arguments:

  • path - The Blob URI of an object or multiple objects to read from Microsoft Azure Storage. Note that this must be an ms:// URL*, not an https:// URL. Glob patterns are supported.
  • mode - How the content of the file should be interpreted by the reader. This must be a member of the sp.read.FileMode enumeration.
  • offset - DEPRECATED How many bytes into the file reading should begin.
  • chunking - A FileChunking enum value, or string equivalent.
  • chunk_size - The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.
  • account - The Azure account to read from.
  • tenant - The authorization tenant.
  • domain - A custom Azure domain.
  • credentials - The secret name for the Azure credentials. Refer to the authentication section of the .qsp.read.fromAzureStorage documentation for more information.
  • watch - Either True, False, or a dictionary containing method (a string) and frequency(a timedelta). The only current method is "timer". The default frequency is timedelta(seconds=5) If watch is set to True, use the watch defaults.

Returns:

A from_azure_storage reader, which can be joined to other operators or pipelines.

Notes:

ms:// URLs

This operator requires URLs to be in the format ms://yourContainerName/path/to/file.csv, so the URL https://myStorageAccount.blob.core.windows.net/myContainerName/path/to/file.csv would be written as sp.read.fromAzureStorage["ms://myContainerName/path/to/file.csv"; "myStorageAccount"]

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_azure_storage('ms://mycontainer/numbers.txt',
            account='myaccount',mode='text')
        | sp.write.to_variable('out'))
>>> kx.q('out')
"556"
"465"
"63"
"106"
"46"

from_google_storage

@Reader
def from_google_storage(path: Union[CharString, List[CharString]],
                        mode: FileMode = FileMode.binary,
                        *,
                        offset: int = 0,
                        project: CharString = '',
                        chunking: FileChunking = FileChunking.auto,
                        chunk_size: Union[int, CharString] = '1MB',
                        tenant: Optional[CharString] = None,
                        domain: CharString = '',
                        credentials: CharString = '',
                        watch: Union[bool, dict] = False) -> Reader

Read a file hosted on Google Cloud Storage.

Arguments:

  • path - The GS URI of an object or multiple objects to read from Google Cloud Storage. Glob patterns are supported.
  • mode - A FileMode enum value, or the string equivalent. This must be a member of the sp.read.FileMode enumeration.
  • project - The Google Cloud Storage project ID.
  • offset - How many bytes into the file reading should begin.
  • chunking - A FileChunking enum value, or string equivalent.
  • chunk_size - The size of chunks to read when chunking is enabled. Can be specified as an integer number of bytes, or as a string with the unit, e.g. '1MB'.
  • tenant - The authentication tenant.
  • domain - A custom Google Cloud Storage domain.
  • Credentials - Name of a credentials secret to mount.
  • watch - Either True, False, or a dictionary containing method (a string) and frequency(a timedelta). The only current method is "timer". The default frequency is timedelta(seconds=5). If watch is set to True, use the watch defaults.

Returns:

A from_google_storage reader, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(
        sp.read.from_google_storage('gs://myBucket/numbers.txt', mode='text')
        | sp.write.to_variable('out'))
>>> kx.q('out')
"556"
"465"
"63"
"106"
"46"

from_http

@Reader
def from_http(url: CharString,
              method: CharString = 'GET',
              *,
              body: CharString = '',
              header: dict = None,
              on_response: OperatorFunction = None,
              follow_redirects: bool = True,
              max_redirects: int = 5,
              max_retry_attempts: int = 10,
              timeout: int = 0,
              tenant: CharString = '',
              insecure: bool = False,
              binary: bool = False,
              sync: bool = False,
              reject_errors: bool = True) -> Reader

Requests data from an HTTP endpoint.

Arguments:

  • url - The URL to send a request to.
  • method - The HTTP method for the HTTP request (ex. GET, POST, etc.).
  • body - The payload of the HTTP request.
  • header - A map of header fields to their corresponding values.
  • on_response - After a response, allows the response to be preprocessed or to trigger another request. Returning 'None' will process the return from the original request immediately. A return of a string will issue another request with the return value as the URL. A return of a dictionary allows for any of the operator parameters to be reset and a new HTTP request issued. A special 'response' key can be used in the return dictionary to change the payload of the response. If the response key is set to 'None', no data is pushed into the pipeline.
  • follow_redirects - If set, any redirect return will automatically be followed up to the maximum number of redirects.
  • max_redirects - The maximum number of redirects to follow before reporting an error.
  • max_retry_attempts - The number of times to retry a connection after a request timeout.
  • timeout - The duration in milliseconds to wait for a request to be completed before reporting an error.
  • tenant - The request tenant to use for providing request authentication details.
  • insecure - Indicates if unverified server SSL/TLS certificates should be trusted.
  • binary - Indicates that the resulting payload should be returned as binary data, otherwise text is assumed.
  • sync - Indicates if this request should be made synchronously or asynchronously. Setting the request to be synchronous will block the process until the request is completed.
  • reject_errors - Non-successful response codes will generate an error and stop the pipeline.

Returns:

A pipeline comprised of the from_http reader, which can be joined to other pipelines.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_http('http://example.com', 'GET')
        | sp.write.to_variable('out'))
>>> kx.q('out')
"<!doctype html>\n<html>\n<head>\n    <title>Example Domain</title>\n\n...

Warnings:

Nondeterminism This reader is not yet deterministic. When recovering from a checkpoint, data from before the recovery will not be replayed, which can result in data loss or duplication.

from_mqtt

@Reader
def from_mqtt(topic,
              broker,
              *,
              username: str = "",
              password: str = "",
              sslCert: str = "",
              sslKey: str = "",
              sslKeyPassword: str = "",
              sslCAFile: str = "",
              sslCAPath: str = "",
              sslSecret: str = "") -> Reader

Read from an MQTT broker.

Arguments:

  • topic - The name of the topic to subscribe to.
  • broker - The address of the MQTT broker.
  • username - Username for the MQTT broker.
  • password - Password for the MQTT broker.
  • sslCert - Path of SSL/TLS certificate PEM file.
  • sslKey - Path of SSL/TLS key PEM file.
  • sslKeyPassword - Password of private key file if encrypted.
  • sslCAFile - Path of SSL/TLS public certificate chain.
  • sslCAPath - Path to directory of trusted certificates for SSL connection.
  • sslSecret - Name of Kubernetes secret to read SSL configuration from.

Returns:

A from_mqtt reader, which can be joined to other operators or pipelines.

>>> from kxi import sp
>>> sp.run(sp.read.from_http('readings', 'tcp://localhost:1883')
            | sp.write.to_variable('out'))
>>> kx.q('out')
$ mosquitto_pub -t readings -m "Hello, World!"
"Hello, World!"
This operator subscribes to an MQTT broker, pushing messages published to that topic to the pipeline.

Notes:

Quality of Service

This operator only supports quality of service 0 (at most once messaging). Messages published at a higher QoS will be received at QoS 0.

Determinism

This reader is non-deterministic, and the flag KXI_ALLOW_NONDETERMINISM must be set to "true" to use this reader.

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_mqtt('readings', 'tcp://localhost:1883')
        | sp.write.to_variable('out'))
>>> kx.q('out')
$ mosquitto_pub -t readings -m "Hello, World!"
"Hello, World!"

from_parquet

@Reader
def from_parquet(path: Union[CharString, List[CharString]],
                 mode: Optional[ParquetMode] = ParquetMode.table,
                 metadata: Optional[Dict[CharString, CharString]] = None,
                 storage: Optional[CharString] = '/tmp',
                 region: Optional[CharString] = 'us-east-1',
                 certificates: Optional[CharString] = '/opt/kx/ca-bundle.crt',
                 credentials: Optional[CharString] = '',
                 connection: Optional[CharString] = '',
                 project: Optional[CharString] = '',
                 domain: Optional[CharString] = '',
                 tenant: Optional[CharString] = '',
                 watch: Optional[bool] = False) -> Reader

Read parquet file contents into the pipeline.

Arguments:

  • path - An object URI of a parquet file or multiple parquet files to read from AWS/Azure/GCP or local file system. Note that this must be an [s3://|ms://|gs://|/fs] URL*, not an https:// URL. Glob patterns are supported.
  • mode - How the content of the parquet file should be interpreted by the reader.
  • metadata - Dictionary of metadata keys with their types to be applied according to tabular datasets of arrow.
  • region - Physical locations around the world where Amazon clusters data centers.
  • certificates - Location of default trust store of SSL certificates.
  • credentials - The secret name for the Amazon S3 credentials. Refer to the authentication section of the .qsp.read.fromParquet documentation for more information.
  • connection - The Azure Storage account connection string.
  • project - The Google Cloud Storage project ID.
  • domain - A custom Amazon S3 domain.
  • tenant - The authorization tenant.
  • watch - The flag enables file watcher mode for a parquet cloud registry.

Returns:

A from_parquet reader, which can be joined to other operators or pipelines.

Examples:

These examples assume you have Parquet files stored locally or in object storage buckets.

Reading a parquet file to a table:

>>> from kxi import sp

>>> sp.run(sp.read.from_parquet('/tmp/pytable.parquet')
        | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

Reading a parquet file to a list of arrays:

>>> from kxi import sp

>>> sp.run(sp.read.from_parquet('/tmp/pytable.parquet', mode=sp.read.ParquetMode.lists)
        | sp.write.to_console(timestamp='none'))
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

Reading a parquet file from AWS S3 registry:

>>> from kxi import sp

>>> parquet = 's3://kx-insights-nm-support/parquet.parquet'
>>> sp.run(sp.read.from_parquet(parquet, region='eu-west-1')
        | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

Parquet file watching from AWS S3:

>>> from kxi import sp

>>> parquet = 's3://bucket/retention/interval*.parquet'
>>> sp.run(sp.read.from_parquet(parquet, region='eu-west-1', watch=True)
        | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

If next we upload intervalJul.parquet to the bucket we see:

tm                            sym      px    qty
-----------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO"   106.4 60
2023.07.19D16:00:00.000000000 "OAIE"   107.3 70
2023.07.19D17:00:00.000000000 "RDIV"   108.2 80
2023.07.19D18:00:00.000000000 "KBWB"   109.1 90
2023.07.19D19:00:00.000000000 "BIGB"   110.0 100

Parquet watching of tabular dataset from Azure:

>>> from kxi import sp

>>> parquet = 'ms://blob/year=2023/month=*/interval*.parquet'
>>> metadata = {'year': 'int', 'month': 'short'}
>>> sp.run(sp.read.from_parquet(parquet,
        connection='<REDACTED>',
        metadata=metadata,
        watch=True)
    | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty year month
----------------------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10  2023 6
2023.06.09D11:00:00.000000000 "HSI"   102.8 20  2023 6
2023.06.09D12:00:00.000000000 "DIA"   103.7 30  2023 6
2023.06.09D13:00:00.000000000 "SPY"   104.6 40  2023 6
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50  2023 6

If next we upload intervalJul.parquet to the bucket we see:

tm                            sym    px    qty year month
---------------------------------------------------------
2023.07.19D15:00:00.000000000 "FTXO" 106.4 60  2023 7
2023.07.19D16:00:00.000000000 "OAIE" 107.3 70  2023 7
2023.07.19D17:00:00.000000000 "RDIV" 108.2 80  2023 7
2023.07.19D18:00:00.000000000 "KBWB" 109.1 90  2023 7
2023.07.19D19:00:00.000000000 "BIGB" 110   100 2023 7
Reading the parquet file from Microsoft Azure registry:

>>> from kxi import sp
>>> parquet = 'ms://bucket/table.parquet'
>>> sp.init()
>>> sp.run(sp.read.from_parquet(parquet, connection='<REDACTED>')
    | sp.write.to_console(timestamp='none'))
tm                            sym     px    qty
-----------------------------------------------
2023.06.09D10:00:00.000000000 "AEX"   101.9 10
2023.06.09D11:00:00.000000000 "HSI"   102.8 20
2023.06.09D12:00:00.000000000 "DIA"   103.7 30
2023.06.09D13:00:00.000000000 "SPY"   104.6 40
2023.06.09D14:00:00.000000000 "ISF.L" 105.5 50

from_upload

@Reader
def from_upload(uploadName: Union[str, kx.SymbolAtom]) -> Reader

Reads data supplied through an HTTP endpoint.

Arguments:

  • uploadName - Unique name for this fromUpload operator. This must be supplied in the path of HTTP upload requests. The request path will be of the form: https://insights.kx.com/streamprocessor/upload/{uploadName} . The uploadName is passed through in the operator metadata.

Returns:

A from_upload reader, which can be joined to other operators or pipelines.

Notes:

Fully Deterministic

The upload reader is fully deterministic by default. It will use event journaling to record the raw data it is provided. In a recovery scenario it will replay the journal (from the latest checkpoint) and return to the state it was in prior to crashing. Since all read events are journalled it will require enough disk space to write that data. To turn this feature off set the environment variable KXI_SP_EVENT_JOURNAL to false.

Limits and HTTP chunking

The upload limit is set to 10MB. The reader does not currently support HTTP chunking, therefore the user is responsible for manually chunking their files and making multiple upload requests if they wish to upload larger files.

No High Availability

Use of this node in a replicated pipeline is not supported and will fail. This will be fixed in a future release.

kdb Insights Enterprise only

The .qsp.read.fromUpload reader operates only within kdb Insights Enterprise deployments of the Stream Processor at this time. This functionality does not operate within the kdb Insights Stream Processor Microservice.

Optional HTTP parameters

The optional HTTP parameters are table and finish. finish is used to finish the operator and subsequently the pipeline. This can be useful if we pair the upload reader with a direct write. The table parameter is unused by the upload operator, however if provided it will be passed through in the metadata and could be used by a node downstream e.g. a map node.

The template of the HTTP request to upload data is: (See the Open API Spec for full details).

curl -X POST \
    https://insights.kx.com/streamprocessor/upload/{uploadName}?table=z&finish=true \
    --header "Content-Type: application/octet-stream" \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data-binary "@my/file.csv"

The @ symbol in "@my/file.csv" is required to signify the string resolves to a file rather than a literal.

Examples:

Start pipeline using the from_upload reader:

>>> from kxi import sp
>>> import pykx as kx

>>> sp.run(sp.read.from_upload('myUniqueName')
        | sp.decode.csv(kx.q('''([]time:`$(); ticker:`$(); bid:`$();
            bidSize:`$(); ask:`$(); askSize:`$())'''), header=sp.decode.CSVHeader.always)
        | sp.write.to_console(timestamp='none'))
Now send HTTP request to the coordinator to upload your file data.
>> head quotes.csv
time,ticker,bid,bidSize,ask,askSize
2023-01-09D06:03:54.726000000,AAPL,58.79616,890,5.538282,100
2023-01-09D09:05:49.815000000,AAPL,12.70558,790,47.71454,990
2023-01-10D09:24:01.982000000,AAPL,4.636787,620,50.61855,480
2023-01-10D03:58:45.586000000,MSFT,81.40808,970,23.05427,500
2023-01-10D03:16:20.405000000,FB,44.11634,560,74.11803,330
2023-01-08D09:43:50.155000000,KX,78.12387,650,26.48796,40
2023-01-09D10:55:05.862000000,AAPL,71.60091,840,66.77192,130
2023-01-09D09:03:34.867000000,KX,37.10555,130,69.64077,940
2023-01-08D07:59:00.020000000,TSLA,67.64414,500,43.57184,160

>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data-binary "@quotes.csv"

Worker logs:

time                            ticker bid           bidSize
------------------------------------------------------------
"2023-01-09D06:03:54.726000000" "AAPL" "58.79616"    "890"..
"2023-01-09D09:05:49.815000000" "AAPL" "12.70558"    "790"..
"2023-01-10D09:24:01.982000000" "AAPL" "4.636787"    "620"..
"2023-01-10D03:58:45.586000000" "MSFT" "81.40808"    "970"..
"2023-01-10D03:16:20.405000000" "FB"   "44.11634"    "560"..
"2023-01-08D09:43:50.155000000" "KX"   "78.12387"    "650"..
"2023-01-09D10:55:05.862000000" "AAPL" "71.60091"    "840"..
"2023-01-09D09:03:34.867000000" "KX"   "37.10555"    "130"..
"2023-01-08D07:59:00.020000000" "TSLA" "67.64414"    "500"..
Use the finish parameter to finish the reader after data has been processed. If not provided it defaults to false.
>> curl -X POST https://insights.kx.com/streamprocessor/upload/myUniqueName?finish=true \
    --header "Authorization: Bearer $INSIGHTS_TOKEN" \
    --data-binary "@quotes.csv"
Additionally we can finish the operator without sending any data.
>> curl -X POST https://insights.kx.com/streamprocessor/myUniqueName?finish=true \
    --header "Authorization: Bearer $INSIGHTS_TOKEN"
Note that the only Content-Type the HTTP requests can use are application/octet-stream (the default). Attempts to use any other Content-Type in the request will fail. If for example you wish to upload a JSON file upload use content type application/octet-stream (as opposed to application/json) and use a JSON decoder in the pipeline.