Skip to content

Troubleshooting pipelines

Pipeline Logs

When a pipeline enters into an Errored state, the first point of action is to review the logs to see what the error is and where exactly in the flow of the pipeline it has occurred.

Understanding logs

When investigating an error in a pipeline, understanding the context of the log messages surrounding an error can be just as important as understanding the error itself. This is because the surrounding messages provide information about where exactly in the flow of the pipeline the error occurred. To get a better grasp of this, let's walkthrough some key log messages that occur throughout the lifecycle of a healthy pipeline:

  • Log Level: The log level columns tells the type of a log message:
    • INFO: Normal/expected messages for the running of a pipeline. This log level is used to provide inform the user about significant events that occur throughout the progress of their pipeline. When debugging an issue, understanding the INFO messages on the lead up to and after an error has occurred is important, as this gives context to where and when the error has occurred within the flow of your pipeline.
    • WARN: This log level is used to warn you that something has occurred that may impact the pipeline's ability to run successfully, either now or in the future. Messages of this type should be treated as a foreshadowing for potential future errors or unexpected behaviour.
    • ERROR: This log level is used to inform you that a significant error condition has been triggered or the application has ran into unexpected behaviour that has caused the pipeline's code to error. Typically error messages are indicative of a loss of application functionality or data (although that data is likely recoverable).
    • FATAL: This log level indicates that a highly significant error has occurred which either has resulted or results (if the pipeline continues) in the unrecoverable loss of data and/or functionality.
    • DEBUG: This log level is used to relay important events throughout the running of a pipeline, but at a more granular and technical level than the INFO log level. By default messages at this log level are not published, as they create noise and unnecessary bloat in the log files. See below for how to enable them.
    • TRACE: This log level is used to capture and log data as it flows through the operators of a pipeline. Like the DEBUG log level, TRACE messages are not published by default, as they are intended for debugging purposes only, and would otherwise bloat the log files during normal operation. See below for how to enable.
  • Container: The container column of the logs indicates which container the log message came from. For a pipeline, there are 2 types of containers you should be aware of:
    • spctl: This is the container for the SP Controller, which is responsible for spinning up workers and generally managing the state of a pipeline as it progresses from setting up through to starting, running and finishing. Each pipeline has 1 and only 1 SP Controller. Typically an error in this container is indicative that there was an issue setting up or starting the SP Workers associated with the pipeline.
    • spwork: This is the container for the SP Worker, which is responsible for actually running the operators you have configured for your pipeline. By default, a pipeline has 1 SP Worker, but this can be increased in the pipeline's settings. An error in this container typically indicates that an operator in your pipeline has thrown an error. Heavily dependent on the error itself, but as a general tip the first steps when investigating an error in the SP worker are to review your pipeline configuration and the data being ingested.
  • Pod: The pod column tells us which Kubernetes pod the log message is from. Filtering on this column can be useful when you want to review the log messages for an individual SP worker.
  • Time: Time the logged event occurred.
  • Component: The component that the log message came from.
  • Message: The log message itself.

SP Controller Expected log messages

When a new pipeline is kicked off, the first thing that is started is a new SP Controller, which is assigned to the new pipeline for the duration that it is running. Here are some of the key log messages you can expect from the SP Controller:

  1. Spawning %n workers
    • The first major action the controller takes after loading its configuration settings is to begin spawning the SP Workers that runs the pipeline. This log message tells you that this action has begun and tells you how many workers it plans on spawning (%n is replaced by the number of workers being spawned).
  2. Spawned worker, name=%s1, addr=%s2
    • This log message let's us know that the controller has successfully made a request to the Kubernetes Control Plane to create a new worker. Here %s1 is replaced by the name of the pod for the new worker and %s2 is replaced by the host and port (i.e. address) of the worker process running within the worker pod. It should be noted that this log message does not necessarily indicate that the worker pod has started successfully, it simply informs us that the Kubernetes Control Plane has accepted the request for new resources to be created.
  3. Initializing worker %s, handle=%n
    • This log message signifies that a worker pod has successfully started and has sent a request to the register with the controller. The controller responds by 'initializing' that worker by passing the details of the pipeline graph the worker is expected to run. Here %s is replaced by the name of the worker pod and %n is replaced by the qIPC handle that the controller has opened to this worker.
  4. Starting all registered workers
    • Once all workers have successfully registered and been initialized, the controller sends a message to each registered worker to begin running the pipeline graph they were sent during the initialization phase.
  5. All workers started
    • This log message indicates that all workers have successfully started their pipeline graph and the pipeline overall is now in a Running state.

