Loading static data

In this topic, you will learn how to load a static CSV file into Refinery. The example uses a historical-only data pipeline (no RDB or IDB).

For example, suppose you want to load reference data into a system via a file drop (.csv, .txt, or binary). You do so by scheduling a q script that reads in your file and ingests it into a historical-only pipeline. Once the data is in memory through the ingestion process, you can run a CLI command that forces the EOD rollover and pushes the data into the HDB.

Note

It is up to you to schedule repeated runs of this script, e.g. using a cron job.
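
For example, a crontab entry along these lines would run the publisher daily. The paths and schedule here are placeholders; q must be on the PATH and licensed in the cron environment, and the script should end with exit 0; so the q process terminates when run unattended.

# hypothetical crontab entry: publish demo.csv at 01:00 every day
0 1 * * * cd /path/to/scripts && q demoPub.q -q >> /tmp/demoPub.log 2>&1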

Constructing the pipeline

This is where we choose which processes to include in our pipeline. As mentioned above, we are going to exclude the RDB and IDB for this example.

The processes we need are the:

  • Tickerplant (TP)
  • Intraday Persisting Database (IPDB)
  • End of Day Persisting Database (EPDB)
  • Historical Database (HDB)

An example pipeline YAML including these processes:

pipeline:
  name: DemoPipeline
  type: realtime

  expose-to-gw: true

  taxonomy:
    region: global
    data-source: demo

  proc-layout:
    -
      all: all-processes

  processes:
    tp:
      port: 43210
      pub-mode: timer
      pub-freq-ms: 100
      log-to-journal: true
      rollover-mode: daily-at-time
      rollover-time: "00:00:00.001"
      enable-analyst: true
    hdb:
      timeout: 30
      port: 43211
      enable-analyst: true
    ipdb:
      write-freq: 60000
      write-row-limit: 0
      enable-analyst: true
      port: 43212
    epdb:
      timeout: 0
      enable-analyst: true
      port: 43213

Creating the table

The table being used in this example is the same as the example table in Creating Pipelines.

table:
  name: DemoTable
  id-col: sym
  time-col: time
  intra-persist-type: splay
  end-persist-type: date-partition

  taxonomy:
    -
      region: global
      data-source: demo

  columns:
    -
      name: time
      data-type: timestamp
      attribute: sorted
    -
      name: sym
      data-type: symbol
      attribute: grouped
    -
      name: price
      data-type: float
    -
      name: volume
      data-type: long
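
For reference, here is a rough q sketch of the empty table this YAML describes. It is illustrative only: Refinery builds the actual table from the YAML and applies the sorted and grouped attributes when persisting.

// illustrative only: empty table matching DemoTable's columns and types
DemoTable: flip `time`sym`price`volume!(`timestamp$();`symbol$();`float$();`long$())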

Publishing the data

Creating the dummy data (CSV)

This is our example CSV data, saved as demo.csv. Note that the file has no header row:

2023.03.29D00:00:05.000000000,AAPL,20,5
2023.03.29D00:00:06.000000000,TSL,23,10
2023.03.29D00:00:07.000000000,JPM,25,2

Creating the publishing script

Next, let's make a q script (demoPub.q) that publishes the data in the demo.csv file to the TP.

// Creates a connection to the TP on your host using the example TP's port (43210)
tp: hopen `:aaa.refinery.aws.kx.com:43210;

// Loads the data from the CSV and saves it to a variable 'tbl'
// The file has no header row, so column names and types are supplied explicitly
tbl: flip `time`sym`price`volume ! ("PSFJ";",") 0: `:demo.csv;

// Uses the `upd function to publish the saved data in 'tbl' to the TP under the DemoTable schema
tp(`upd;`DemoTable;tbl);  
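
For the three rows in demo.csv above, tbl parses to a table like this (q console display):

time                          sym  price volume
-----------------------------------------------
2023.03.29D00:00:05.000000000 AAPL 20    5
2023.03.29D00:00:06.000000000 TSL  23    10
2023.03.29D00:00:07.000000000 JPM  25    2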

Source data with no time-based column

If your source data has no time-based column, a current workaround is to add a time column populated with .z.p (the current timestamp) in your publishing script before publishing to the TP.

// Parse the CSV, which has no time column
tbl: flip `sym`price`volume ! ("SFJ";",") 0: csvFile;
// Add a time column filled with the current timestamp, and use xcols to move it to the front
tbl: `time`sym`price`volume xcols update time: .z.p from tbl;

Non-time series source data

If you want to upload non-time series data, you first need to configure your pipeline to allow it by adding the parameter allow-non-timeseries-data: true to the pipeline config. You also need to edit your table YAML file to omit the time column and the time-col specification, as sketched below. Then upload the CSV file as you normally would, with no time data.
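
A minimal sketch of both changes, adapted from the earlier examples. The allow-non-timeseries-data parameter is as given above; the remaining settings are assumed to carry over unchanged:

pipeline:
  name: DemoPipeline
  allow-non-timeseries-data: true
  # ... rest of the pipeline config as before

table:
  name: DemoTable
  id-col: sym
  # no time column and no time-col specification
  # ... taxonomy and persist settings as before
  columns:
    -
      name: sym
      data-type: symbol
      attribute: grouped
    -
      name: price
      data-type: float
    -
      name: volume
      data-type: long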

Publishing script for large CSV files

Sometimes you might want to publish a large CSV file. If memory management is a concern, .Q.fs reads and publishes the file in chunks, reducing the memory load of the upload.

Note

.Q.fs is a projection of .Q.fsn with the chunk size set to 131000 bytes.

// Sets a global connection to the TP on your host using the example TP's port (43210)
`tp set hopen `:aaa.refinery.aws.kx.com:43210;

// Creates a function to be run by .Q.fs
// .Q.fs passes each chunk of the file to this function as a list of lines,
// which 0: parses just as it would parse the file itself
loadAndPublish: {[chunk]
    tbl: flip `time`sym`price`volume ! ("PSFJ";",") 0: chunk;
    neg[tp](`upd;`DemoTable;tbl) // publish asynchronously to the TP
    };

// .Q.fs applies loadAndPublish to the file chunk by chunk, reducing the load on the process
.Q.fs[loadAndPublish;`:demo.csv]; 
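
Once publishing is complete, you can close the connection handle:

hclose tp;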

Send the data to the tickerplant

Now that our two files (demo.csv & demoPub.q) are set up, run the following from your command line to send the data to the TP.

q demoPub.q

Note

Make sure q is installed and set up on the machine where you run the command.

Checking the data

Your Gateway Client won't be able to see your data until it has been loaded into the HDB, which happens at EOD. However, you can force EOD to save the data down into the HDB by running the rollover command against the IPDB.

refinery pipeline --rollover DemoPipeline --instance 0 --include-today

This forces the EOD function to start. If you go to the Gateway Client and run the following query, you will see your data displayed, confirming that it has been safely loaded into the HDB.

.gwClient.query.sync[`getTicks; `dataSource`dataType`startDate`endDate`idList!(`demo; `DemoTable; 2023.03.29; .z.d; `)]