Backfilling Historical Data
This guide demonstrates how to backfill historical quote data from an ICE replay file within an Amazon S3 bucket. It serves as a starting point; you can adjust the methodologies in this guide to ingest data from other formats or sources.
kdb Insights Enterprise provides multiple methods to read data into a pipeline. For more information, refer to the list of readers.
Setting up backfilling
You must download and install the IceFixedIncome package before setting up historical data backfilling.
The following environment variables must exist before proceeding:
- INSIGHTS_HOSTNAME : Hostname of the Insights deployment
- CLIENT_NAME : Client ID used to request an access token from Insights
- CLIENT_SECRET : Client secret used to request an access token
- AWS_ACCESS_KEY_ID : Access key ID for authenticating API requests to AWS
- AWS_SECRET_ACCESS_KEY : Secret access key for authenticating API requests to AWS
- AWS_SESSION_TOKEN : Session token, required if using an access key as a temporary credential for AWS
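Before proceeding, these variables can be exported in your shell. The values below are placeholders for illustration; substitute the details of your own Insights deployment and AWS credentials:

```shell
# Placeholder values - replace with your own deployment and credentials.
export INSIGHTS_HOSTNAME="https://insights.example.com"
export CLIENT_NAME="my-client"
export CLIENT_SECRET="my-client-secret"
export AWS_ACCESS_KEY_ID="my-access-key-id"
export AWS_SECRET_ACCESS_KEY="my-secret-access-key"
# Only needed when using temporary credentials:
export AWS_SESSION_TOKEN="my-session-token"
```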
1. Create the backfill directory
Create a directory to contain the files needed for the backfill setup. In this example, create a directory named ice-fi-backfill:
mkdir ice-fi-backfill
Copy the pipeline spec from the IceFixedIncome package into the ice-fi-backfill directory, renaming it backfill.q:
cp IceFixedIncome/pipeline-spec/icehistoricreplayfixedincome-pipeline-spec.q ice-fi-backfill/backfill.q
2. Update the pipeline code to execute
The IceFixedIncome package contains the pipeline code that executes when the backfill
pipeline starts.
Open the backfill.q script in a code or text editor, for example:
vi ice-fi-backfill/backfill.q
1. Assign YOUR ASSEMBLY to the variable .fsi.assemblyName. This is the name of the pre-existing IceFixedIncome assembly. If you followed the quick starter guide, this is fsi-app-ice-fi.
2. Assign YOUR FILEPATH and YOUR FILENAME to the variable .fsi.filePath. These can be updated in the q script for a one-time run, or specified as a parameter using the -f flag when calling the runBackfill.sh script. Any parameter passed to the shell script overwrites the value in the q script.
3. Assign AWS Region to the variable .fsi.region. This is the AWS region used for authentication.
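After these edits, the relevant lines of backfill.q might look like the following. The values are illustrative only, and the exact form of each assignment (string versus symbol) should match whatever the original spec file uses — only the .fsi.filePath form is confirmed by the substitution performed in runBackfill.sh:

```q
/ Illustrative values - substitute your own assembly name, file path, and region
.fsi.assemblyName:"fsi-app-ice-fi";
.fsi.filePath:hsym `$"s3://my-bucket/sample-folder/Quotes_20201231.txt.gz";
.fsi.region:"eu-west-1";
```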
3. Create the packaging script to deploy the backfill pipeline
Create a bash script to package the pipeline. Save this script in the same directory as the backfill.q file, within ice-fi-backfill. Use a code or text editor to create this script, for example:
vi runBackfill.sh
You can add the following code to your bash script:
#!/bin/bash
# Usage:
# Flags which take a user input:
# --file | -f : Specify the filepath to the directory containing the file to ingest. Expected to be an S3 bucket path to a compressed txt file
# Example: ./runBackfill.sh -f s3://my-bucket/sample-folder/Quotes_20201231.txt.gz
logMsg(){
if [ -z "$*" ]; then
echo "No message provided"
return 1
fi
echo "$(date -u +'%Y-%m-%dT%H:%M:%S.%3NZ') ## $*"
}
## Default Arguments
## ASM_NAME should be updated if using a different assembly name
ASM_NAME="fsi-app-ice-fi"
FILE_PATH=""
## Grab flags
while [[ $# -gt 0 ]]; do
case $1 in
-file|--file|-f)
FILE_PATH="$2"
shift ## past argument
shift ## past value
;;
-*)
echo "Unknown option $1"
exit 1
;;
*)
echo "Unexpected argument $1"
exit 1
;;
esac
done
## The file path can be specified in the q script or passed as a parameter. Passing as a parameter overwrites the value specified in the q script
if [[ -n "$FILE_PATH" ]] ; then
## Replace the line beginning with .fsi.filePath in order to specify the inputted file path
sed -i "s|^\.fsi\.filePath:.*|.fsi.filePath:hsym \`\$\"${FILE_PATH}\";|" backfill.q
logMsg "Running backfill pipeline for the file: ${FILE_PATH}"
else
logMsg "Running backfill pipeline"
fi
sleep 5
################ FUNCTIONS ######################
renewToken(){
logMsg "Renewing keycloak token"
curl -s --header "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=client_credentials&client_id=$CLIENT_NAME&client_secret=$CLIENT_SECRET" \
"$INSIGHTS_HOSTNAME/auth/realms/insights/protocol/openid-connect/token" \
| jq -r .access_token > token
}
teardown(){
## arg1- pipeline name
PIPELINE_NAME=$1
renewToken
logMsg "Tearing down pipeline: $PIPELINE_NAME"
curl -s -S -X POST -H "Authorization: Bearer $(cat token)" "$INSIGHTS_HOSTNAME/streamprocessor/pipeline/teardown/insights-$PIPELINE_NAME?clearCheckpoints=true"
}
runPipeline(){
while getopts "p:s:" opt; do
case $opt in
p) PIPELINE_NAME="$OPTARG" ;;
s) SPEC_FILE="$OPTARG" ;;
esac
done
logMsg "Deploying pipeline: $PIPELINE_NAME with spec file: $SPEC_FILE"
## Token needs to be renewed before running the pipeline
renewToken
## Teardown pipeline if it already exists
teardown $PIPELINE_NAME
logMsg "Waiting for pipeline to teardown"
sleep 10
logMsg "Pipeline will write to assembly: $ASM_NAME"
## run request
curl -s -S -X POST $INSIGHTS_HOSTNAME/streamprocessor/pipeline/create \
-H "Authorization: Bearer $(cat token)" \
-d "$(jq -n --arg spec "$(cat "$SPEC_FILE")" \
--arg aws_access_key_id "$AWS_ACCESS_KEY_ID" \
--arg aws_secret_access_key "$AWS_SECRET_ACCESS_KEY" \
--arg aws_session_token "$AWS_SESSION_TOKEN" \
--arg pipeline_name "$PIPELINE_NAME" \
--arg asm_name "$ASM_NAME" \
--arg configmap_name "$ASM_NAME-assembly-configmap" \
'{
name : $pipeline_name,
type : "spec",
config : { content: $spec },
settings : {
minWorkers: "1",
maxWorkers: "1"
},
env : {
KXI_SP_BETA_FEATURES: "true",
ASM_NAME: $asm_name,
AWS_REGION: "eu-west-1",
AWS_ACCESS_KEY_ID: $aws_access_key_id,
AWS_SECRET_ACCESS_KEY: $aws_secret_access_key,
AWS_SESSION_TOKEN: $aws_session_token,
KX_KURL_DEBUG_LOG: "1",
KXI_SP_DIRECT_WRITE_ASSEMBLY: $asm_name,
KX_TRACE_S3: "1"
},
kubeConfig : {
configMaps: $configmap_name
}
}' | jq -asR .)"
}
runPipeline -p backfill -s backfill.q
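The sed substitution in the script rewrites the whole .fsi.filePath line in backfill.q with the path supplied via -f. Here is a self-contained sketch of that substitution in isolation, using a throwaway file whose contents are illustrative stand-ins:

```shell
# Create a throwaway copy of backfill.q with a placeholder file path
tmpdir=$(mktemp -d)
printf '%s\n' '.fsi.filePath:hsym `$"s3://old-bucket/old.txt.gz";' > "$tmpdir/backfill.q"

FILE_PATH="s3://my-bucket/sample-folder/Quotes_20201231.txt.gz"
# The same substitution runBackfill.sh applies: replace the entire line
sed -i "s|^\.fsi\.filePath:.*|.fsi.filePath:hsym \`\$\"${FILE_PATH}\";|" "$tmpdir/backfill.q"

RESULT=$(cat "$tmpdir/backfill.q")
echo "$RESULT"
# → .fsi.filePath:hsym `$"s3://my-bucket/sample-folder/Quotes_20201231.txt.gz";
```

Note the escaping: within the double-quoted sed program, the backtick, dollar sign, and quotes of the q expression must be backslash-escaped so the shell passes them through literally.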
4. Running the runBackfill.sh script
Use the runBackfill.sh script to manually run the backfill pipeline. The script accepts a single flag:
- --file | -f : Specify the file path to the directory containing the file to ingest. This should be an S3 bucket path pointing to a compressed .txt file.
To ensure the script is executable, run the following:
chmod +x runBackfill.sh
Now the script is ready to be executed. For example:
./runBackfill.sh -f s3://my-bucket/sample-folder/Quotes_20201231.txt.gz
The pipeline name is prefixed with insights, for example: insights-backfill.