Incremental Analytics With WSO2 DAS

This is the second blog post on WSO2 Data Analytics Server. The first post can be found at [1].


The duration of the batch process is critical in production environments. For a product that does not support incremental processing, it needs to process the whole dataset in order to process the unprocessed data. With incremental processing, the batch job only processes the data partition that’s required to be processed, not the whole dataset (which has already been processed), which improves the efficiency drastically.

For example let’s say you have a requirement to summarize data for each day. The first time the summarization script is run, it would process the whole data set and summarize the data. That’s where the similarities end between a typical batch process and incremental analytics.

The next day when the script is run, the batch processing system without incremental analytics support would have to summarize the whole dataset in order to get the last days’ summarization. But with incremental processing, you would only process the last days’ worth of data and summarize, which reduces the overhead of processing the already processed data again.

Think of how it can improve the performance in summarizations starting from minutes running all the way to years.

Publishing events
Incremental analytics uses the timestamps of the events sent when when retrieving the data for processing. Therefore when defining streams for incremental analytics, you need to add an extra field to the event payload as _timestamp LONG to facilitate this.

When sending the events you have the ability to either add the timestamp to the _timestamp attribute or set it for each event at event creation.


In DAS, in the spark script, when defining the table, you need to add extra parameters to the table definition for it to support incremental analytics.

If you do not provide these parameters, it will be treated as a typical analytics table and for each query which reads from that table, would get the whole table.

The following is an example in defining a spark table with incremental analytics.

create temporary table orders using CarbonAnalytics options (tableName "ORDERS", schema "customerID STRING, phoneType STIRNG, OrderID STRING, cost DOUBLE, _timestamp LONG -i", incrementalParams "orders, DAY");

And when you are done with the summarization, then you need to commit the status indicating the reading of the data is successfull. This is done via



incrementalParams has two required parameters and an optional parameter.
incrementalParams “uniqueID, timePeriod, #previousTimePeriods

    This is the unique ID of the incremental analytics definition. When committing the change, you need to use this ID in the incremental table commit command as shown above.

    The duration of the time period that you are processing. Ex: DAY

If you are summarizing per DAY (the specified timePeriod in this case), then DAS has the ability to process the timestamp of the events and get the DAY they belongs to.

Consider the situation with the following received events list. The requirement is we need to get the total number of orders placed per each minute.

Customer ID
Phone Type
Order ID
Nexus 5x
26th May 2016 12:00:01
Galaxy S7
27th May 2016 02:00:02
iPhone 6s
27th May 2016 15:32:04
Moto X
27th May 2016 16:22:10
27th May 2016 19:42:42

And the last processed event is,

Galaxy S7
27th May 2016 15:32:04

In the summarized table for the day 27th May 2016, would be 2 since when the script ran last, there were only two events for that particular time duration and other events came later.

So when the script runs the next time, it needs to update the value for the time duration for the day of 27th May 2016.

This is where the timePeriod parameter is used. For the last processed event, DAS calculates the “time period” it belongs to and pulls the data from the beginning of that time period onwards.

In this case the last processed event
Galaxy S7
27th May 2016 15:32:04

Would trigger DAS to pull data from 27th May 2016 00:00:00 onwards.

#previousTimePeriods - Optional (int)
    Specifying this value would allow DAS to pull from previous time periods onwards. For example, if you had set this parameter to 30, then it would fetch 30 more periods worth of data as well.

As per the above example, it would pull from 27th April 2016 00:00:00 onwards.

For more information or queries do drop by the mailing lists[2].



Popular Posts