Coordinator

The Coordinator REST OpenAPI documentation is available below:

Note

The Coordinator service is only used within Kubernetes.

To interact with the Stream Processor Coordinator, either use its Kubernetes service name within the cluster, or port-forward to access it locally.

$ kubectl port-forward sp-kxi-sp-0 5000

Create a pipeline

First, create a spec file using the Pipeline API.

$ cat spec.q
.qsp.run
    .qsp.read.fromCallback[`upd]
    .qsp.write.toConsole[]

Then, submit the job to the Coordinator.

$ curl -X POST localhost:5000/pipeline/create -d "{\"name\":\"file\",\"type\":\"spec\",\"config\":{\"content\":$(jq -asR . < spec.q)},\"settings\": {\"minWorkers\":1}}"

See the configuration options for the settings that can be passed in the request.
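Escaping the spec by hand inside a shell string is easy to get wrong, so the request body can also be built programmatically. The following is a minimal Python sketch, assuming the Coordinator is reachable on localhost:5000 through the port-forward above. The helper names `build_create_payload` and `submit_pipeline` are illustrative and not part of any Coordinator client library.

```python
import json
import urllib.request


def build_create_payload(name, spec_text, min_workers=1):
    """Build the body for POST /pipeline/create, mirroring the curl example."""
    return {
        "name": name,
        "type": "spec",
        "config": {"content": spec_text},
        "settings": {"minWorkers": min_workers},
    }


def submit_pipeline(payload, base_url="http://localhost:5000"):
    """POST the payload and return the raw response text.

    No Content-Type header is set, so urllib sends the same form-encoded
    default that `curl -d` does. The response is returned undecoded because
    its exact shape is not described here.
    """
    request = urllib.request.Request(
        base_url + "/pipeline/create",
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")


# The same spec as spec.q; json.dumps handles the newlines and backtick.
spec = ".qsp.run\n    .qsp.read.fromCallback[`upd]\n    .qsp.write.toConsole[]\n"
payload = build_create_payload("file", spec)
print(json.dumps(payload, indent=2))
```

Calling `submit_pipeline(payload)` sends the request. Building the body with `json.dumps` plays the role that `jq` plays in the curl command.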

The call returns the ID of the pipeline. Use this ID in other REST calls to obtain status information as the pipeline is brought online.

After submitting, you can watch the Controller and Worker StatefulSets spin up within the cluster and connect themselves together.

Get pipelines

Return basic information about all pipelines.

$ curl localhost:5000/pipelines | jq
[
  {
    "id": "file-gmhxu97kpo",
    "name": "file",
    "created": "2021-08-14T21:04:35.631193507",
    "state": "RUNNING"
  }
]
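When several pipelines are deployed, it can help to filter this list by state. The sketch below works on the sample response shown above. The helper `ids_in_state` is a hypothetical name, and the comparison ignores case as a precaution, since state strings may differ in capitalization between endpoints.

```python
import json

# Sample copied from the response shown above.
SAMPLE = """
[
  {
    "id": "file-gmhxu97kpo",
    "name": "file",
    "created": "2021-08-14T21:04:35.631193507",
    "state": "RUNNING"
  }
]
"""


def ids_in_state(pipelines, state):
    """Return the IDs of pipelines whose state matches, ignoring case."""
    wanted = state.lower()
    return [p["id"] for p in pipelines if p.get("state", "").lower() == wanted]


pipelines = json.loads(SAMPLE)
print(ids_in_state(pipelines, "running"))  # ['file-gmhxu97kpo']
```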

Get details of pipelines

Return more detailed information about all pipelines, including aggregated performance metrics.

$ curl localhost:5000/details | jq
[
  {
    "id": "file-gmhxu97kpo",
    "name": "file",
    "created": "2021-08-14T21:04:35.631193507",
    "lastSeen": "2021-08-14T21:13:46.342579775",
    "state": "RUNNING",
    "metrics": {
      "eventRate": 119400.708,
      "bytesRate": 311912.823,
      "latency": 18.0915
    }
  }
]
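The `created` and `lastSeen` timestamps carry nine fractional digits, which is more precision than Python's `datetime` stores. The sketch below truncates them to microseconds and computes the interval between the two fields. It assumes both timestamps are in the same time zone, and the helper names are illustrative only.

```python
from datetime import datetime


def parse_timestamp(text):
    """Parse a timestamp such as 2021-08-14T21:04:35.631193507.

    datetime stores microseconds, so fractional digits beyond six are dropped.
    """
    whole, _, fraction = text.partition(".")
    micros = (fraction + "000000")[:6]
    return datetime.strptime(f"{whole}.{micros}", "%Y-%m-%dT%H:%M:%S.%f")


def seconds_observed(pipeline):
    """Seconds between `created` and `lastSeen` for one details entry."""
    delta = parse_timestamp(pipeline["lastSeen"]) - parse_timestamp(pipeline["created"])
    return delta.total_seconds()


# Values copied from the sample response above.
entry = {
    "created": "2021-08-14T21:04:35.631193507",
    "lastSeen": "2021-08-14T21:13:46.342579775",
}
print(seconds_observed(entry))  # roughly 550.7 seconds
```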

Get pipeline status

Return the current state of a single pipeline, identified by its ID.

$ curl localhost:5000/pipeline/status/file-gmhxu97kpo
"Running"

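A script that submits a pipeline usually needs to wait until it is running. The following is a minimal polling sketch built on the status endpoint above, assuming the response is a JSON string as in the sample. The function names, timeout, and interval are illustrative choices, and the status getter is passed in as a callable so the loop can be exercised without a cluster.

```python
import json
import time
import urllib.request


def fetch_status(pipeline_id, base_url="http://localhost:5000"):
    """GET /pipeline/status/<id> and decode the JSON string it returns."""
    url = f"{base_url}/pipeline/status/{pipeline_id}"
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))


def wait_until_running(get_status, timeout=120.0, interval=2.0, sleep=time.sleep):
    """Poll get_status() until it reports running.

    Returns True once the status is "running" (ignoring case), or False if
    the timeout elapses first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if str(get_status()).lower() == "running":
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval)
```

For example, `wait_until_running(lambda: fetch_status("file-gmhxu97kpo"))` blocks until the sample pipeline reports that it is running, or returns False after two minutes.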
Tear down a pipeline

Tear down a single pipeline. This removes all workloads and Kubernetes resources associated with the pipeline.

$ curl -X POST localhost:5000/pipeline/teardown/<name>

Alternatively, use kubectl directly with the pipeline name:

$ kubectl delete rs,pod,svc,cm,servicemonitor,pvc,statefulset -l pipeline=$name