SP Worker expected log messages

The SP Worker is started by the SP Controller, as discussed above, and is responsible for actually running the pipeline. Here are some of the key log messages you should expect from a SP Worker (remember that there can be more than 1 worker running depending on configuration):

  1. Successfully connected to parent, addr=%s, handle=%n
    • The first action a worker takes is to connect to its parent process i.e. the controller. You should look for this log message to confirm that a worker was able to successfully connect to the controller. Once connected, the worker sends a message to the controller requesting to register with it. Here %s is replaced by the host and port (i.e. address) of the controller process running within the controller pod and %n is replaced by the qIPC handle that the worker has opened to the controller.
  2. Initializing worker
    • As part of registration, the controller sends the worker the pipeline graph it is expected to run. The worker stores this pipeline graph it receives in preparation for setting up and running the pipeline.
  3. Finished worker initialization
    • Indicates the initialization phase is complete and was successful.
  4. Setting up pipeline...
    • After initialization, the worker enters into a setup phase, where it uses the pipeline graph it had previously stored to setup each operator in the pipeline.
  5. Finished pipeline setup
    • Indicates the setup phase is complete and was successful.
  6. Starting readers...
    • Signifies that all workers have now successfully completed registration with the controller, and the controller has therefore sent a message to each worker to start their pipeline readers (which equals starting the pipeline).
  7. Pipeline started successfully
    • Readers started successfully and the pipeline is now running.

Cloud Readers

To debug errors reading files, try downloading the file using the cloud provider's CLI to verify your credentials and the path.

Install the Azure CLI, then download the file

az storage blob download --account-name myAccount --account-key myAccountKey --container-name myContainer --name fileToDownload.csv --file downloadedName.csv

The error messages can be interpreted as follows

"...Failed to establish a new connection: [Errno -2] Name or service not known..." The account name is incorrect

"...Authentication failure. This may be caused by either invalid account key, connection string or sas token..." The account key is incorrect

"...The specified container does not exist..." The container name is wrong

"...The specified blob does not exist..." The file name is incorrect

If any of these appear, log into the Azure portal and ensure you have the correct information.

Install the AWS CLI and log in, then download the file

aws s3api get-object --bucket myBucket --key myFilePath/myFile.csv desiredDownloadedFileName.csv

The error messages can be interpreted as follows

"...Unable to locate credentials..." No credentials are set

"...AWS Access Key Id you provided does not exist..." The AWS Access Key ID is incorrect

"... An error occurred (SignatureDoesNotMatch)..." The AWS Secret Access Key is incorrect

"... Could not connect to the endpoint URL..." The region is incorrect

"_...The specified key does not exist." The filepath is incorrect

To resolve any of these except the incorrect file path, run aws configure then retry.

If you are using an access token, first export it to ACCESS_TOKEN. If you are instead using a service account, you'll need to install the gcloud CLI, then populate ACCESS_TOKEN with

gcloud auth activate-service-account --key-file=path/to/service-account-key.json
export ACCESS_TOKEN=$(gcloud auth print-access-token)

you can then check the path and permissions by attempting to download the first 100 bytes

curl --range 0-100 -H "Authorization: Bearer $ACCESS_TOKEN" "https://storage.googleapis.com/my-bucket/path/to/file.txt"

The section of the response can be interpreted as follows

Authentication required Confirm the access token is correct or service account key are correct

The specified bucket does not exist Access denied The project to be billed is associated with an absent billing account The project to be billed is associated with a closed billing account For the above errors, first confirm the bucket name before assuming a literal interpretation

The specified key does not exist. The path is incorrect

Cloud Writers

Writing to S3 uses multi-part upload, and requires these permissions

s3:PutObject s3:ListMultipartUploadParts s3:AbortMultipartUpload

You can use the AWS CLI to test that you have these permissions, and the correct credentials, region, and bucket.

Download and install the AWS CLI, then enter your credentials and region with

aws configure

