
PostgreSQL Ingestion Example

PostgreSQL is an open-source relational database system. This page demonstrates how to set up a simple PostgreSQL database and query it from the kdb Insights Stream Processor.

This example is split into an installation section and a query section. If you already have a running PostgreSQL database, consider jumping ahead to the query section.

Install a PostgreSQL database

This example uses scripts provided by Bitnami to install a PostgreSQL database.

The following initdb.sql is provided as an example to set up a database with mock data.

CREATE DATABASE finance;

\c finance

CREATE TABLE stocks (
    id int,
    sym varchar(10),
    market varchar(10),
    name varchar(255),
    cap varchar(20)
);

INSERT INTO stocks VALUES (1, 'AAPL', 'NASDAQ', 'Apple Inc.', '$2.47T');
INSERT INTO stocks VALUES (2, 'MSFT', 'NASDAQ', 'Microsoft', '$2.32T');

To launch the PostgreSQL database in Docker, add a PostgreSQL service to a docker-compose.yaml:

version: "3.3"
services:
  psql:
    image: docker.io/bitnami/postgresql:latest
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: iamsecure
    restart: always
    volumes:
      - ./initdb.sql:/docker-entrypoint-initdb.d/initdb.sql

Finally, run docker-compose up to launch the database.

Begin by installing a PostgreSQL database into the current Kubernetes cluster.

helm install postgresql bitnami/postgresql

This should result in a postgresql image being launched in the current Kubernetes context.

kubectl get pods | grep postgresql
NAME                      READY   STATUS    RESTARTS   AGE
postgresql-postgresql-0   1/1     Running   0          4h16m

The PostgreSQL image generates a random password. Extract the password from the cluster and store it in an environment variable called PGPASSWORD for later use.

export PGPASSWORD=$(kubectl get secret postgresql -o jsonpath="{.data.postgresql-password}" | base64 --decode)
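The secret value stored by Kubernetes is base64-encoded; the shell pipeline above simply decodes it. A minimal Python sketch of the same decoding step, using a placeholder value since the real password is generated randomly:

```python
import base64

# Placeholder standing in for the output of:
#   kubectl get secret postgresql -o jsonpath="{.data.postgresql-password}"
encoded = base64.b64encode(b"iamsecure").decode()  # "aWFtc2VjdXJl"

# Equivalent of piping the secret through `base64 --decode`
password = base64.b64decode(encoded).decode()
print(password)  # -> iamsecure
```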

Adding mock data

Now that the database is running, add some mock data by interacting with a PostgreSQL client. Launch an interactive client by running the following.

kubectl run postgresql-client --rm --tty -i --restart='Never' \
    --image docker.io/bitnami/postgresql:latest \
    --env="PGPASSWORD=$PGPASSWORD" \
    --command -- psql --host postgresql -U postgres -d postgres -p 5432

To create a stocks table with some mock data, run each of the following commands in the client shell opened above.

CREATE DATABASE testdb;
CREATE USER testdbuser WITH PASSWORD 'testpass';
GRANT ALL PRIVILEGES ON DATABASE testdb TO testdbuser;
CREATE TABLE stocks (id int, sym varchar(10), market varchar(10), name varchar(255), cap varchar(20));
INSERT INTO stocks VALUES (1, 'AAPL', 'NASDAQ', 'Apple Inc.', '$2.47T');
INSERT INTO stocks VALUES (2, 'MSFT', 'NASDAQ', 'Microsoft', '$2.32T');

The end result should be a stocks table containing the mock data:

SELECT * FROM stocks;
 id | sym  | market |    name    |  cap
----+------+--------+------------+--------
  1 | AAPL | NASDAQ | Apple Inc. | $2.47T
  2 | MSFT | NASDAQ | Microsoft  | $2.32T

For next steps, continue to the querying section below, which covers issuing a PostgreSQL query from the Stream Processor.

Data examples

Another example using car sample data follows:

CREATE TABLE cars (
    id int,
    Name varchar(250),
    "Miles_per_Gallon" smallint,
    "Cylinders" smallint,
    "Displacement" smallint,
    "Horsepower" smallint,
    "Weight_in_lbs" smallint NOT NULL,
    "Acceleration" smallint,
    "Year" date NOT NULL,
    "Origin" character varying(60)
);
INSERT INTO cars VALUES (1, 'chevrolet chevelle malibu', 18, 8, 307, 130, 3504, 12, '1970-01-01', 'USA');
INSERT INTO cars VALUES (2, 'volkswagen 1131 deluxe sedan', 26, 4, 97, 46, 1835, 21, '1970-01-01', 'Europe');

The final table data should be as follows:

SELECT * FROM cars;
 id |             name             | Miles_per_Gallon | Cylinders | Displacement | Horsepower | Weight_in_lbs | Acceleration |    Year    | Origin
----+------------------------------+------------------+-----------+--------------+------------+---------------+--------------+------------+--------
  1 | chevrolet chevelle malibu    |               18 |         8 |          307 |        130 |          3504 |           12 | 1970-01-01 | USA
  2 | volkswagen 1131 deluxe sedan |               26 |         4 |           97 |         46 |          1835 |           21 | 1970-01-01 | Europe

PostgreSQL queries

This section provides an overview of integrating the Stream Processor with a PostgreSQL database.

Mock Database Setup

This example uses a mock database, which can be set up by following the PostgreSQL setup guide.

The Stream Processor provides a reader interface for issuing queries on a PostgreSQL database. The PostgreSQL Reader API can be used as a data source in a pipeline.

The following spec.q runs a select query against a "finance" database and writes the results to the console. The database connection details are configured at deployment time.

.qsp.run
    .qsp.read.fromPostgres["SELECT * FROM stocks"; "finance"]
    .qsp.write.toConsole[]
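Note that the spec itself stays free of connection details; the reader picks them up from KXI_SP_POSTGRes-style environment variables supplied when the pipeline is created (shown in the deployment section below). A small Python sketch of this configuration pattern, with placeholder values:

```python
import os

# Placeholder values; in a real deployment these are injected into the
# worker's environment by the pipeline creation request.
os.environ.update({
    "KXI_SP_POSTGRES_SERVER":   "postgresql",
    "KXI_SP_POSTGRES_PORT":     "5432",
    "KXI_SP_POSTGRES_DATABASE": "finance",
    "KXI_SP_POSTGRES_USERNAME": "postgres",
})

# Collect the connection settings by prefix, keeping the spec itself generic
prefix = "KXI_SP_POSTGRES_"
conn = {k[len(prefix):].lower(): v
        for k, v in os.environ.items() if k.startswith(prefix)}
print(conn["server"], conn["database"])  # -> postgresql finance
```

This keeps the same spec reusable against different databases: only the environment supplied at deployment changes.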

Deployment prerequisite

The example below requires a PostgreSQL database to be running in the cluster, as set up in the Kubernetes section of this tutorial.

jobname=$(curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" --arg pass "$PGPASSWORD" \
    '{
        name     : "psql",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            KXI_SP_POSTGRES_SERVER   : "postgresql",
            KXI_SP_POSTGRES_PORT     : "5432",
            KXI_SP_POSTGRES_DATABASE : "finance",
            KXI_SP_POSTGRES_USERNAME : "postgres",
            KXI_SP_POSTGRES_PASSWORD : $pass
        }
    }' | jq -asR .)" | jq -r .id)
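The jq invocation above builds a JSON request body from the spec file and the extracted password. For reference, a minimal Python sketch that assembles an equivalent payload (the spec string and password here are placeholders, not the deployed values):

```python
import json

# Placeholders: in practice, read spec.q from disk and use the real PGPASSWORD
spec = '.qsp.run .qsp.read.fromPostgres["SELECT * FROM stocks"; "finance"] .qsp.write.toConsole[]'
password = "iamsecure"

payload = {
    "name":     "psql",
    "type":     "spec",
    "config":   {"content": spec},
    "settings": {"minWorkers": "1", "maxWorkers": "10"},
    "env": {
        "KXI_SP_POSTGRES_SERVER":   "postgresql",
        "KXI_SP_POSTGRES_PORT":     "5432",
        "KXI_SP_POSTGRES_DATABASE": "finance",
        "KXI_SP_POSTGRES_USERNAME": "postgres",
        "KXI_SP_POSTGRES_PASSWORD": password,
    },
}

body = json.dumps(payload)
print(json.loads(body)["name"])  # -> psql
```

The resulting body could then be POSTed to the /pipeline/create endpoint with any HTTP client in place of curl.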

Once the pipeline is deployed, check the console output of its spwork pod to see the result of the query.