Apply predictive analytics to improve operations and reduce risks in manufacturing

For manufacturers, machine downtime can cost millions of dollars a year in lost profits, repair costs, and lost production time for employees. By embedding predictive analytics in applications, manufacturing managers can monitor the condition and performance of assets and predict failures before they happen.

kdb Insights Enterprise is a cloud-native platform for actionable, data-driven insights. In most data science projects, the bulk of the effort goes into wrangling data before any machine learning models or data visualizations can be built to extract insights.

In this recipe you will apply compute logic to live data streaming in over MQTT, a popular messaging protocol, and use Deep Neural Network Regression to determine the likelihood of breakdowns.

Benefits

  Predictive analysis: Use predictive analytics and data visualizations to forecast asset health and reliability performance.
  Reduce risks: Leverage data in real time to detect issues with processes and equipment, reducing costs and maximizing efficiencies throughout the supply chain with less overhead and risk.
  Control logic: Explore how to rapidly develop and visualize computations to extract insights from live streaming data.

Pre-requisites

Before you begin, you need access to a running instance of kdb Insights Enterprise.

Step 1: Build database

The goal of this step is to teach you how to build a database to store the data you need to ingest.

  1. Select Build a database from the home screen Getting Started page.

  2. Enter the name and desired size of your database and then select Next. Later steps in this recipe refer to the database as manufacturing.

  3. Select Next again from the schema page.

    You do not need to enter any schema details on this page because you will replace this default schema with a custom one in steps 5 to 8.

  4. Select Save. At this stage your database should look like this.

    Do not deploy yet!

    Do not deploy this assembly until you replace the schema with the custom one for this recipe.

  5. Select the "+" icon from the toolbar at the top of the screen and then select Schema.

  6. Name your schema.

    Give the schema a new name

    Do not give it the same name as the database created in the previous section as a default schema already exists with this name.

  7. Select the Code View button on the top right hand side and replace with the custom schema provided.

    Copy and paste these details into the Code View window and click Apply.
    [
        {
            "name": "sensors",
            "type": "partitioned",
            "primaryKeys": [],
            "prtnCol": "time",
            "updTsCol": "time",
            "sortColsDisk": [
                "time"
            ],
            "sortColsMem": [
                "time"
            ],
            "sortColsOrd": [
                "time"
            ],
            "columns": [
                {
                    "type": "timestamp",
                    "attrDisk": "parted",
                    "attrOrd": "parted",
                    "name": "time",
                    "primaryKey": false,
                    "attrMem": ""
                },
                {
                    "name": "flowplant",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "pressplant",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "tempplantin",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "tempplantout",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "massprecryst",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "tempprecryst",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "masscryst1",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "masscryst2",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "masscryst3",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "masscryst4",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "masscryst5",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "tempcryst1",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "tempcryst2",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "tempcryst3",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "tempcryst4",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "tempcryst5",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "temploop1",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "temploop2",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "temploop3",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "temploop4",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "temploop5",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "setpoint",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "contvalve1",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "contvalve2",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "contvalve3",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "contvalve4",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "contvalve5",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                }
            ]
        },
        {
            "columns": [
                {
                    "type": "timestamp",
                    "attrDisk": "parted",
                    "attrOrd": "parted",
                    "name": "time",
                    "primaryKey": false,
                    "attrMem": ""
                },
                {
                    "name": "model",
                    "type": "symbol",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                },
                {
                    "name": "prediction",
                    "type": "float",
                    "primaryKey": false,
                    "attrMem": "",
                    "attrOrd": "",
                    "attrDisk": ""
                }
            ],
            "primaryKeys": [],
            "type": "partitioned",
            "updTsCol": "time",
            "prtnCol": "time",
            "name": "predictions",
            "sortColsDisk": [
                "time"
            ],
            "sortColsMem": [
                "time"
            ],
            "sortColsOrd": [
                "time"
            ]
        }
    ]
    

  8. Select Submit.

    Your schema is saved and ready for use.

  9. Replace the default schema with the newly created one.

    To do this, navigate back to the assembly created in the previous steps 1-4 and select the new schema in Schema Configuration. Make sure to click Save to keep this change.

  10. Select Deploy.

    This takes a few seconds. When ready, the grey circle becomes a green tick and changes to Active status.

    That's it, you are now ready to ingest data!
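If a deployment fails, one thing worth ruling out is a malformed schema. The following is a minimal Python sketch, not part of kdb Insights Enterprise: `check_schema` and the trimmed `SCHEMA_JSON` are hypothetical names introduced here, and the two checks (the partition column exists and is a timestamp) are illustrative assumptions rather than rules taken from the platform.

```python
import json

# Trimmed-down stand-in for the schema pasted in step 7 (a few columns only).
SCHEMA_JSON = """
[
  {"name": "sensors", "type": "partitioned", "prtnCol": "time",
   "columns": [{"name": "time", "type": "timestamp"},
               {"name": "tempcryst3", "type": "float"}]},
  {"name": "predictions", "type": "partitioned", "prtnCol": "time",
   "columns": [{"name": "time", "type": "timestamp"},
               {"name": "model", "type": "symbol"},
               {"name": "prediction", "type": "float"}]}
]
"""

def check_schema(text):
    """Return a list of problems found; an empty list means the checks passed."""
    problems = []
    for table in json.loads(text):
        columns = {c["name"]: c["type"] for c in table["columns"]}
        prtn = table.get("prtnCol")
        if prtn not in columns:
            problems.append(f"{table['name']}: partition column {prtn!r} is not defined")
        elif columns[prtn] != "timestamp":
            problems.append(f"{table['name']}: partition column {prtn!r} is not a timestamp")
    return problems

problems = check_schema(SCHEMA_JSON)
print(problems or "schema looks consistent")
```

Running the same function over the full schema text gives a quick check that every table still has its time column after any manual edits.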

Step 2: Ingest live MQTT data

MQTT is a standard messaging protocol used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.

It can be easily consumed by kdb Insights Enterprise. The provided MQTT feed has live sensor readings that we will consume in this recipe.

  1. Select Import from the Getting Started panel to start the process of loading in new data.

  2. Select the MQTT node and enter the details as listed below to connect to the data source.

    Setting Value
    Broker* 34.130.174.118:1883
    Topic* livesensor
    Username* mqtt
    Use TLS* disabled

  3. Select the JSON Decoder option and select Next.

    The event data on MQTT is of JSON type so use the related decoder to transform to a kdb+ friendly format. This converts the incoming data to a kdb+ dictionary.

  4. Fill in the details in the Configure Schema screen.

    This screen is a useful tool that transforms the upstream data to the correct datatype of the destination database.

    1. Select Data Format of Table from the dropdown.
    2. Click on the blue "+" icon next to Parse Strings and you will get a popup window called Load Schema.

    3. Select the manufacturing database and the sensors table from the dropdown and select Load.

    4. Leave Parse Strings set to Auto for all fields and select Next.

  5. From the Writer dropdown select the manufacturing database and the sensors table to write this data to and select Open Pipeline.

  6. Add a new Function node Map to the pipeline.

    Why do I need this?

    Before you deploy the pipeline there is one more node to add. This data has been converted from incoming JSON format on the MQTT feed to a kdb+ friendly dictionary. You need to further convert this to a kdb+ table before deploying so it is ready to save to our database.

    Expression

    Now, transform the kdb+ dictionary format using enlist. Copy and paste the below code into your Map node and select Apply.

    {[data]
        enlist data
    }
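To make the two conversions concrete, here is a minimal sketch in Python (chosen only so it can run outside the platform) of what the JSON Decoder and the Map node do conceptually. The payload values and the helper names `decode` and `enlist` are made up for illustration, and a list holding one dictionary stands in for a one-row kdb+ table.

```python
import json

# Hypothetical MQTT payload; the field names follow the sensors schema,
# but the values are invented for this example.
payload = '{"time": "2023-01-01T00:00:00", "tempcryst3": 12.5, "masscryst3": 950.0}'

def decode(message):
    """JSON Decoder step: one message becomes one dictionary."""
    return json.loads(message)

def enlist(record):
    """Map step: wrap a single dictionary as a one-row table (a list of rows)."""
    return [record]

record = decode(payload)
table = enlist(record)
print(len(table), sorted(table[0]))
```

The point of the wrapping step is that the writer expects rows of a table, so a single record has to become a table with exactly one row.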
    

    You need to Remove Edge by hovering over the connecting line between the Decoder and Transform nodes and right clicking. You can then add in links to the Function map node.

  7. Click Save on your pipeline and give it a name that is indicative of what you are trying to do. Then you can select Deploy.

    You are prompted at this stage to enter a password for the MQTT feed.

    setting value
    password kxrecipe

    Once deployed you can check on the progress of your pipeline back in the Overview pane where you started. When it reaches Status=Running then it is done and your data is loaded.

    Trouble deploying your pipeline?

    When running Deploy, choose the Test option to debug each node.

    How to run a test deployment

  8. Select Query from the Overview panel to start the process of exploring your loaded data.

    1. Select Query on the top of the page. From the provided dropdowns, you can then select the table, start and end times and define a variable name that can be used in the scratchpad.

    2. When you have populated the dropdowns, select Get Data to execute the query. The data appears in the console output below.

      You can view the output as a Table and also as a Visual element.

    3. Select the Visual tab and enter the following code to only return temperature fields.

      select time,tempcryst1,tempcryst2,tempcryst3,tempcryst4,tempcryst5 from s1
      

      Select Run Scratchpad to see the dataset plotted across time.

      The graph shows a cyclical pattern: the temperature in the crystallizers fluctuates between 7 and 20 degrees Celsius.

    4. Run a similar query but select the mass fields instead.

      select time,masscryst1,masscryst2,masscryst3,masscryst4,masscryst5 from s1
      

      The mass levels in the crystallizers appear to follow a similar pattern, cycling between 0 and 20000kg.

You have successfully set up a table consuming live sensor readings from an MQTT topic, with your data updating in real time.

Step 3: Standard compute logic

  1. Create control chart limit thresholds

    Use Control Chart Limits to analyze data over time. By comparing a process's data against its Upper Control Limit (UCL) and Lower Control Limit (LCL), you can tell whether the process is predictable.

    Control limits help you avoid troubleshooting things that are not real problems on the process line. The chart highlights the points where the process did not match user specifications.

    You will calculate the 3 sigma (three standard deviations from the mean) upper and lower control limits on one of the temperature fields. If the data is approximately normally distributed, about 99.7% of readings fall within this range, so points outside it can be identified as outliers.

    // Calculating 5 new values, most notably ucl and lcl where
    // ucl - upper control limit
    // lcl - lower control limit
    select lastTime:last time,
        lastVal: (1.0*last tempcryst3),
        countVal: count tempcryst3,
        ucl: avg tempcryst3 + (3*dev tempcryst3), 
        lcl: avg tempcryst3 - (3*dev tempcryst3)
        by xbar[10;time.minute]
        from s1
    

    q/kdb+ functions explained

    If any of the above code is new to you, don't worry, we have detailed the q functions used above:

    • last to get last record
    • * to multiply
    • count returns count of items
    • avg average value of list
    • dev standard deviation
    • xbar buckets time into intervals

    The above query sets the control thresholds at 3 standard deviations either side of the average for each 10-minute bucket. This enables engineers to drill down into specific timeseries datapoints.

    The resulting chart shows a controlled variation.
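For readers less familiar with q, the same bucketed calculation can be sketched in Python. This is an illustration under stated assumptions, not the scratchpad query itself: `control_limits` is a hypothetical helper, times are simplified to integer minutes, and the readings are invented. `pstdev` is used because q's `dev` is the population standard deviation.

```python
from collections import defaultdict
from statistics import fmean, pstdev

def control_limits(rows, bucket_minutes=10, sigmas=3.0):
    """rows: iterable of (minute, value) pairs. Returns {bucket_start: stats}."""
    buckets = defaultdict(list)
    for minute, value in rows:
        # Same idea as xbar: round the minute down to the start of its bucket.
        buckets[minute - minute % bucket_minutes].append(value)
    result = {}
    for start, values in sorted(buckets.items()):
        mean = fmean(values)
        sd = pstdev(values)  # population standard deviation, like q's dev
        result[start] = {
            "lastVal": values[-1],
            "countVal": len(values),
            "ucl": mean + sigmas * sd,
            "lcl": mean - sigmas * sd,
        }
    return result

# Invented readings: (minute, temperature)
readings = [(0, 10.0), (3, 12.0), (9, 14.0), (10, 20.0), (15, 20.0)]
limits = control_limits(readings)
for start, stats in limits.items():
    print(start, stats)
```

A bucket whose readings are all identical has a standard deviation of zero, so its upper and lower limits collapse onto the mean.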

  2. Aggregate over multiple time windows

    Let's extend the previous query to aggregate over two different time windows. In this example the calculated fields are split: the sensor fields (lastTime, lastVal, countVal) are aggregated over the first window and the limit fields (ucl, lcl) over the second.

    // Using asof join to join 2 tables 
    // with differing aggregation windows 
    aj[`minute;
        // table 1 aggregates every 1 minute
        select lastTime : last time,
            lastVal : last tempcryst3,
            countVal : count tempcryst3 
            by xbar[1;time.minute] from s1;
        // table 2 aggregates every 60 minutes
        select ucl : avg[tempcryst3] + (3*dev tempcryst3),
            lcl : avg[tempcryst3] - (3*dev tempcryst3)
            by xbar[60;time.minute] from s1]
    

    q/kdb+ functions explained

    aj is a powerful timeseries join, also known as an asof join, where a time column in the first argument specifies corresponding intervals in a time column of the second argument.

    This is one of two bitemporal joins available in kdb+/q.

    To get an introductory lesson on bitemporal joins, view the free Academy Course on Table Joins.

    This enables us to look at sensor values every minute, with limits calculated every 60 minutes, providing smoother, more accurate control limits.
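The matching rule behind an asof join can be sketched in a few lines of Python. This is a simplified illustration, not how kdb+ implements `aj`: `asof_join` is a hypothetical helper, tables are lists of dictionaries, and the sample values are invented. Each row on the left picks up the most recent row on the right whose key is less than or equal to its own.

```python
from bisect import bisect_right

def asof_join(left, right, key):
    """For each row in left, attach the columns of the last row in right
    whose key is <= the left row's key. right must be sorted by key."""
    right_keys = [r[key] for r in right]
    right_cols = [c for c in right[0] if c != key] if right else []
    joined = []
    for row in left:
        i = bisect_right(right_keys, row[key]) - 1
        merged = dict(row)  # the left row, including its own key value
        for col in right_cols:
            # No earlier row on the right: leave the column empty (None).
            merged[col] = right[i][col] if i >= 0 else None
        joined.append(merged)
    return joined

# Invented data: per-minute readings and hourly control limits.
per_minute = [
    {"minute": 59, "lastVal": 11.0},
    {"minute": 60, "lastVal": 12.0},
    {"minute": 61, "lastVal": 13.0},
    {"minute": 125, "lastVal": 14.0},
]
hourly = [
    {"minute": 60, "ucl": 20.0, "lcl": 5.0},
    {"minute": 120, "ucl": 21.0, "lcl": 6.0},
]
result = asof_join(per_minute, hourly, "minute")
for row in result:
    print(row)
```

The reading at minute 59 has no earlier hourly row, so its limits stay empty, while minutes 60 and 61 both inherit the limits from the bucket starting at 60.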

  3. Customize query by using parameters

    This analytic can be easily parameterized to enable custom configuration for the end user.

    // table = input data table to query over 
    // sd = standard deviations required
    // w1 = window one for sensor readings to be aggregated by
    // w2 = window two for limits to be aggregated by
    controlLimit:{[table;sd;w1;w2]
        aj[`minute;
        // table 1
        select lastTime : last time,
            lastVal : last tempcryst3,
            countVal : count tempcryst3 
            by xbar[w1;time.minute] from table;
        // table 2
        select ucl : avg[tempcryst3] + (sd*dev tempcryst3),
            lcl : avg[tempcryst3] - (sd*dev tempcryst3)
            by xbar[w2;time.minute] from table]
    }
    

    q/kdb+ functions explained

    {} known as lambda notation allows us to define functions.

    It is a pair of braces (curly brackets) enclosing an optional signature (a list of up to 8 argument names) followed by zero or more expressions separated by semicolons.

    To get an introductory lesson, view the free Academy Course on Functions.

    Then the user can easily change the desired windows and standard deviations required to plot variations of the dataset.

    // By changing the parameter for w1 we can see the input sensor
    // signal becoming smoother the larger we make it
    controlLimit[s1;3;1;60]
    controlLimit[s1;3;10;60]
    controlLimit[s1;3;20;60]
    

    This powerful feature enables you to calculate limits on the fly and easily customize the inputs to your own requirements.

Step 4: Apply deep neural network regression model

In this final stage, you will extend the pipeline created earlier in this recipe and apply a trained model stored in the cloud to the incoming streaming data.

The steps in blue in the image above are performed outside kdb Insights Enterprise. For the purposes of this recipe the model has been trained already and pushed to the Cloud Registry for you.

The model was trained using TensorFlow neural networks and an XGBoost model. The predictions of these two models were then aggregated to get a further performance boost.

Did you know?

It is possible to train the model inside kdb Insights Enterprise. We could even pull the training dataset directly from kdb Insights Enterprise.

This recipe uses a pre-trained model to make it easier for you to follow along, as training the model requires Machine Learning knowledge.

  1. Start by duplicating the pipeline created earlier in Step 2.

    Give this a new name in the window that pops up and select Ok.

  2. Define global variables to use in this pipeline.

    To do this, click on the whitespace in the pipeline window and expand the Advanced tab.

    ewmaLag100:0n;
    ewmaLead100:0n;
    ewmaLag300:0n;
    ewmaLead300:0n;
    timeSince:0n;
    started:0b;
    back:0n;
    

    Copy and paste in the above code to the Global Code section in Advanced.

  3. Split incoming data.

    The next thing you will do is split the incoming data into two. One feeds into the KX Insights Database as before and the other makes up your new Machine Learning Workflow.


    Do this by adding a Function node Split.

    As before you can Remove Edge by hovering over the connecting line between the Transform and Writer nodes and right clicking. You can then add in links to the Function split node and the link back to the Writer node.

  4. Add spike detection logic

    Next, you will add some custom code creating new fields to calculate when the mass of the product goes from below 1000kg to above 1000kg. This is the spike threshold. It is represented in the new column timePast, which is the time interval since the last spike.

    {[data]
        data:-1#data;
        $[(first[data[`masscryst3]]>1000)&(not null back)&(back<1000);
        [timeSince::0;
            back::first[data[`masscryst3]];
            tmp:update timePast:timeSince from data];
        [timeSince::timeSince+1;
            back::first[data[`masscryst3]];
            tmp:update timePast:timeSince from data]
        ];
        select time,timePast,tempcryst4,tempcryst5 from tmp
        }
    

    Drag in another Function map node, copy and paste in the above code and select Apply.

    Take this opportunity to right-click the new node and rename it to something representative of its function. This is a good idea as you need to add a few more map nodes and it helps to determine which is which at a glance.
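The spike logic is easier to follow with the state made explicit. Below is a minimal Python sketch of the same idea, assuming one call per incoming message: `SpikeTimer` is a hypothetical class, and the mass values are invented. As in the q code, the counter advances once per message and stays empty until the first upward crossing of the threshold.

```python
class SpikeTimer:
    """Counts messages since the mass last crossed the threshold upwards."""

    def __init__(self, threshold=1000.0):
        self.threshold = threshold
        self.previous = None    # last mass reading seen (q: back)
        self.time_since = None  # messages since the last spike (q: timeSince)

    def update(self, mass):
        crossed = (
            self.previous is not None
            and self.previous < self.threshold
            and mass > self.threshold
        )
        if crossed:
            self.time_since = 0       # a spike: restart the counter
        elif self.time_since is not None:
            self.time_since += 1      # no spike: one more message has passed
        self.previous = mass
        return self.time_since

timer = SpikeTimer()
masses = [900.0, 950.0, 1200.0, 1500.0, 800.0, 1100.0]  # invented readings
time_past = [timer.update(m) for m in masses]
print(time_past)
```

The leading empty values are why downstream nodes see no usable rows until the first spike has occurred.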

  5. Add feature nodes

    Next, you will add some more custom code to create features for the model. The code calculates exponentially weighted moving averages of the temperatures in crystallizers 4 and 5.

    {[data]
        a100:1%101.0;
        a300:1%301.0;
        $[not started;
        [ewmaLag100::first data`tempcryst5;
        ewmaLead100::first data`tempcryst4;
        ewmaLag300::first data`tempcryst5;
        ewmaLead300::first data`tempcryst4;
        started::1b];
        [ewmaLag100::(a100*first[data[`tempcryst5]])+(1-a100)*ewmaLag100;
        ewmaLead100::(a100*first[data[`tempcryst4]])+(1-a100)*ewmaLead100;
        ewmaLag300::(a300*first[data[`tempcryst5]])+(1-a300)*ewmaLag300;
        ewmaLead300::(a300*first[data[`tempcryst4]])+(1-a300)*ewmaLead300
        ]
        ];
        t:update ewmlead300:ewmaLead300,ewmlag300:ewmaLag300,ewmlead100:ewmaLead100,ewmlag100:ewmaLag100 from data;
        select from t where not null timePast
    }
    

    Drag in another Function map node, copy and paste in the above code and select Apply.
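The update rule used for each moving average can be sketched in Python as follows. This is an illustration only: `Ewma` is a hypothetical class, and the smoothing factor is taken as 1/(span+1) to mirror the `1%101.0` and `1%301.0` constants in the q code. The first reading seeds the average, and every later reading is blended in with weight alpha.

```python
class Ewma:
    """Exponentially weighted moving average updated one reading at a time."""

    def __init__(self, span):
        self.alpha = 1.0 / (span + 1)  # span 100 gives 1/101, as in the q code
        self.value = None

    def update(self, x):
        if self.value is None:
            self.value = x  # seed with the first reading
        else:
            self.value = self.alpha * x + (1 - self.alpha) * self.value
        return self.value

# A tiny span makes the arithmetic easy to follow: alpha is 0.5.
ewma = Ewma(span=1)
smoothed = [ewma.update(x) for x in [10.0, 20.0, 20.0]]
print(smoothed)
```

A larger span gives a smaller alpha, so the 300-reading averages react more slowly to change than the 100-reading ones.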

  6. Pull in the machine learning model

    Everything up to this point has been in preparation for this step. Now we are ready to add our Machine Learning model.

    1. Drag in the Machine Learning node called Predict using Registry.

    2. Select the "+" icon highlighted in the image and add the following features as calculated in the previous nodes.

    3. Enter the prediction configuration details as follows and select Apply:

      setting value
      Prediction Column Name yhat
      Registry Type AWS
      Registry Path s3://qrisk

  7. Add metadata

    Next, you will add some more custom code to add some metadata.

    {[data]
        tmp:?[update model:`ANN from data;();0b;`time`model`prediction!`time`model`yhat];
        update "f"$prediction from tmp
    }
    

    Drag in another Function map node, copy and paste in the above code and select Apply.
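In plain terms, this node reshapes each scored row to match the predictions table. A minimal Python sketch of that reshaping follows, assuming the model output arrives in the yhat column configured as the Prediction Column Name; `add_metadata` and the sample row are hypothetical.

```python
def add_metadata(rows, model_name="ANN", prediction_col="yhat"):
    """Keep time, tag each row with the model name, and expose the
    model output as a float column called prediction."""
    return [
        {
            "time": row["time"],
            "model": model_name,
            "prediction": float(row[prediction_col]),
        }
        for row in rows
    ]

# Invented scored row, as it might leave the prediction node.
scored = [{"time": "2023-01-01T00:00:00", "timePast": 4, "yhat": 13}]
shaped = add_metadata(scored)
print(shaped)
```

Only the three columns defined for the predictions table survive, and the feature columns used for scoring are dropped.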

  8. Write to database

    Add a Writer node to the KX Insights Database. Select the manufacturing database and the predictions table to write this data to and select Apply.

  9. Define environment variables

    Select Save and in the window that pops up select the Environment Variables option and enter the following details required to access the Machine Learning model saved on AWS Cloud Storage.

    variable value
    AWS_ACCESS_KEY_ID AKIAZPGZITYXCEC35GF2
    AWS_SECRET_ACCESS_KEY X4qk/NECNmKhyO24AlEzhbzoY84bSmo7f6W7AfER
    AWS_REGION eu-west-1

    Select Save.

  10. Deploy

    If your old pipeline from Step 2 is still running, select Teardown to stop it before deploying the new pipeline, because the new pipeline repeats the same steps.

    Now you can select Deploy. You are prompted to enter a password for the MQTT feed.

    setting value
    password kxrecipe

    Once deployed, check on the progress of your pipeline back in the Overview pane where you started. When it reaches Status=Running, the prediction model has started.

    When will my data be ready to Query?

    It will be some time (~10-20 minutes) before the new predictions table is populated as the first spike in the dataset must occur before the timePast model parameter is calculated. Remember, this is the time interval since the last spike.

    When the dataset is populated depends on the spike conditions you have set and the frequency of those occurrences in your specific dataset.

  11. Query

    Select Query from the Getting Started panel to start the process of exploring your loaded data.

    Select Query at the top of the page. From the provided dropdowns, select the predictions table, start and end times and define a variable name that can be used in the scratchpad.

    When you have populated the dropdowns, select Visual and then Get Data to execute the query. The data appears in the output below.

    Let's next plot these predictions against the real data to see how well the model performs.

    select time,
        temp:20*tempcryst4,      // adjust temp by a factor of 20 for visualization purposes
        prediction 
        from aj[`time;s1;p1]        // joining sensors input data with predictions table on time
    

    While not perfect, the model fits the incoming real data fairly well, as shown by how closely the red line (prediction) matches the blue line (real data). The model would perform better with more data.

    This prediction gives the manufacturer advance notice of when the next cycle begins and predicted temperatures. This type of forward forecasting is useful to inform the manufacturer of the levels of filtration aids required in advance. In a typical food processing plant these levels can be adjusted in relation to the actual temperatures and pressures. By predicting these parameters before the product reaches the filters, the manufacturer can alter the dosing rate in advance and maximise throughput/yields.

    This and other prediction models can save millions of dollars a year in lost profits, repair costs, and lost production time for employees. By embedding predictive analytics in applications, manufacturing managers can monitor the condition and performance of assets and predict failures before they happen.