To do a multipart upload, you'll need a file over 5MB. 5MB is the minimum chunk size for multipart uploads (excluding the last chunk). Split a file up into numbered parts by running the following. This will create files named part00, part01, etc.

split -b 5M -d myFile.csv part

Create a multipart upload, and replace <myUploadID> in subsequent commands with the upload ID returned by this command.

aws s3api create-multipart-upload --bucket my-bucket --key myFile.csv

Upload the first part, saving the ETag returned. Note that the part-number flag starts counting from 1

aws s3api upload-part --bucket my-bucket --key myFile.csv --part-number 1 --upload-id <myUploadID> --body part00
aws s3api upload-part --bucket my-bucket --key myFile.csv --part-number 2 --upload-id <myUploadID> --body part01

Confirm that you can list the parts

aws s3api list-parts --bucket my-bucket --key largefile.txt --upload-id <myUploadID>

Create a file fileparts.json with the ETags from the previous step

{
  "Parts": [
    {
      "ETag": "bca9f8a501948fa8eeb446f006c7cb4b",
      "PartNumber": 1
    },
    {
      "ETag": "9107cdcdfa39d6e3347cdd462c182be1",
      "PartNumber": 2
    }
  ]
}

And complete the upload

aws s3api complete-multipart-upload --bucket my-bucket --key largefile.txt --upload-id <myUploadID> --multipart-upload file://fileparts.json

If successful, this will return an object containing the file's URL on S3.

Next, test that you can abort an upload, replacing <myUploadID> in the second line with the ID returned by the first line.

aws s3api create-multipart-upload --bucket my-bucket --key myFile.csv
aws s3api abort-multipart-upload --bucket my-bucket --key largefile2.txt --upload-id <myUploadID>

If successful, nothing will be printed to the console.

Diagnostics API

When debugging, it is useful gaining a complete picture of the current state and configuration of the worker. This can be done by calling a REST API on the worker process or by executing a script inside its container. This section provides an overview on the available functionality.

REST API

The REST API is available as GET /diagnostics. The full API is documented in the worker OpenAPI reference.

In Kubernetes, you may need to port-forward the worker pod to make it available outside the cluster. This will make the worker available on port 7000 on your host machine.

kubectl port-forward <worker pod> 7000:7000
curl http://localhost:7000/diagnostics

The output contains the following information. Note this is a small subset of the response data and included as a reference only, the actual output contains many additional keys and information.

  {
    "system": {
      "disk": { ... },
      "env":  { ... }
    },
    "sp": {
      "name":    "...",
      "metadata": {
        "graph": {
          "vertices": [ ... ],
          "edges":    [ ... ]
        },
      },
      "spec": {
        "base":     { ... },
        "compiled": { ... }
      },
      "runtime":    "...",
      "state": {
        "app":      "...",
        "operator": "...",
        "global":   { ... }
      }
    }
  }

The most useful sections are:

  • system — host environment of the worker, including disk usage and environment variables.
  • sp.metadata.graph — the pipeline's vertices and edges, showing how operators are connected.
  • sp.spec — the pipeline specification, both as authored (base) and after compilation (compiled).
  • sp.runtime / sp.state — the current runtime status and operator/global state, useful for spotting where a pipeline is stalled or misconfigured.

The worker exposes three diagnostics endpoints:

  • GET /describe — quick summary of the pipeline. The best place to start when you want basic details without the larger output from /diagnostics.
  • GET /diagnostics — the full diagnostic payload. Returns a verbose state of the pipeline and its environment. Used for debugging low-level issues and attaching to support tickets.
  • GET /description — the pipeline graph in Graphviz .dot format. Useful for visualizing how data flows through the configured nodes.

After installing Graphviz, you can render the graph:

curl -sS http://localhost:7000/description > graph.dot
dot -Tpng graph.dot -o graph.png

Alternatively, paste the .dot output into an online viewer such as graph.flyte.org for a quick visualization.

Script

The diagnostics.sh script (located at /opt/kx/app/tools/diagnostics.sh inside the worker container) calls the diagnostics REST endpoints in sequence and captures each response to a file. By default it writes to /tmp/sp_diagnostics_<timestamp>/, including:

  • description.dot — output of GET /description
  • diagnostics.json — output of GET /diagnostics
  • describe.json — output of GET /describe
  • output.log — the commands that were run and where each response was written
  • (optional, with --streams) additional files per stream — see Options below

