Streaming subway data to a Data Grid in Views
This walkthrough shows how kdb Insights Enterprise visualizes streaming data in real time in a View, by streaming the latest coordinates of a set of New York City subway trains to a Data Grid.
In this walkthrough you will:
- Create a pipeline to ingest and transform the subway data
- Create a View to visualize the subway data in a Data Grid
A Kafka subway feed has been provided for use in this walkthrough. This feed generates live alerts for NYC subway trains, tracking which trains are stopped at stations. For each train it provides arrival time, station location coordinates, direction and route details.
Create a pipeline
To create the pipeline open the Pipeline editor by clicking [+] on the ribbon menu and clicking Pipeline, as shown below, or by clicking [+] beside Pipelines on the left-hand menu.
The pipeline editor opens, as shown below.
The pipeline you are setting up requires the following:
- Kafka Node to ingest the data
- Decoder Node to decode the data
- Map Node to convert the decoded data to a kdb+ table
- Transform Node to convert data fields to kdb+ compatible types
- Subscriber Node to provide real-time updates of data
- Environment Variable to ensure the subscriber node updates the stream every 10th of a second
Once these are configured you can save and deploy the pipeline, then create a View to visualize the streamed data.
Add a Kafka node
You begin your pipeline with a Kafka node to read the data from the subway feed we have provided.
-
Click-and-drag a Kafka node, from the Readers, into the workspace.
-
Select the Kafka node and add the following connection details to the Configure Kafka Node panel:
setting | value
--------- | -------
Broker | kafka.trykdb.kx.com:443
Topic | subway
Offset | End
Use TLS | Unchecked
Use Schema Registry | Unchecked
-
Expand the Advanced parameters section and select Advanced Broker Options.
-
Click the [+] to add an advanced configuration and enter the following key value pairs:
key | value
--------- | -------
sasl.username | demo
sasl.password | demo
sasl.mechanism | SCRAM-SHA-512
security.protocol | SASL_SSL
-
Click Apply to apply these changes to the Kafka node.
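The settings entered above can be summarized as a single key-value structure, which makes it easy to check that nothing was missed before clicking Apply. The sketch below is illustrative only: the `kafkaNodeSettings` object layout and the `missingKeys` helper are hypothetical names introduced here, not part of kdb Insights Enterprise, and the node itself is still configured through the UI.

```javascript
// Hypothetical summary of the values entered in the Configure Kafka Node panel.
// The object layout is an assumption made for illustration only.
const kafkaNodeSettings = {
  broker: "kafka.trykdb.kx.com:443",
  topic: "subway",
  offset: "End",
  useTLS: false,
  useSchemaRegistry: false,
  advancedBrokerOptions: {
    "sasl.username": "demo",
    "sasl.password": "demo",
    "sasl.mechanism": "SCRAM-SHA-512",
    "security.protocol": "SASL_SSL",
  },
};

// The four advanced broker options this walkthrough asks you to add.
const requiredOptions = [
  "sasl.username",
  "sasl.password",
  "sasl.mechanism",
  "security.protocol",
];

// Return the required option keys that are absent or empty.
function missingKeys(options, required) {
  return required.filter((key) => !options[key]);
}

console.log(missingKeys(kafkaNodeSettings.advancedBrokerOptions, requiredOptions)); // prints []
```

An empty result means all four advanced options from the table are present.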
Add a Decoder node
Kafka event data is in JSON and must be decoded to a kdb+ friendly format (a kdb+ dictionary).
-
Click-and-drag a JSON decoder node, from the Decoders, into the workspace, and connect it to the Reader node.
-
Click Apply to keep the default JSON decoder settings.
Add a Map node
The decoded data needs to be converted to a kdb+ table. The Map node converts an incoming dictionary to a table using an enlist.
-
Click-and-drag a Map node, from the Functions, into the workspace, and connect it to the Decoder node.
-
Select the Map node and replace the code in the right-hand Configure Map Node property panel with the following code:
{[data] enlist data }
-
Click Apply to apply this code to the Map node.
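In q, `enlist` applied to a dictionary yields a one-row table whose columns are the dictionary keys. For readers less familiar with q, the following JavaScript analogue sketches what the Decoder and Map nodes do to each message. The function names and the sample message are hypothetical, and the field values are invented for illustration.

```javascript
// JavaScript analogue of the Decoder and Map steps (this is not q code).

// decode: JSON text -> a single record, the counterpart of a kdb+ dictionary.
function decode(message) {
  return JSON.parse(message);
}

// enlistRecord: wrap one record in a list, giving a one-row "table".
function enlistRecord(record) {
  return [record];
}

// Hypothetical message; the field values are invented for illustration.
const message = '{"route_long_name":"6 Avenue Express","direction_id":"inbound"}';
const table = enlistRecord(decode(message));

console.log(table.length);          // prints 1
console.log(table[0].direction_id); // prints inbound
```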
Add a Transform Node
The next step is to add a Transform node which transforms the subway data fields to kdb+ compatible types and drops the fields that are not going to be displayed in the View.
Converting strings to symbols for use as keyed columns in the Subscriber node
The data from Kafka that is used in this example includes strings. The Subscriber node requires any columns defined as keyed columns to be symbols. This step is required to convert the strings to symbols.
If you are using a Subscriber node with your own data and the data in your keyed columns are already symbols this step is not necessary. See the next section for details on keyed columns.
-
Click-and-drag the Apply Schema node from the list of Transform nodes and connect it to the Map node.
-
In the Schema section click the [+] icon under the Add schema columns below text. Then add the following columns, leaving Parse Strings set to Auto for all columns:
column name | column type
--------- | -------
arrival_time | Timestamp
stop_name | Symbol
stop_lat | Float
stop_lon | Float
direction_id | Symbol
route_long_name | Symbol
route_color | Symbol
-
Click Apply to apply changes to the node.
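The effect of the schema above can be pictured as two operations on every row: cast each listed column to its target type, and drop every column that is not listed. The JavaScript sketch below is an analogue rather than the node's implementation. The `schema` object and `applySchema` function are hypothetical names, JavaScript strings stand in for kdb+ symbols, and the sample row (including its `trip_id` field and the ISO timestamp format) is invented for illustration.

```javascript
// Hypothetical casts mirroring the Apply Schema table above.
// Symbols are represented as strings, floats as numbers, timestamps as Dates.
const schema = {
  arrival_time: (v) => new Date(v),
  stop_name: String,
  stop_lat: Number,
  stop_lon: Number,
  direction_id: String,
  route_long_name: String,
  route_color: String,
};

// Keep only the schema columns and cast each one to its target type.
function applySchema(row) {
  const out = {};
  for (const [column, cast] of Object.entries(schema)) {
    out[column] = cast(row[column]);
  }
  return out;
}

// Invented sample row; the real feed's field formats may differ.
const sample = {
  arrival_time: "2024-01-01T12:00:00Z",
  stop_name: "Example St",
  stop_lat: "40.75",
  stop_lon: "-73.98",
  direction_id: "inbound",
  route_long_name: "6 Avenue Express",
  route_color: "FF6319",
  trip_id: "hypothetical-extra-field",
};

const row = applySchema(sample);
console.log(Object.keys(row).length); // prints 7
console.log(typeof row.stop_lat);     // prints number
console.log("trip_id" in row);        // prints false
```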
Add a Subscriber node
A Subscriber node emits a summarized view of the input data, with one record per unique combination of values in the list of keyed columns. The remaining columns are set to their latest values.
Once deployed, the emitted stream of data is available as a data source in a View. This is described in the Create a View section later.
In this example the node is configured so that it emits updates to the co-ordinates of trains per route and direction of travel (i.e. inbound or outbound).
-
Click-and-drag the Subscriber node, from the list of Writers into the central workspace, and connect it to the Apply Schema node.
-
Select the Subscriber node and set the Table to subway.
-
Add the following Keyed Column values:
value
-------
route_long_name
direction_id
Defining these keyed columns ensures that:
-
The output of the Subscriber node has one record per train route and direction.
-
You can filter these columns in the View, as you can only filter on Keyed Columns.
-
-
Set the Publish Frequency to 100. This is the interval, in milliseconds, at which the Subscriber node publishes updates over the WebSocket, so a value of 100 gives an update every tenth of a second.
-
Click Apply to apply these settings.
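The behaviour described above, one record per unique combination of keyed columns with the remaining columns set to their latest values, amounts to an upsert into a keyed table. The following JavaScript sketch models that idea only. It is not how the Subscriber node is implemented, the names `upsert` and `snapshot` are hypothetical, and the sample rows are invented.

```javascript
// Keyed columns as configured in the Subscriber node above.
const keyedColumns = ["route_long_name", "direction_id"];

// Latest row per key; a Map keeps one entry per unique key.
const latest = new Map();

// Build a key from the keyed column values of a row.
function keyOf(row) {
  return JSON.stringify(keyedColumns.map((column) => row[column]));
}

// Insert the row, or overwrite the existing row that shares its key.
function upsert(row) {
  latest.set(keyOf(row), row);
}

// The summarized view that would be published to subscribers.
function snapshot() {
  return [...latest.values()];
}

// Invented sample rows: the third shares a key with the first and replaces it.
upsert({ route_long_name: "6 Avenue Express", direction_id: "inbound", stop_name: "Stop A" });
upsert({ route_long_name: "6 Avenue Express", direction_id: "outbound", stop_name: "Stop B" });
upsert({ route_long_name: "6 Avenue Express", direction_id: "inbound", stop_name: "Stop C" });

console.log(snapshot().length);       // prints 2
console.log(snapshot()[0].stop_name); // prints Stop C

// A Publish Frequency of 100 ms corresponds to this many updates per second.
const publishFrequencyMs = 100;
console.log(1000 / publishFrequencyMs); // prints 10
```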
Save and deploy the pipeline
The pipeline is now configured and ready to be saved and deployed.
-
Enter subway-streaming in the Name field in the top left of the workspace.
-
Click Save & Deploy in the top panel.
-
Check the progress of the pipeline under the Running Pipelines panel of the Overview tab. The data is ready to visualize when the Status is set to Running. Note it may take a few minutes for the pipeline to start running.
Create a View
Now that the data is being streamed to a pipeline containing a Subscriber node, you can create a View with a Data Grid to display the locations of the subway trains that are stopped at stations.
-
Select Visualize on the Overview page, or View from the ribbon menu, as shown below.
-
Click-and-drag a Data Grid, which is the first component on the list, from the icon menu into the central workspace.
-
In the Data Grid component, click "Click to populate Data Source" to open the Data editor.
-
Click New to create a new data source and name it streaming.
-
Click on the Streaming radio button and set the following properties:
setting | value
--------- | -------
pipeline | This is automatically populated with any running pipelines with a Subscriber node. Select subway-streaming.
table | This is automatically populated with tables associated with the pipeline selected. Choose subway.
filters | Leave empty.
-
Click Execute to view the results in the Results panel at the bottom of the screen, as shown below. The streaming position values are displayed with updates visible as data is streamed.
-
Click Apply and then Select Item.
-
With the Data Grid selected, set the Sort Column, in the Basics properties, to route_long_name.
-
Save the View by clicking the Save Dashboard icon at the top of the workspace.
The Data Grid now displays a record per train showing the most recent stop along the NY City subway.
Filtering the Data Grid
Now that you have a View that is being updated with streamed data you can add filters, based on any of the Keyed Columns, to show a subset of the data.
You can only filter on the Keyed Columns; filtering on any other column causes a timeout.
-
With the Data Grid selected in the new View, click on Data Source value to open the Data dialog and choose the streaming Data Source from the list on the left-hand side.
-
Type in one of the filters below, which are examples of how filtering can be applied:
filter | filter description
--------- | -------
{"direction_id":"inbound"} | All inbound trains
{"route_long_name":"6 Avenue Express"} | All trains on the 6 Avenue Express route
Current filter restrictions
- Filters must be defined using the following JSON format: {"keyed_column":"value"}.
- Only one keyed column can be filtered on.
- Only one value can be applied to the keyed column.
-
Click Execute to check that the results, displayed in the Results panel at the bottom of the screen, have applied the filter to the data.
-
Click Apply and then Select Item.
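Because a filter on a non-keyed column causes a timeout rather than an error message, it can help to check a filter string against the restrictions listed above before pasting it into the Filter field. The checker below is a hypothetical helper written for this walkthrough, not a product feature. It assumes the keyed columns configured earlier and treats "one value" as meaning a single scalar.

```javascript
// Keyed columns configured in the Subscriber node for this walkthrough.
const keyedColumns = ["route_long_name", "direction_id"];

// Return "ok" or a short reason why the filter breaks a listed restriction.
function checkFilter(text) {
  let parsed;
  try {
    parsed = JSON.parse(text);
  } catch (err) {
    return "filter is not valid JSON";
  }
  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
    return "filter must be a JSON object";
  }
  const keys = Object.keys(parsed);
  if (keys.length !== 1) {
    return "exactly one keyed column can be filtered on";
  }
  if (!keyedColumns.includes(keys[0])) {
    return "column is not a keyed column";
  }
  const value = parsed[keys[0]];
  if (value === null || typeof value === "object") {
    return "only one value can be applied";
  }
  return "ok";
}

console.log(checkFilter('{"direction_id":"inbound"}')); // prints ok
console.log(checkFilter('{"stop_name":"Example St"}')); // prints column is not a keyed column
console.log(checkFilter('{"direction_id":["inbound","outbound"]}')); // prints only one value can be applied
```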
The Grid now lists records that match the filter being applied. The following example shows the grid being updated with data for all inbound trains only.
Filtering based on a drop-down
Having used a static filter to limit the Data Grid to a specific value in a keyed column, we can enhance this by using a drop-down to filter the Data Grid to a specific value from a list.
-
With the View open, select Drop Down List from the list of components. Drag and drop it above the Data Grid.
-
With the Drop Down List selected, expand the Items properties in the Basics properties.
-
Click + to add an item with Value and Text properties equal to inbound. Repeat for outbound.
-
Create a View State parameter for the selected value in the Drop Down List:
-
Click on the Selected Value item of the Drop Down List
-
Click New to add a new view state and click Rename to change the name to selected_direction.
-
Set the Default value to inbound
-
Click Select Item.
The selected_direction View State is now populated with the value chosen in the drop-down.
-
-
The Filter field value, for Streaming Data Sources, must be in JSON format. The following steps explain how to create a virtual query that takes the selected_direction view state as an input and updates a second view state, json_selected_direction, with the appropriate JSON.
-
In Design view, click the View workspace to deselect the components and display the Dashboard Properties.
-
Click on the Data Dialog icon at the top of the Dashboard properties tab on the right-hand side of the View.
-
Click New to add a new data source and click Rename to name it virtual.
-
Click Virtual and delete any data contained there. Then input the following in the code editor:
function (ddSelection, callback) {
    var toReturn = {
        "columns": ["kk", "vv"],
        "meta": {"kk": "11", "vv": "11"},
        "rows": [{
            "kk": "json",
            "vv": `{"direction_id":"${ddSelection}"}`
        }]
    };
    callback(toReturn);
}
-
Delete any trailing blank spaces/lines after the code above.
-
Click on the eye below the code editor to the right of the ddSelection parameter name to open the View State screen.
-
Choose selected_direction from the list on the left and click Select Item to set selected_direction as the input to the Virtual query. The View State screen closes.
-
Click Execute to check the results. The results are displayed in the Results panel at the bottom of the screen.
-
Select the Mapping tab in the Results section.
-
Click + in the bottom right-hand side in the Mapping tab.
-
Enter json in the Key field of the new record.
-
Click on the eye in the View State column of the new record, to open the View State popup.
-
Create a new view state and rename it to json_selected_direction.
-
Click Select Item.
The image below shows the new virtual data source.
-
Click Apply and Close.
-
-
The next step is to update the Filter in the Data Grid to use the new json_selected_direction View State value.
-
Select the Data Grid and click on the Data Source to open the Data dialog.
-
Click on the eye in the far right of the Filter field and select json_selected_direction from the list of View States displayed.
- Click Select Item.
-
-
Click Apply and then Select Item to close the Data Source screen.
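To see what the Data Grid filter receives, the virtual query defined earlier in this section can be exercised outside the View. In the sketch below the function body is the one entered in the Virtual code editor, assigned to a name so it can be called. The callback that picks the row whose `kk` is `json` is a stand-in for the Mapping tab, and how the View resolves that mapping internally is an assumption.

```javascript
// The virtual query from the steps above, assigned to a name so it can be called.
const virtualQuery = function (ddSelection, callback) {
  var toReturn = {
    columns: ["kk", "vv"],
    meta: { kk: "11", vv: "11" },
    rows: [{ kk: "json", vv: `{"direction_id":"${ddSelection}"}` }],
  };
  callback(toReturn);
};

// Stand-in for the Mapping tab: take the vv value of the row keyed "json".
let jsonSelectedDirection;
virtualQuery("outbound", (result) => {
  jsonSelectedDirection = result.rows.find((row) => row.kk === "json").vv;
});

console.log(jsonSelectedDirection);                          // prints {"direction_id":"outbound"}
console.log(JSON.parse(jsonSelectedDirection).direction_id); // prints outbound
```

The template literal works here because the drop-down values contain no quotes. If a value could contain a double quote or backslash, building the string with `JSON.stringify({ direction_id: ddSelection })` would avoid producing malformed JSON.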
In Preview mode, choose the direction from the Drop-down List and the data grid is filtered appropriately. In the following example, the Direction selected is Inbound and so only Inbound trains are displayed:
Add a map
If you have access to an API Key for Google Maps Platform, you can add a Map component to your View to display the subway data.
This step is optional. If you want to proceed you must create an API Key for Google Maps Platform as described here.
To add a map to your View:
-
In the design mode for your View, search for the Map component in the list of components, on the left, and drag it into the workspace.
-
Click on the Map component and configure it as follows:
-
Click "set Google Maps JavaScript API Key in Map Key Property". The Map Key is saved as a view state inside your view.
-
Click New and name the node mapkey.
-
In the Properties section for mapkey, leave Type set to Symbol. Enter your API Key for Google Maps into the Default and Value properties and click Select Item.
-
Center the map on New York by setting the values in the following table in the Map properties. These are illustrated in the screenshot below the table.
setting | value
--------- | -------
CenterX | 40.75
CenterY | -73.98
-
-
Define the Data Source from which the component will get data:
-
Click on Data Source in the Points properties.
- Create a new data source and name it streaming-map.
Note: If you want to share your filtering options for the Data Grid, select the existing streaming data source.
-
Click on the Streaming radio button and set the following properties:
setting | value
--------- | -------
pipeline | subway-streaming
table | subway
filters | leave empty
-
Click Execute to check the results are displayed in the Results panel at the bottom of the screen, as shown below. This displays the streaming position values with updates visible as data is streamed.
-
Click Apply and then Select Item.
-
To ensure the trains are named, positioned and coloured on the map based on the streaming data set the Points properties as follows:
properties | value
--------- | -------
Selected Attr | route_long_name
Latitude Data | stop_lat
Longitude Data | stop_lon
Shape | train
Shape Colour (From DB) | route_color
Cluster | Unchecked
-
Save the View by clicking the Save Dashboard icon at the top of the workspace.
Your View now contains a data grid and a map updating for all inbound and outbound trains, as illustrated below.