Query Manager Support

The Query Manager (QM) is a Delta Control process that sits between clients and the database layer to manage data subscriptions to streaming analytics. It manages overlapping subscriptions to reduce load on the database layer, maintaining the list of distinct subscriptions and mapping these to the connected devices and clients. Streaming analytics are modelled on the idea of pushing data to clients, instead of the more typical request/response calls.
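The idea of collapsing overlapping subscriptions can be sketched in a few lines of Java. This is an illustration only and not the Query Manager's actual implementation: the class name SubscriptionRegistry, its methods, and the way a subscription key is built from the analytic name and parameters are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy registry: one upstream subscription is shared by every
// client that asks for the same analytic with the same parameters.
class SubscriptionRegistry {
    // Distinct subscription key -> clients currently attached to it.
    private final Map<String, Set<String>> clientsByKey = new HashMap<>();

    // Returns true when the request creates a new distinct subscription,
    // i.e. when the database layer would have to start a new stream.
    boolean subscribe(String clientId, String analytic, Map<String, Object> parameters) {
        String key = keyOf(analytic, parameters);
        Set<String> clients = clientsByKey.get(key);
        boolean isNew = (clients == null);
        if (isNew) {
            clients = new HashSet<>();
            clientsByKey.put(key, clients);
        }
        clients.add(clientId);
        return isNew;
    }

    // Returns true when the last client has left and the stream can be closed.
    boolean unsubscribe(String clientId, String analytic, Map<String, Object> parameters) {
        String key = keyOf(analytic, parameters);
        Set<String> clients = clientsByKey.get(key);
        if (clients == null) {
            return false;
        }
        clients.remove(clientId);
        if (clients.isEmpty()) {
            clientsByKey.remove(key);
            return true;
        }
        return false;
    }

    int distinctSubscriptions() {
        return clientsByKey.size();
    }

    // A TreeMap sorts the parameters by name, so equal parameter sets
    // produce the same key regardless of insertion order.
    private static String keyOf(String analytic, Map<String, Object> parameters) {
        return analytic + new TreeMap<>(parameters);
    }
}
```

In this sketch, two clients subscribing to the same analytic with the same parameters result in a single distinct subscription, which is the load reduction described above.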

Library support for this feature is encapsulated in the IStreamingAnalyticsService property of a DeltaStream instance.

Connectivity must be established before use. This happens automatically as part of the internal workflow of the Service.Start and Service.startStreamingAnalyticsService methods.

The subscribeToStreamingAnalytic method is shown below:

public UUID subscribeToStreamingAnalytic(String name, Map<String, Object> parameters, String target, StreamingCallBack onData)
{
    try
    {
        LOG.debug(String.format("Subscribing to '%s' streaming analytic", name));

        if (streamingAnalyticsService == null)
            throw new IllegalStateException("The streaming service is null.");

        // Split the parameter map into parallel lists of names and values.
        ArrayList<String> fields = new ArrayList<>();
        ArrayList<Object> values = new ArrayList<>();
        for (String key : parameters.keySet()) {
            fields.add(key);
            values.add(parameters.get(key));
        }

        // Wrap the parameters in a dictionary and pass it as the single argument.
        c.Dict dict = new c.Dict(fields.toArray(), values.toArray());
        Object[] params = {dict};

        // Client-side identifier returned to the caller and passed to the callback.
        UUID id = UUID.randomUUID();

        ISubscriptionMessageListener listener = new ISubscriptionMessageListener() {
            @Override
            public void update(UpdateMessage updateMessage) {
                // Convert the incoming table and forward it to the caller's callback.
                c.Flip flip = (c.Flip) updateMessage.getData();
                QueryResult queryResult = new QueryResult(flip);
                onData.update(id, queryResult);
            }

            // Acknowledgement, rejection, and status messages are ignored here.
            @Override
            public void ack(AckMessage ackMessage) {
            }

            @Override
            public void nack(NackMessage nackMessage) {
            }

            @Override
            public void status(StatusMessage statusMessage) {
            }
        };

        Subscription sub = streamingAnalyticsService.startStreaming(name, params, target, listener);
        streamingSubscriptions.put(id, sub);
        return id;
    }
    catch (Exception e)
    {
        String msg = toString() + " error subscribing to streaming analytic.";
        LOG.error(msg, e);
        throw e;
    }
}
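The parameter handling in the method above, which flattens a map into two parallel arrays before building the dictionary, can be tried in isolation. The following is a minimal sketch that leaves out c.Dict so it runs without the library; the class name ParameterArrays and the parameter names "symbol" and "depth" are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: holds a parameter map as parallel name/value arrays.
class ParameterArrays {
    final Object[] fields;
    final Object[] values;

    ParameterArrays(Map<String, Object> parameters) {
        List<Object> names = new ArrayList<>();
        List<Object> vals = new ArrayList<>();
        // Iterating over entries keeps each name aligned with its value.
        for (Map.Entry<String, Object> entry : parameters.entrySet()) {
            names.add(entry.getKey());
            vals.add(entry.getValue());
        }
        fields = names.toArray();
        values = vals.toArray();
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, which makes the output predictable.
        Map<String, Object> parameters = new LinkedHashMap<>();
        parameters.put("symbol", "EURUSD");
        parameters.put("depth", 5);

        ParameterArrays arrays = new ParameterArrays(parameters);
        System.out.println(Arrays.toString(arrays.fields)); // [symbol, depth]
        System.out.println(Arrays.toString(arrays.values)); // [EURUSD, 5]
    }
}
```

With a plain HashMap the iteration order is unspecified, but each name still lines up with its value because both are added in the same loop pass.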

To invoke the subscribeToStreamingAnalytic method, use the following:

UUID id = service.subscribeToStreamingAnalytic("getTicks", parameters, "", callback);
// Keep the calling thread alive for 15 seconds so that streamed updates can arrive.
Thread.sleep(15000);
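Because data is pushed to the callback rather than returned by the call, the calling thread has to stay alive while updates arrive. A fixed sleep works for a demonstration; an alternative is to wait on a latch until an expected number of updates has been received. The sketch below shows that pattern under stated assumptions: UpdateCallback is a stand-in for the library's StreamingCallBack (the real interface may differ), and FakeStreamingService is a hypothetical substitute for the real service so that the example runs without a server.

```java
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the library's StreamingCallBack; the real interface may differ.
interface UpdateCallback {
    void update(UUID id, Object result);
}

// Hypothetical fake service that pushes a fixed number of updates from a
// background thread, so the sketch can run without a server.
class FakeStreamingService {
    UUID subscribe(String name, int updates, UpdateCallback onData) {
        UUID id = UUID.randomUUID();
        Thread pusher = new Thread(() -> {
            for (int i = 0; i < updates; i++) {
                onData.update(id, name + " update " + i);
            }
        });
        pusher.setDaemon(true);
        pusher.start();
        return id;
    }
}

class LatchWaitExample {
    // Subscribes, then blocks until `expected` updates arrive or the timeout expires.
    // Returns the number of updates received.
    static int receive(int expected, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        AtomicInteger received = new AtomicInteger();

        new FakeStreamingService().subscribe("getTicks", expected, (id, result) -> {
            received.incrementAndGet();
            latch.countDown();
        });

        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what has arrived so far.
            Thread.currentThread().interrupt();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("received " + receive(3, 15000) + " updates");
    }
}
```

The latch lets the caller continue as soon as the expected data has arrived, while the timeout still bounds the wait if nothing is streamed.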

Note

For Kx Refinery environments running Hot-Hot, data continues to be streamed by the secondary side if the primary side fails. This is handled automatically by the API and Kx Refinery (assuming the connection was established with the secondary side during subscription).