The target directory can be archived and sent to KX support for analysis. There is also a README.md in the tools directory with additional information on this and other included tools.

To run the script, exec into the worker and run:

bash /opt/kx/app/tools/diagnostics.sh

Options

Run diagnostics.sh --help for inline usage. The supported options are:

  • --port — worker port. Defaults to 7000.
  • --target — directory to write output files. Defaults to /tmp/sp_diagnostics_<timestamp>. Useful when you want a predictable location for archiving or running the script more than once.
  • --streams — comma-separated list of stream host:port pairs (for example rt-orders:8080,rt-trades:8081). For each stream, the script queries:

    The stream host can be found by checking the names of the running rt-* pods or containers in your deployment.

  • --help — print inline usage and exit.

Example combining all options:

bash /opt/kx/app/tools/diagnostics.sh --port 7000 --target /tmp/sp_diag --streams rt-orders:8080,rt-trades:8081

Pipeline Status Unresponsive

An unresponsive status indicates that the Pipeline SP Controller is not responding to HTTP requests. If you are seeing this status in your pipeline, view the pipeline logs to investigate. Some common causes of this status include:

Secret Not Found

For readers such as Amazon S3 you can set the credentials option to the Kubernetes secret name.

If the secret specified doesn't exist, the pipeline will go into an "Unresponsive" status. In such cases if you look at the logs you will see the following error:

MountVolume.SetUp failed for volume "nonexistentawscreds" : secret "nonexistentawscreds" not found

The error can be resolved by creating the missing secret.

user@A-LPTP-5VuBgQiX:~$ kubectl create secret generic --from-file ~/.aws/credentials nonexistentawscreds
secret/nonexistentawscreds created
user@A-LPTP-5VuBgQiX:~$

Once the missing secret has been created, the pipeline picks it up automatically, moves out of the "Unresponsive" status and progresses as normal.

Secret Name

The secret name nonexistentawscreds is just an example to highlight that the secret didn't exist. Please choose a more appropriate name when naming your secrets! E.g. awscreds.

Memory Management

Some pipelines can have large or complex memory requirements, which can result in excess memory usage or hitting memory limits. This section aims to cover the basics, address common issues, and provide some general troubleshooting tips.

  • Buddy allocation — memory is assigned in power-of-two sized blocks.

  • Reference counting — multiple objects can refer to the same memory without copying.

  • Garbage collection — runs in one of two modes:

  • deferred (default): when a reference count hits zero, blocks are freed and returned to the heap for reuse.
  • immediate: when a reference count hits zero, blocks larger than 64 MB are returned directly to the OS; smaller blocks are returned to the heap.

Refer to Available modes for more details.

You can change the mode by setting the garbage collection option, or trigger more aggressive collection on demand via the API. Overall memory usage can be inspected with .Q.w, or you can trigger additional garbage collection if you wish.

Pipeline workers can have their memory limits configured explicitly to increase the amount of available memory.

Specify using the kubeConfig object in the body of the create request:

...
"kubeConfig": {
    "worker": {
      "resources": {
        "limits":{
          "memory":"16Gi"
        },
      },
    }
}
...

Additional Garbage Collection

In cases where data is heavily nested, i.e. many higher-dimensional arrays or compound table columns, memory usage may be much higher than expected. Modifying or adding additional garbage collection may help to reduce this.

There are two main ways to reduce memory usage:

  • Enable immediate garbage collection - this releases blocks over 64MB with a reference count of 0 back to the OS. This happens automatically at the kdb+ level at a cost of latency or processing time.
  • Manual garbage collection - add code to your pipeline to run explicit garbage collection at specific times. This is more aggressive than automatic garbage collection so will have a larger latency overhead.

To enable immediate garbage collection, you should specify the following in your pipeline code:

system "g 1"

Manual garbage collection needs to be explicitly called as part of the pipeline lifecycle. You can define when it runs as part of your pipeline definition. Two examples would be to run on a timer that fires every minute, or called as part of pipeline checkpointing:

.qsp.timer.add[`gc;(`.Q.gc;::);60000;0]
.qsp.onPostCheckpoint[{.Q.gc[]}]

To inspect memory usage during pipeline execution, a map node can be used in conjunction with .Q.w:

.qsp.map[{show .Q.w[];x}]
Back to top