Kafka (Subway)

No kdb+ knowledge required

This example assumes no prior experience with q/kdb+. You can also swap the endpoints provided here for other Kafka brokers and follow the same steps.

What is Kafka?

Apache Kafka is an event streaming platform. Data published to a Kafka topic can be consumed directly by the KX Insights Platform.

In this example you will learn how to set up a pipeline that pulls subway data from a real-time Kafka feed.

Subway Dataset

The KX Insights Platform can consume Kafka feeds natively. For this example, the Evangelism team provides a real-time demo subway dataset published to Kafka. The feed carries live alerts for NYC Subway trains, including the arrival time of each train, stop location coordinates, direction, and route details.

Ingesting Data

Importing

Select Import from the Overview panel to start the process of loading in new data. Then select the Kafka node and enter the details listed below to connect to the data source.

Here are the broker and topic details for the subway feed on Kafka.

Setting   Value
Broker*   34.130.174.118:9091
Topic*    subway
Offset*   End

Decoding

The event data on the Kafka topic is JSON, so you need to use the JSON decoder to transform it to a kdb+ friendly format. This converts each incoming message to a kdb+ dictionary.

Select the JSON Decoder option.
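
To see what the decoder does, you can reproduce the conversion in a plain q session with the built-in JSON parser .j.k. This is purely illustrative; the JSON Decoder node does it for you, and the field names below are made up:

q).j.k "{\"route\":\"ACE\",\"direction\":\"North\",\"arrival\":\"09:30:00\"}"
route    | "ACE"
direction| "North"
arrival  | "09:30:00"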

Transforming

The next screen that comes up is Apply Schema. This is a useful tool that transforms the upstream data to the correct datatypes of the destination database table.

In the "Apply A Schema" screen select Data Format = Table from the dropdown.

Next, click on the blue "+" icon next to "Parse Strings" and you should get a popup window called "Load Schema".

You can then select the insights-demo database and the subway table from the dropdowns and select "Load".

Note: for any fields of type time, timestamp, or string, check the box that says "Parse Strings".

Parse

This indicates whether the input string data needs to be parsed into other datatypes. Generally, 'Parse Strings' should be ticked for all time, timestamp, and string fields unless your input is IPC or RT.
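
The schema step turns string values into properly typed kdb+ values, much like casting does in q. A minimal illustration (the values are made up):

q)"P"$"2023.11.01D09:30:00.000000000"    / string to timestamp
2023.11.01D09:30:00.000000000
q)"T"$"09:30:00.000"                     / string to time
09:30:00.000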

Writing

Finally, from the Writer dropdown select the insights-demo database and the subway table to write this data to, then select 'Open Pipeline'.

Apply Function

Before we deploy our pipeline there is one more node to add. This data has been converted from incoming JSON format on the Kafka feed to a kdb+ friendly dictionary.

We need to convert this to a kdb+ table before deploying so it is ready to save to our database. We can do this by selecting Pipelines from the left-hand side and adding a Function Map node.

Additional Functions

For readability, additional nodes can be added to continue transforming the data. If functions are predefined, the function name can be provided here instead to improve reusability.

In this section you transform the kdb+ dictionary into a table using enlist. Copy and paste the code below into your Map node and select Apply.

{[data]
    / enlist wraps the incoming dictionary, producing a one-row kdb+ table
    enlist data
 }
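
To see why enlist is needed, here is its effect in a plain q session (the field names and values are made up for illustration):

q)d:`route`direction!("ACE";"North")
q)type d                / 99h: a dictionary
99h
q)type enlist d         / 98h: a table
98h
q)enlist d
route direction
---------------
"ACE" "North"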

You will need to remove the existing edge by hovering over the connecting line between the Decoder and Transform nodes and right-clicking to select Remove Edge. You can then add links to and from the Function Map node.
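
For reference, the pipeline you have just assembled in the UI corresponds roughly to a Stream Processor pipeline written with the q API. The sketch below is illustrative only: it assumes the .qsp reader, decoder, map, and writer operators, and writes to the console instead of the database so it can be tried on its own.

.qsp.run
  .qsp.read.fromKafka["subway"; "34.130.174.118:9091"]  / same topic and broker as above
  .qsp.decode.json[]                                     / JSON message -> kdb+ dictionary
  .qsp.map[{[data] enlist data}]                         / dictionary -> one-row table
  .qsp.write.toConsole[]                                 / stand-in for the database writer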

Deploying

Next, Save your pipeline, giving it a name that indicates what it does. Then select Deploy.

Once deployed, you can check on the progress of your pipeline back in the Overview pane where you started. When it reaches Status=Running, the pipeline is live and your data is being loaded.

Exploring

Select Explore from the Overview panel to start the process of exploring your loaded data.

Select insights-demo from the assembly dropdown on the top left hand side.

You can run the following SQL to retrieve a count of records in the subway table.

SELECT COUNT(*) FROM subway

Define an Output Variable and then select Get Data to execute the query.
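
If you prefer to check the result in q, you can also inspect the output variable from the scratchpad. Assuming you named the Output Variable s (an arbitrary name), a quick check would be:

count s         / number of subway records returned
5 sublist s     / peek at the first five rows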

Rerunning this query a short while later, you should notice the count increase as new messages appear on the Kafka topic and are ingested into the kdb+ table. You have just successfully set up a table consuming live events from a Kafka topic, with your data updating in real time.

Refresh Browser

You may notice a delay waiting for your SQL output; this is expected given the frequency of the live data. If so, refresh your browser and retry.

Troubleshooting

If the database or tables still return no data after refreshing your browser, see our Troubleshooting page.

Let's ingest some more data! Try next with the PostgreSQL Dataset on Health Data.