Kafka (Subway)
No kdb+ knowledge required
This example assumes no prior experience with q/kdb+. You can replace the endpoints provided with other Kafka brokers to achieve similar results.
What is Kafka?
Apache Kafka is an event streaming platform. Data published to Kafka can be easily consumed by the KX Insights Platform.
In this example you will learn how to set up a pipeline that pulls subway data from a real-time Kafka feed.
Subway Dataset
The KX Insights Platform can consume Kafka natively. To demonstrate this, the Evangelism team has provided a real-time demo Subway dataset published on Kafka. The feed carries live alerts for NYC Subway trains, with details such as train arrival time, stop location coordinates, direction and route.
Ingesting Data
Importing
Select Import from the Overview panel to start loading new data. Select the Kafka node and enter the details listed below to connect to the data source.
Here are the broker and topic details for the subway feed on Kafka. Setting Offset to End means the pipeline reads only messages published after it starts, rather than replaying the topic's history.
setting | value |
---|---|
Broker* | 34.130.174.118:9091 |
Topic* | subway |
Offset* | End |
Decoding
The event data on Kafka is in JSON format, so you need the matching decoder to transform it into a kdb+-friendly format. The JSON decoder converts each incoming message into a kdb+ dictionary.
Select the JSON Decoder option.
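To make the decoding step concrete, here is a minimal Python sketch using the standard `json` module to turn one message into a dictionary, the Python analogue of the kdb+ dictionary the JSON decoder produces. The message fields (`trip_id`, `route_id`, `stop_lat`, and so on) are hypothetical assumptions; the real subway feed's field names may differ.

```python
import json

# Hypothetical Kafka message payload; the field names are illustrative
# assumptions, not the actual schema of the subway feed.
raw_message = (
    b'{"trip_id": "A20230101", "route_id": "A", "direction": "N",'
    b' "stop_name": "Canal St", "stop_lat": 40.720824, "stop_lon": -74.005229,'
    b' "arrival_time": "2023-01-01T12:34:56"}'
)

def decode(payload: bytes) -> dict:
    """Decode one JSON-encoded Kafka message into a dictionary."""
    return json.loads(payload.decode("utf-8"))

record = decode(raw_message)
print(type(record).__name__, record["route_id"], record["arrival_time"])
```

Note that `arrival_time` is still a string after decoding: JSON has no timestamp type, which is why the schema step below needs string parsing for time and timestamp fields.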
Transforming
The next screen is Apply Schema, which converts the upstream data to the datatypes expected by the destination database.
In the "Apply A Schema" screen select Data Format = Table from the dropdown.
Next, click the blue "+" icon next to "Parse Strings" to open a popup window called "Load Schema".
From the dropdowns, select the insights-demo database and the subway table, then select "Load".
Note: for any fields of type time, timestamp or string, check the "Parse Strings" box.
Parse
This indicates whether input string data needs to be parsed into other datatypes. Generally, you should tick "Parse Strings" for all time, timestamp and string fields unless your input is IPC or RT.
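What "Parse Strings" does can be sketched in plain Python: for each column the schema marks for parsing, a string value is converted to the column's target type. This is a simplified, per-record illustration under stated assumptions. The `SCHEMA` mapping and the `PARSERS` table are hypothetical stand-ins, not the real insights-demo subway schema or the platform's implementation.

```python
from datetime import datetime

# Hypothetical target schema: column name -> kdb+-style type name.
# The real insights-demo subway table schema may differ.
SCHEMA = {
    "route_id": "symbol",
    "direction": "symbol",
    "stop_lat": "float",
    "stop_lon": "float",
    "arrival_time": "timestamp",
}

# Hypothetical converters applied to columns with "Parse Strings" ticked.
PARSERS = {
    "symbol": str,
    "float": float,
    "timestamp": datetime.fromisoformat,
}

def apply_schema(record: dict, parse_strings: set[str]) -> dict:
    """Keep only schema columns; parse string values where requested."""
    out = {}
    for column, col_type in SCHEMA.items():
        value = record[column]
        if column in parse_strings and isinstance(value, str):
            value = PARSERS[col_type](value)
        out[column] = value
    return out

record = {"route_id": "A", "direction": "N", "stop_lat": 40.72,
          "stop_lon": -74.0, "arrival_time": "2023-01-01T12:34:56"}
typed = apply_schema(record, parse_strings={"arrival_time"})
print(typed["arrival_time"])  # 2023-01-01 12:34:56
```

Without `arrival_time` in `parse_strings`, the value would stay a string and would not match a timestamp column in the destination table.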
Writing
Finally, from the Writer dropdown, select the insights-demo database and the subway table to write this data to, then select "Open Pipeline".
Apply Function
Before we deploy our pipeline, there is one more node to add. The decoder has converted the incoming JSON from the Kafka feed into a kdb+-friendly dictionary.
We need to convert this dictionary into a kdb+ table before deploying so that it is ready to save to our database. To do this, select Pipelines from the left-hand side and add the Map Function node.
Additional Functions
For readability, you can add further nodes to continue transforming the data. If a function is already defined, you can enter its name here instead of the function body, which improves reusability.
In this step you convert the kdb+ dictionary into a table using enlist. Copy and paste the code below into your Map node and select Apply.
{[data]
  enlist data  / wrap the decoded dictionary as a one-row table
  }
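In q, `enlist` applied to a dictionary produces a table with a single row. A rough Python analogue, shown as a sketch, wraps each value in a one-element list, giving a column-oriented table with exactly one row. The `enlist` helper and the sample record are hypothetical illustrations, not part of the platform.

```python
def enlist(record: dict) -> dict[str, list]:
    """Python analogue of q's enlist on a dictionary: wrap each value in a
    one-element list, giving a column-oriented table with exactly one row."""
    return {column: [value] for column, value in record.items()}

row = {"route_id": "A", "direction": "N", "stop_lat": 40.72}
table = enlist(row)
print(table)  # {'route_id': ['A'], 'direction': ['N'], 'stop_lat': [40.72]}
```

The column-oriented shape matters because a database write appends rows to existing columns, which a bare dictionary of scalars cannot do directly.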
Next, remove the existing link between the Decoder and Transform nodes: hover over the connecting line, right-click and select Remove Edge. Then add links from the Decoder node to the Map node, and from the Map node to the Transform node.
Deploying
Next, save your pipeline, giving it a name that describes what it does. Then select Deploy.
Once deployed, you can check the progress of your pipeline back in the Overview pane where you started. When it reaches Status=Running, deployment is complete and your data is being loaded.
Exploring
Select Explore from the Overview panel to start exploring your loaded data.
Select insights-demo from the assembly dropdown on the top left-hand side.
You can run the following SQL to retrieve the row count of the subway table.
SELECT COUNT(*) FROM subway
Define an Output Variable and then select Get Data to execute the query.
If you rerun this query a short while later, you should see the count increase as new messages arrive on the Kafka topic and are ingested into the kdb+ table. You have now set up a table that consumes live events from a Kafka topic and updates in real time.
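The count query itself is standard SQL. As a local stand-in (not the KX Insights query interface), this sketch uses Python's built-in `sqlite3` with a hypothetical two-column subway table to show `COUNT(*)` growing as new rows are appended, which is what you observe when rerunning the query against the live table.

```python
import sqlite3

# In-memory SQLite database standing in for the insights-demo database.
# The two-column subway table here is a hypothetical simplification.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE subway (route_id TEXT, arrival_time TEXT)")

def count_rows() -> int:
    """Run the same COUNT(*) query used in the Explore step."""
    (n,) = conn.execute("SELECT COUNT(*) FROM subway").fetchone()
    return n

first = count_rows()

# Simulate a batch of new Kafka messages being ingested.
new_rows = [("A", "2023-01-01T12:34:56"), ("C", "2023-01-01T12:35:10")]
conn.executemany("INSERT INTO subway VALUES (?, ?)", new_rows)
conn.commit()

second = count_rows()
print(first, second)  # 0 2
```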
Refresh Browser
You may notice a delay before your SQL output appears; this is expected given the frequency of the live data. If this happens, refresh your browser and retry.
Troubleshooting
If the database or tables still return no output after you refresh your browser, try our Troubleshooting page.
Let's ingest some more data! Next, try the PostgreSQL Dataset on Health Data.