Loading static data¶
In this topic, you will learn how to load a static CSV file into Refinery. The example uses a historical only data pipeline (no RDB or IDB).
For example, suppose you want to load in reference type data into a system through the use of a file drop via .csv, .txt or .binary. You do so by scheduling a q script to read in your file and ingesting it into a historical only pipeline. Once the data is in memory through the ingestion process, you can run a CLI command which will force the EOD rollover to happen and push the data into the HDB.
Note
It is down to the user to create a repeated schedule for running this script; e.g. using a cron job.
Constructing the pipeline¶
This is where we choose which processes to include in our pipeline. As mentioned above, we are going to exclude the RDB and IDB for this example.
The processes we are looking for are the:
- Tickerplant (TP)
- Intraday Persisting Database (IPDB)
- End of Day Persisting Database (EPDB)
- Historical Database (HDB)
pipeline:
name: DemoPipeline
type: realtime
expose-to-gw: true
taxonomy:
region: global
data-source: demo
proc-layout:
-
all: all-processes
processes:
tp:
port: 43210
pub-mode: timer
pub-freq-ms: 100
log-to-journal: true
rollover-mode: daily-at-time
rollover-time: "00:00:00.001"
enable-analyst: true
hdb:
timeout: 30
port: 43211
enable-analyst: true
ipdb:
write-freq: 60000
write-row-limit: 0
enable-analyst: true
port: 43212
epdb:
timeout: 0
enable-analyst: true
port: 43213
Creating the table¶
The table being used in this example is the same as the example table in Creating Pipelines.
table:
name: DemoTable
id-col: sym
time-col: time
intra-persist-type: splay
end-persist-type: date-partition
taxonomy:
-
region: global
data-source: demo
columns:
-
name: time
data-type: timestamp
attribute: sorted
-
name: sym
data-type: symbol
attribute: grouped
-
name: price
data-type: float
-
name: volume
data-type: long
Publishing the data¶
Creating the dummy data (CSV)¶
This is our example CSV data > demo.csv.
2023.03.29D00:00:05.000000000,AAPL,20,5
2023.03.29D00:00:06.000000000,TSL,23,10
2023.03.29D00:00:07.000000000,JPM,25,2
Creating the publishing script¶
Next let's make a q script (demoPub.q) which will be run to publish the data in the Demo.csv file to the TP.
// Creates a connect to the TP on your host using the example TP's port (43210)
tp: hopen `:aaa.refinery.aws.kx.com:43210;
// Loads in the data from the csv and saves in to a variable 'tbl'
tbl: flip `time`sym`price`volume ! ("PSFJ";",") 0: `demo.csv;
// Uses the `upd function to publish the saved data in 'tbl' to the TP under the DemoTable schema
tp(`upd;`DemoTable;tbl);
Source data with no time-based column¶
If your source data has no time-based column, a current workaround for this is applying .z.p to the timestamp column within your publishing script before uploading to the TP.
tbl: flip `sym`price`volume ! ("SFJ";",") 0: csvFile;
tbl: `time`sym`price`volume xcols update time: .z.p from tbl;
Non-time series source data¶
If you want to upload non-time series data, you'll first need to have your pipeline configured to allow non-time series data to be stored. You do so by adding in the parameter allow-non-timeseries-data: true to the pipeline config. With this added, you'll need to edit your table YAML file to not include a time column or the time-col specification. Upload the CSV file as you would normally with no time data.
Publishing script for large CSV files¶
Sometimes you might want to publish a large CSV file. If memory management is a concern, .Q.fs uploads the table in chunks making the uploading less load bearing.
Note
.Q.fs is a projection of .Q.fsn with the chunk size set to 131000 bytes.
// Sets a global connection to the TP on your host using the example TP's port (43210)
`tp set hopen `:aaa.refinery.aws.kx.com:43210;
// Creates a function to be run within the .Q.fs function
// Using the load and publish functionality used in the first script
loadAndPublish: {[csvFile]
tbl: flip `time`sym`price`volume ! ("PSFJ";",") 0: csvFile;
neg[tp](`upd;`DemoTable;tbl) //asynchronously publishing to TP
};
// .Q.fs loads in the file in chunks to reduce the load on the processes
.Q.fs[loadAndPublish;`:demo.csv];
Send the data to the tickerplant¶
Now that our two files (demo.csv & demoPub.q) are set up, in our command line run q demoPub.q and the data will be sent to the TP.
Note
Make sure that you have q installed/set up where you are running the command from.
Checking the data¶
Your Gateway Client won't be able to see your data until it has been loaded into the HDB which will happen at EOD, but you can force the EOD to save the data down into the HDB by running the rollover command in the IPDB.
refinery pipeline --rollover DemoPipeline --instance 0 --include-today
This will force the EOD function to start. If you go to the Gateway Client and run the following query, you will see your data displayed. Thus confirming that your data has safely been loaded into the HDB.
.gwClient.query.sync[`getTicks; `dataSource`dataType`startDate`endDate`idList!(`demo; `DemoTable; 2023.03.29; .z.d; `)]