Batch S3 ingestion and preprocessing

This page describes how to read bulk data from S3, apply scaling and preprocessing steps, and persist the result to a database.

Motivation

This example provides a workflow that applies preprocessing steps commonly used in machine learning use cases before data is persisted. Such a workflow matters when the bulk data will be used repeatedly by a data science team that relies on clean, scaled data.

Example

This example closely follows the S3 ingestion examples outlined here, adding a number of preprocessing steps that use the Machine Learning functionality defined here. In this case, the data is preprocessed prior to persistence so that the saved feature sets can be used for centralised machine learning tasks.

If you don't already have the sample assembly s3-ingest.yaml, follow the steps here to obtain it.

Update the pipelines: block in s3-ingest.yaml with the content below. The default s3-ingest.yaml pipeline definition downloads the data, decodes it and writes it to the database. This example adds preprocessing steps to the existing spec to:

  • replace any null values
  • replace any infinite values
  • normalise values by scaling them into the 0-1 range

      pipelines:
        transform:
          base: q-ml
          env:
            - name: KX_KURL_DISABLE_AUTO_REGISTER
              value: "1"
          spec: |-
            schema: flip $[;()]@' (!) . flip (
              (`vendor                ; "j");
              (`pickup                ; "p");
              (`dropoff               ; "p");
              (`store_and_fwd         ; "b");
              (`ratecode              ; "j");
              (`pu_location           ; "j");
              (`do_location           ; "j");
              (`passengers            ; "j");
              (`distance              ; "f");
              (`fare                  ; "f");
              (`extra                 ; "f");
              (`mta_tax               ; "f");
              (`tip                   ; "f");
              (`tolls                 ; "f");
              (`ehail_fee             ; "f");
              (`improvement_surcharge ; "f");
              (`total                 ; "f");
              (`payment_type          ; "h");
              (`trip_type             ; "h");
              (`congestion_surcharge  ; "f")
              );

            // Define columns to undergo preprocessing
            preproc_cols:`ehail_fee`total`extra`fare;

            .qsp.run
              .qsp.read.fromAmazonS3["s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv"; "eu-west-1"]
              .qsp.decode.csv[schema]
              .qsp.transform.replaceInfinity[preproc_cols]
              .qsp.transform.replaceNull[preproc_cols]
              .qsp.ml.minMaxScaler[preproc_cols]
              .qsp.v2.write.toDatabase[`greentrips; .qsp.use `directWrite`database!(1b; "s3-ingest")]
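
To make the three preprocessing steps concrete, the following is a minimal plain-Python sketch of what they do to a single column. It does not use the Stream Processor API. The helper names and the replacement rules (column maximum/minimum for infinities, a caller-supplied fill value for nulls) are assumptions for illustration, and the actual operators may apply different defaults.

```python
import math

def replace_infinity(values):
    """Swap +inf / -inf for the largest / smallest finite value (assumed rule)."""
    finite = [v for v in values if v is not None and math.isfinite(v)]
    hi, lo = max(finite), min(finite)
    return [hi if v == math.inf else lo if v == -math.inf else v for v in values]

def replace_null(values, fill):
    """Swap nulls (None here) for a caller-supplied fill value (assumed rule)."""
    return [fill if v is None else v for v in values]

def min_max_scale(values):
    """Scale values into the 0-1 range: (v - min) / (max - min)."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column has no range, so map every value to 0.0
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]

# Hypothetical 'fare' column containing a null and both infinities
fare = [5.0, None, math.inf, 20.0, -math.inf, 12.5]

# Same order as the pipeline: infinities, then nulls, then scaling
cleaned = replace_null(replace_infinity(fare), fill=0.0)
scaled = min_max_scale(cleaned)
print(scaled)  # [0.25, 0.0, 1.0, 1.0, 0.25, 0.625]
```

Infinities and nulls are handled before scaling because a single infinite value would otherwise dominate the minimum or maximum used by the scaler.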

Deploying the assembly

Once you've downloaded and updated the sample assembly, you can deploy it using the commands below.

kxi auth login
kxi assembly deploy -f s3-ingest.yaml

This deploys a database, a stream, and an SP pipeline. The pipeline reads a taxi CSV dataset from Amazon S3 and ingests it into the deployed database. Once deployed, the pipeline begins to ingest data and write it to the database.

Checking progress

To check the progress of the ingest and validate that it succeeded, follow the steps described here.

Python

The pipeline can be switched to a Python version by replacing the pipelines: block with the code below. The pipeline logic is the same as in the q version above.

      pipelines:
        transform:
          base: py-ml
          env:
            - name: KX_KURL_DISABLE_AUTO_REGISTER
              value: "1"
          spec: |-
            from kxi import sp

            # Define schema for ingested data
            schema = {
                'vendor': 'j',
                'pickup': 'p',
                'dropoff': 'p',
                'store_and_fwd': 'b',
                'ratecode': 'j',
                'pu_location': 'j',
                'do_location': 'j',
                'passengers': 'j',
                'distance': 'f',
                'fare': 'f',
                'extra': 'f',
                'mta_tax': 'f',
                'tip': 'f',
                'tolls': 'f',
                'ehail_fee': 'f',
                'improvement_surcharge': 'f',
                'total': 'f',
                'payment_type': 'h',
                'trip_type': 'h',
                'congestion_surcharge': 'f',
            }

            # Define columns to undergo preprocessing
            preproc_cols = ['ehail_fee', 'total', 'extra', 'fare']

            sp.run(sp.read.from_amazon_s3("s3://kxs-prd-cxt-twg-roinsightsdemo/ny_taxi_data_2021_12.csv", region="eu-west-1") |
                sp.decode.csv(schema, header='first') |
                sp.transform.replace_infinity(preproc_cols) |
                sp.transform.replace_null(preproc_cols) |
                sp.ml.min_max_scaler(preproc_cols) |
                sp.write.to_database("greentrips", database="s3-ingest", directWrite=True, api_version=2))
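
The single-character codes in the schema are kdb+ type characters. As a quick reference, the sketch below maps the codes used in this example to their type names and checks that every column selected for preprocessing is declared as a float. The helper names are hypothetical and are not part of the kxi package, and the schema shown is a shortened copy of the one above.

```python
# kdb+ type characters used in this example's schema
KDB_TYPE_NAMES = {
    "b": "boolean",
    "h": "short",
    "j": "long",
    "f": "float",
    "p": "timestamp",
}

def describe_schema(schema):
    """Return column -> type name, rejecting codes not listed above."""
    unknown = {col: code for col, code in schema.items() if code not in KDB_TYPE_NAMES}
    if unknown:
        raise ValueError(f"unrecognised type codes: {unknown}")
    return {col: KDB_TYPE_NAMES[code] for col, code in schema.items()}

def non_float_columns(schema, columns):
    """List the requested columns that are missing or not declared as float."""
    return [col for col in columns if schema.get(col) != "f"]

# Shortened copy of the schema, for illustration only
schema = {
    "vendor": "j",
    "pickup": "p",
    "store_and_fwd": "b",
    "fare": "f",
    "extra": "f",
    "ehail_fee": "f",
    "total": "f",
    "payment_type": "h",
}
preproc_cols = ["ehail_fee", "total", "extra", "fare"]

described = describe_schema(schema)
problems = non_float_columns(schema, preproc_cols)
print(described["pickup"], problems)  # timestamp []
```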

Summary

The workflow above ingests a file stored in S3 and modifies it prior to persistence, simulating the cleaning and updating of centralised data for use by a team of data scientists.

To validate that the pipeline was applied correctly, query the columns 'ehail_fee', 'total', 'extra' and 'fare'. They should return only values between 0 and 1, with no infinite values or nulls.
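
The check described above can be sketched in plain Python, assuming the query results have already been pulled into a dictionary of column lists. How the data is queried is not shown here. The function names and sample values are hypothetical.

```python
import math

def column_is_clean(values):
    """True when every value is a finite, non-null number in the range [0, 1]."""
    return all(
        v is not None and math.isfinite(v) and 0.0 <= v <= 1.0
        for v in values
    )

def validate(table, columns):
    """Return column -> True/False for each preprocessed column."""
    return {col: column_is_clean(table[col]) for col in columns}

# Hypothetical query results: 'total' holds an infinity, 'extra' holds a null
sample = {
    "fare": [0.0, 0.25, 1.0],
    "total": [0.1, math.inf, 0.9],
    "extra": [0.5, None, 0.2],
    "ehail_fee": [0.0, 0.0, 0.0],
}

report = validate(sample, ["ehail_fee", "total", "extra", "fare"])
print(report)
# {'ehail_fee': True, 'total': False, 'extra': False, 'fare': True}
```

A column reported as False contains a null, an infinity, a NaN or an out-of-range value, which would suggest that the preprocessing steps were not applied to it.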