
Batch S3 ingestion and preprocessing

Read bulk data from S3, apply scaling and preprocessing steps

Motivation

This example provides a workflow that applies preprocessing steps commonly used in machine learning to bulk data before it is persisted. Such a workflow matters when a data-science team will use the bulk data repeatedly and relies on it being clean and scaled.

Example

This example closely follows the S3 ingestion examples outlined here, and adds a number of preprocessing steps that use the Machine Learning functionality defined here. In this case, the data is preprocessed before persistence so that the saved feature sets can be used for centralised machine learning tasks.

To run an ingestion pipeline using kdb Insights Enterprise, first deploy the base system. Next, download the s3-ingest.yaml assembly by following the download instructions.

Deploy the assembly with the following kdb Insights CLI command.

kxi assembly create --filepath s3-ingest.yaml

This sets up Data Access (DA) processes and a Storage Manager (SM) for a data warehouse. A Reliable Transport (RT) stream labelled green-taxi publishes data into the Storage Manager.

Once the assembly is deployed, the q or Python specification defining the preprocessing workflow can be deployed to the Stream Processor Coordinator. The following scripts define the equivalent pipelines in q and Python.

The following script should be saved as spec.q for deployment to the stream-processor.

// Define schema for ingested data
schema: flip $[;()]@' (!) . flip (
    (`vendor                ; "j");
    (`pickup                ; "p");
    (`dropoff               ; "p");
    (`store_and_fwd         ; "b");
    (`ratecode              ; "j");
    (`pu_location           ; "j");
    (`do_location           ; "j");
    (`passengers            ; "j");
    (`distance              ; "f");
    (`fare                  ; "f");
    (`extra                 ; "f");
    (`mta_tax               ; "f");
    (`tip                   ; "f");
    (`tolls                 ; "f");
    (`ehail_fee             ; "f");
    (`improvement_surcharge ; "f");
    (`total                 ; "f");
    (`payment_type          ; "h");
    (`trip_type             ; "h");
    (`congestion_surcharge  ; "f")
    );

// Define columns to undergo preprocessing
preproc_cols:`ehail_fee`total`extra`fare

.qsp.run
    .qsp.read.fromAmazonS3["s3://nyc-tlc/trip data/green_tripdata_2020-01.csv"]
    .qsp.decode.csv[schema]
    .qsp.transform.replaceInfinity[preproc_cols]
    .qsp.transform.replaceNull[preproc_cols]
    .qsp.ml.minMaxScaler[preproc_cols]
    .qsp.map[{(`greentrips; x)}]
    .qsp.write.toStream[]
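The single-character codes in the schema are q datatype characters. As a reading aid, the sketch below maps the codes used in this example to their q type names; the `Q_TYPE_NAMES` table and the `describe` helper are illustrative names, not part of the Stream Processor API, and only a few columns are copied from the schema.

```python
# q datatype characters used in this example's schema, with their q type names.
Q_TYPE_NAMES = {
    "b": "boolean",
    "h": "short",
    "j": "long",
    "f": "float",
    "p": "timestamp",
}

# A few columns copied from the schema above (not the full list).
sample_schema = {
    "vendor": "j",
    "pickup": "p",
    "store_and_fwd": "b",
    "fare": "f",
    "payment_type": "h",
}


def describe(schema):
    """Return {column: q type name} for a {column: type character} schema."""
    return {col: Q_TYPE_NAMES[char] for col, char in schema.items()}


described = describe(sample_schema)
for col, name in described.items():
    print(f"{col}: {name}")
```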

The following script should be saved as spec.py for deployment to the stream-processor.

from kxi import sp

sp.init()

# Define schema for ingested data
schema = {
    'vendor': 'j',
    'pickup': 'p',
    'dropoff': 'p',
    'store_and_fwd': 'b',
    'ratecode': 'j',
    'pu_location': 'j',
    'do_location': 'j',
    'passengers': 'j',
    'distance': 'f',
    'fare': 'f',
    'extra': 'f',
    'mta_tax': 'f',
    'tip': 'f',
    'tolls': 'f',
    'ehail_fee': 'f',
    'improvement_surcharge': 'f',
    'total': 'f',
    'payment_type': 'h',
    'trip_type': 'h',
    'congestion_surcharge': 'f',
}

# Define columns to undergo preprocessing
preproc_cols = ['ehail_fee', 'total', 'extra', 'fare']

# Preprocess the decoded table first, then tag it with the target table
# name, matching the operator order of the q specification
sp.run(sp.read.from_amazon_s3('s3://nyc-tlc/trip data/green_tripdata_2020-01.csv')
    | sp.decode.csv(schema)
    | sp.transform.replace_infinity(preproc_cols)
    | sp.transform.replace_null(preproc_cols)
    | sp.ml.min_max_scaler(preproc_cols)
    | sp.map(lambda x: ('greentrips', x))
    | sp.write.to_stream())
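To make the effect of the three preprocessing operators concrete, here is a minimal standard-library sketch applied to a single column. It is not the Stream Processor implementation. It assumes that infinities are replaced by the largest and smallest finite values in the column, that nulls (represented here as NaN) are replaced by the column median, and that min-max scaling maps values onto the range 0 to 1. The function names are illustrative; check the operator reference for the actual defaults.

```python
import math
import statistics


def replace_infinity(values):
    """Assumed behaviour: +inf -> largest finite value, -inf -> smallest."""
    finite = [v for v in values if math.isfinite(v)]
    hi, lo = max(finite), min(finite)
    return [hi if v == math.inf else lo if v == -math.inf else v for v in values]


def replace_null(values):
    """Assumed behaviour: nulls (NaN here) -> median of the non-null values."""
    present = [v for v in values if not math.isnan(v)]
    fill = statistics.median(present)
    return [fill if math.isnan(v) else v for v in values]


def min_max_scale(values):
    """Scale values linearly so the minimum maps to 0 and the maximum to 1."""
    lo, hi = min(values), max(values)
    span = hi - lo
    return [0.0 if span == 0 else (v - lo) / span for v in values]


# Hypothetical fare column containing infinities and a null.
fare = [5.0, math.inf, math.nan, 12.5, -math.inf, 20.0]
cleaned = min_max_scale(replace_null(replace_infinity(fare)))
print(cleaned)  # [0.0, 1.0, 0.5, 0.5, 0.0, 1.0]
```

The order matters in this sketch: infinities and nulls are removed before scaling so that the minimum and maximum used for scaling are finite.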

Once defined, submit the pipeline to the Coordinator, either through a port forward to the Coordinator or through the cluster's ingress path at /streamprocessor/pipeline/create. The requests below use a port forward to the kxi-sp service. The first submits the q specification and the second submits the Python specification; both use the pipeline name s3-ingest, so submit the one that matches the specification you saved.

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.q)" \
    '{
        name       : "s3-ingest",
        type       : "spec",
        base       : "q-ml",
        config     : { content : $spec },
        env        : { RT_PUB_TOPIC: "s3-ingest-green-taxi", RT_TOPIC_PREFIX: "rt-" }
    }' | jq -asR .)"
curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat spec.py)" \
    '{
        name       : "s3-ingest",
        type       : "spec",
        base       : "py-ml",
        config     : { content : $spec },
        env        : { RT_PUB_TOPIC: "s3-ingest-green-taxi", RT_TOPIC_PREFIX: "rt-" }
    }' | jq -asR .)"
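For readers who prefer to build the request body in code, the following sketch mirrors what the jq invocations above produce. The first jq call builds the JSON document, and the trailing `jq -asR .` re-encodes that document as a single JSON string, which is why the sketch encodes twice. The `build_request_body` helper and the stand-in spec text are illustrative; the sketch only constructs the body and does not send it.

```python
import json


def build_request_body(spec_text, base):
    """Build the pipeline/create body for a spec and a base image name."""
    payload = {
        "name": "s3-ingest",
        "type": "spec",
        "base": base,  # "q-ml" for spec.q, "py-ml" for spec.py
        "config": {"content": spec_text},
        "env": {
            "RT_PUB_TOPIC": "s3-ingest-green-taxi",
            "RT_TOPIC_PREFIX": "rt-",
        },
    }
    # Inner dumps: the JSON document. Outer dumps: that document wrapped
    # as one JSON string, as `jq -asR .` does in the curl command.
    return json.dumps(json.dumps(payload))


# Stand-in for the contents of spec.q; in practice, read the saved file.
spec_text = "// contents of spec.q"
body = build_request_body(spec_text, "q-ml")

# Decoding twice recovers the original fields.
decoded = json.loads(json.loads(body))
print(decoded["base"], decoded["env"]["RT_PUB_TOPIC"])
```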

Reliable Transport configuration

The requests above configure the Worker with RT-related environment variables so that the deployed pipeline can connect to other kdb Insights components through RT.

RT_TOPIC_PREFIX is set to the same value as the topicPrefix fields in the assembly file. This value must be consistent across the assembly and all components.

RT_PUB_TOPIC is set to the topic name of the streaming data. The topic is composed from fields set in the assembly in the semantic format: <assembly-name>-<sequencer-topic-name>.
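As a small illustration of that naming rule, the sketch below composes the topic used in the requests. It assumes the assembly is named s3-ingest (inferred from the assembly file name and the RT_PUB_TOPIC value) and uses the green-taxi stream label; `rt_pub_topic` is an illustrative helper, not a kdb Insights function.

```python
def rt_pub_topic(assembly_name, sequencer_topic):
    """Compose <assembly-name>-<sequencer-topic-name>."""
    return f"{assembly_name}-{sequencer_topic}"


# Assumed assembly name and the stream label from this example.
topic = rt_pub_topic("s3-ingest", "green-taxi")
print(topic)  # s3-ingest-green-taxi
```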

When complete, the ingestion pipeline terminates and cleans up its resources.

Summary

The workflow above ingests a file stored in S3 and modifies it prior to persistence, simulating the update and cleaning of centralised data for use by a team of data scientists.

To validate that the pipeline has been applied correctly, query the columns 'ehail_fee', 'total', 'extra' and 'fare': they should return only values between 0 and 1, with no infinite values or nulls.
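One way to express that check in code is sketched below. It assumes the query results have already been retrieved and converted to a list of dictionaries, one per row, with nulls represented as None or NaN. The `check_scaled` helper and the sample rows are hypothetical and do not come from the dataset.

```python
import math

PREPROC_COLS = ["ehail_fee", "total", "extra", "fare"]


def check_scaled(rows, columns):
    """Return (row index, column, value) for every value outside [0, 1],
    including nulls and infinities."""
    problems = []
    for i, row in enumerate(rows):
        for col in columns:
            v = row[col]
            if v is None or math.isnan(v) or math.isinf(v) or not 0.0 <= v <= 1.0:
                problems.append((i, col, v))
    return problems


# Hypothetical rows: the first passes, the second has three violations.
rows = [
    {"ehail_fee": 0.0, "total": 0.42, "extra": 0.1, "fare": 1.0},
    {"ehail_fee": math.nan, "total": 1.7, "extra": 0.0, "fare": math.inf},
]

problems = check_scaled(rows, PREPROC_COLS)
for index, column, value in problems:
    print(f"row {index}: {column} = {value}")
```

An empty result means every checked value lies between 0 and 1 with no nulls or infinities.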