PostgreSQL Queries

This guide provides an overview of integrating the Stream Processor with a PostgreSQL database.

Mock Database Setup

This example uses a mock database, which can be set up by following this guide.

The Stream Processor provides a reader interface for issuing queries on a PostgreSQL database. The .qsp.read.fromPostgres API can be used as a data source in a pipeline.

The following spec.q runs a SELECT query against a "finance" database and writes the results to the console. The database connection details are configured during deployment.

.qsp.run
    .qsp.read.fromPostgres["SELECT * FROM stocks"; "finance"]
    .qsp.write.toConsole[]

Deployment prerequisite

The example below requires initdb.sql from this tutorial to be in the current directory.

Save the following as docker-compose.yaml, then launch the deployment with docker-compose.

version: "3.3"
services:
  psql:
    image: docker.io/bitnami/postgres:latest
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: iamsecure
    restart: always
    volumes:
      - ./initdb.sql:/docker-entrypoint-initdb.d/initdb.sql
  worker:
    image: registry.dl.kx.com/kxi-sp-worker:1.1.0
    volumes:
      - .:/opt/kx/app/data
    environment:
      KDB_LICENSE_B64:
      KXI_LOG_FORMAT: text
      KXI_SP_SPEC: /opt/kx/app/data/spec.q
      KXI_SP_POSTGRES_SERVER: psql
      KXI_SP_POSTGRES_PORT: 5432
      KXI_SP_POSTGRES_DATABASE: finance
      KXI_SP_POSTGRES_USERNAME: postgres
      KXI_SP_POSTGRES_PASSWORD: iamsecure
    depends_on:
      - psql

To run the example, execute the following.

docker-compose up
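
Because the compose file mounts files from the current directory, a missing file tends to surface only as a container error after startup. The following is a minimal sketch of a pre-flight check; check_files is a hypothetical helper name, and the file names passed to it are whatever your deployment mounts.

```shell
# Hypothetical helper: report each missing file on stderr and
# return non-zero if any of the given files is absent.
check_files() {
    missing=0
    for f in "$@"; do
        if [ ! -f "$f" ]; then
            echo "missing: $f" >&2
            missing=1
        fi
    done
    return "$missing"
}

# Example usage (not run here):
#   check_files initdb.sql spec.q && docker-compose up
```

Chaining the check with && means docker-compose only starts when every listed file is present.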

To deploy the above pipeline in Kubernetes, first follow the Kubernetes setup guide. The pipeline can then be deployed through a port-forwarded Coordinator service.

Deployment prerequisite

The example below requires a PostgreSQL database to be running in the cluster, as set up in the Kubernetes section of this tutorial.

jobname=$(curl -X POST http://localhost:5000/pipeline/create -d \
    "$(jq -n  --arg spec "$(cat spec.q)" --arg pass "$PGPASSWORD" \
    '{
        name     : "psql",
        type     : "spec",
        config   : { content: $spec },
        settings : { minWorkers: "1", maxWorkers: "10" },
        env      : {
            KXI_SP_POSTGRES_SERVER   : "postgresql",
            KXI_SP_POSTGRES_PORT     : "5432",
            KXI_SP_POSTGRES_DATABASE : "finance",
            KXI_SP_POSTGRES_USERNAME : "postgres",
            KXI_SP_POSTGRES_PASSWORD : $pass
        }
    }' | jq -asR .)" | jq -r .id)
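
The command above stores the returned pipeline id in jobname. If the request fails, jq -r .id prints nothing (when the response is not JSON) or the literal string null (when the response has no id field), so the variable can end up unusable without any visible error. A minimal sketch of a check follows; valid_id is a hypothetical helper name, not part of the Stream Processor tooling.

```shell
# Hypothetical helper: succeed only if the captured id is
# non-empty and is not the literal "null" that jq -r prints
# for a missing field.
valid_id() {
    [ -n "$1" ] && [ "$1" != "null" ]
}

# Example usage (not run here):
#   valid_id "$jobname" || echo "pipeline creation failed" >&2
```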

Once deployed, check the console output of the deployed spwork pod to see the result of the query.
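
One way to read that output is with kubectl. The sketch below assumes kubectl is already configured for the cluster and namespace in use, and that the worker pod names contain "spwork"; spwork_logs is a hypothetical helper name. If a pod runs more than one container, kubectl logs may also need a -c flag.

```shell
# Sketch: print the logs of every pod whose name contains "spwork".
# "kubectl get pods -o name" lists pods as pod/<name>, a form that
# "kubectl logs" accepts directly.
spwork_logs() {
    kubectl get pods -o name | grep spwork | while read -r pod; do
        echo "== $pod"
        kubectl logs "$pod"
    done
}

# Example usage (not run here):
#   spwork_logs
```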