Batch S3 Ingest

Read bulk data from S3 and write to downstream consumers

To run an ingestion pipeline on KX Insights Enterprise, first deploy the base system. Next, download the s3-ingest.yaml assembly by following the download instructions, and deploy it with the following command.

kubectl apply -f s3-ingest.yaml

This sets up Data Access (DA) processes and a Storage Manager (SM) for a data warehouse. A Reliable Transport (RT) stream labeled green-taxi publishes data into the Storage Manager.
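
As a quick sanity check, you can watch the assembly's pods come up before continuing (the exact pod names depend on the assembly):

kubectl get pods -w    # wait until the DA, SM, and RT pods report Running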

Once the assembly is running, an ingestion pipeline, written in either q (ingest.q) or Python (ingest.py), can be submitted to the Stream Processor Coordinator.

AWS credential discovery

The S3 reader supports custom AWS credentials; for the available discovery mechanisms, see the reader configuration. A hedged example of passing credentials to the pipeline is shown after the create requests below.

The q streaming program, ingest.q, is as follows:

// Empty table whose columns define the name and type of each CSV field
schema: flip $[;()]@' (!) . flip (
    (`vendor                ; "j");
    (`pickup                ; "p");
    (`dropoff               ; "p");
    (`store_and_fwd         ; "b");
    (`ratecode              ; "j");
    (`pu_location           ; "j");
    (`do_location           ; "j");
    (`passengers            ; "j");
    (`distance              ; "f");
    (`fare                  ; "f");
    (`extra                 ; "f");
    (`mta_tax               ; "f");
    (`tip                   ; "f");
    (`tolls                 ; "f");
    (`ehail_fee             ; "f");
    (`improvement_surcharge ; "f");
    (`total                 ; "f");
    (`payment_type          ; "h");
    (`trip_type             ; "h");
    (`congestion_surcharge  ; "f")
    );

.qsp.run
    .qsp.read.fromAmazonS3["s3://nyc-tlc/trip data/green_tripdata_2020-01.csv"] // read the object from S3
    .qsp.decode.csv[schema]      // parse CSV records using the schema above
    .qsp.map[{(`greentrips; x)}] // pair each batch with its destination table name
    .qsp.write.toRT[]            // publish to the RT stream
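
The schema expression is a common q idiom: (!) . flip turns the name/type pairs into a dictionary, $[;()]@' casts an empty list to each type character, and the outer flip converts the dictionary of empty typed lists into an empty table. A minimal sketch of the same construction with two columns:

pairs: ((`vendor; "j"); (`pickup; "p"))
d: (!) . flip pairs   / dictionary `vendor`pickup!("j";"p")
t: flip $[;()]@' d    / empty table with a long and a timestamp column
meta t                / confirms the column types j and p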

The equivalent Python streaming program, ingest.py, is as follows:

from kxi import sp

sp.init()  # initialize the Stream Processor Python API

schema = {
    'vendor': 'j',
    'pickup': 'p',
    'dropoff': 'p',
    'store_and_fwd': 'b',
    'ratecode': 'j',
    'pu_location': 'j',
    'do_location': 'j',
    'passengers': 'j',
    'distance': 'f',
    'fare': 'f',
    'extra': 'f',
    'mta_tax': 'f',
    'tip': 'f',
    'tolls': 'f',
    'ehail_fee': 'f',
    'improvement_surcharge': 'f',
    'total': 'f',
    'payment_type': 'h',
    'trip_type': 'h',
    'congestion_surcharge': 'f',
}

sp.run(sp.read.from_s3('s3://nyc-tlc/trip data/green_tripdata_2020-01.csv')
    | sp.decode.csv(schema)                # parse CSV records using the schema above
    | sp.map(lambda x: ('greentrips', x))  # pair each batch with its destination table name
    | sp.write.to_rt())                    # publish to the RT stream

Finally, submit the pipeline create request to the Coordinator, either through the cluster's ingress at /streamprocessor/pipeline/create or, as in the requests below, through a port forward to the kxi-sp service.
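
For example (the service port is assumed to match the local port 5000 used in the requests below):

kubectl port-forward svc/kxi-sp 5000:5000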

For the q spec (ingest.q):

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat ingest.q)" \
    '{
        name       : "s3-ingest",
        type       : "spec",
        config     : { content : $spec },
        env        : { RT_PUB_TOPIC: "s3-ingest-green-taxi", RT_TOPIC_PREFIX: "rt-" }
    }' | jq -asR .)"
Alternatively, for the Python spec (ingest.py), note the additional base field, which selects the Python Worker:

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat ingest.py)" \
    '{
        name       : "s3-ingest",
        type       : "spec",
        base       : "py",
        config     : { content : $spec },
        env        : { RT_PUB_TOPIC: "s3-ingest-green-taxi", RT_TOPIC_PREFIX: "rt-" }
    }' | jq -asR .)"
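
Returning to AWS credential discovery: for buckets that require credentials, one option, assuming the Worker honors the standard AWS environment variables (the reader configuration is authoritative here), is to pass them through the same env field. A hypothetical variant of the q request:

curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n --arg spec "$(cat ingest.q)" \
    '{
        name       : "s3-ingest",
        type       : "spec",
        config     : { content : $spec },
        env        : {
            RT_PUB_TOPIC: "s3-ingest-green-taxi",
            RT_TOPIC_PREFIX: "rt-",
            AWS_ACCESS_KEY_ID: "REPLACE_ME",
            AWS_SECRET_ACCESS_KEY: "REPLACE_ME"
        }
    }' | jq -asR .)"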

Reliable Transport configuration

The requests above configure the Worker with RT-related environment variables so that the deployed pipeline can connect to other KX Insights components through RT.

RT_TOPIC_PREFIX is set to the same value as the topicPrefix fields in the assembly file. This value must be consistent across the assembly and all components.

RT_PUB_TOPIC is set to the topic name of the streaming data. The topic is composed from fields set in the assembly, in the format <assembly-name>-<sequencer-topic-name>; here, the assembly name s3-ingest and the sequencer topic green-taxi yield s3-ingest-green-taxi.
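
As a purely illustrative sketch (the field nesting below is hypothetical; the downloaded s3-ingest.yaml is the authoritative layout), these are the fields involved:

# Hypothetical excerpt; see the real s3-ingest.yaml for the actual structure
name: s3-ingest      # <assembly-name>
# ...
topicPrefix: rt-     # must equal RT_TOPIC_PREFIX in the pipeline env
# ...
topic: green-taxi    # <sequencer-topic-name>; combined with the assembly
                     # name this gives RT_PUB_TOPIC = s3-ingest-green-taxi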

When the ingestion pipeline completes, it terminates and cleans up its resources.
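
Progress can be checked against the Coordinator; assuming the standard Stream Processor REST endpoints, the running pipelines can be listed with:

curl http://localhost:5000/pipelines    # list pipelines known to the Coordinator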