Friday, June 24, 2016

Simple Indexing Guide for WSO2 Data Analytics Server

Interactive search functionality in WSO2 Data Analytics Server is powered by Apache Lucene[1], a powerful, high-performing, full-text search engine.

Lucene index data was kept in a database in the first version of the Data Analytics Server[2] (3.0.0), but from DAS 3.0.1 onwards, Lucene indexes are maintained in the local filesystem.
The Data Analytics Server (DAS) has a separate server profile which enables it to perform as a dedicated indexing node.

When a DAS node is started with indexing enabled (disableIndexing=false), there are quite a few things going on. But first let’s get the meta-information out of the way.

Configuration files locations:

Local shard config: <DAS-HOME>/repository/conf/analytics/local-shard-allocation-config.conf
Analytics config: <DAS-HOME>/repository/conf/analytics/analytics-config.xml

What is a Shard?

All the index data is partitioned into units called shards (the default shard count is 6). The partitioned data can be observed by browsing <DAS-HOME>/repository/data/index_data/
Each and every record belongs to exactly one shard.

Digging in ...

Clustering:

In standalone mode, DAS behaves as a powerful indexer, but it truly shines as a powerhouse when it’s clustered.

A cluster of DAS indexer nodes behaves as one giant, powerful distributed search engine. Search queries are distributed across all the nodes, resulting in lightning-fast result retrieval. Not only that, since indexes are distributed among the nodes, indexing data in the cluster is very fast as well.

In clustered mode, the aforementioned shards are distributed across the indexing nodes (nodes in which indexing is enabled). For example, in a cluster of 3 indexing nodes with 6 shards, each indexing node would be assigned two shards (unless replication is enabled, more on that later).
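As a rough sketch, the local-shard-allocation-config.conf on each of those three nodes could end up looking like the following, using the format described under Manual Shard configuration below. The exact shard-to-node assignment is decided by DAS, so the numbers here are only illustrative:

Node 1:
0, NORMAL
1, NORMAL

Node 2:
2, NORMAL
3, NORMAL

Node 3:
4, NORMAL
5, NORMAL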

The shard allocation is dynamic: if a new node starts up as an indexing node, the shard allocations change to assign some of the shards to the newly spawned indexing node. This is controlled using a global shard allocation mechanism.

Replication factor:
The replication factor can be configured in the analytics config file as indexReplicationFactor, and it decides how many replicas of each record (or shard) are kept. The default is 1.
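As a minimal sketch, the relevant entry in analytics-config.xml would look something like the following; the surrounding element names are from memory, so verify them against your own analytics-config.xml:

<analytics-dataservice-configuration>
   ...
   <!-- how many replicas of each record (or shard) are kept in the cluster -->
   <indexReplicationFactor>1</indexReplicationFactor>
   ...
</analytics-dataservice-configuration>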

Manual Shard configuration:


There are 3 modes when configuring the local shard allocations of a node. They are,
  1. NORMAL
  2. INIT
  3. RESTORE

NORMAL means the data for that particular shard already resides in that node.
For example, upon starting up a single indexing node, the shard configuration would look as follows:
0, NORMAL
1, NORMAL
2, NORMAL
3, NORMAL
4, NORMAL
5, NORMAL

This means, all the shards (0 through 5) are indexed successfully in that indexer node.

INIT allows you to tell the indexing node to index that particular shard.
If you restart the server after adding a shard as INIT, that shard would be re-indexed on that node.
Ex: if the shard allocations are

1, NORMAL
2, NORMAL

and we add the line 4, INIT and restart the server,

1, NORMAL
2, NORMAL
4, INIT

This would index the data for the 4th shard on that indexing node, and once indexing of the 4th shard is done, the entry returns to the NORMAL state.

RESTORE gives you the opportunity to copy the indexed data for a shard from a different node (or from a backup) to another node and reuse it. This avoids re-indexing and reuses the already available index data. After a successful restore, that node is able to serve queries on the corresponding shard as well.

Ex: for the same shard allocation above, if we copy the indexed data for the 5th shard and add the line,
5, RESTORE
and restart the server, the 5th shard would then be allocated to that node (and used for both search and indexing).

After restoring, that node would also index the incoming data for that shard.
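A minimal sketch of the copy step, assuming the index data for each shard lives in its own sub-directory under index_data (check the actual directory layout on your nodes before copying, and stop the target server while copying):

scp -r user@source-node:<DAS-HOME>/repository/data/index_data/<shard-5-directory> <DAS-HOME>/repository/data/index_data/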

FAQ!


How do I remove a node as an indexing node?


If you want to remove a node as an indexing node, you have to restart that particular node with indexing disabled (disableIndexing=true). This triggers the global shard allocations to be re-allocated, removing that node from the indexing cluster.
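For example, assuming the node is started with the standard WSO2 startup script and that disableIndexing is passed as a system property (a sketch, not the only way to set it):

sh <DAS-HOME>/bin/wso2server.sh -DdisableIndexing=true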

Then you have the option of restarting the other indexing nodes, which will automatically distribute the shards among the available nodes again.

Alternatively, you can assign the shards manually. In that case, you need to make sure the data for the shards that the removed node held gets indexed on the other indexing nodes as you require.
You can view the shard list for that particular node in its local shard allocation config (refer to the file locations above).

Then you can either use INIT or RESTORE methods to index those shards in other nodes (refer to the Manual Shard configuration section).

You must restart all the indexing nodes for them to pick up the shard allocation updates after the indexing node has left.

Mistakenly started up an indexing node?


If you have mistakenly started up an indexing node, it will change the global shard allocations, and this has to be undone manually.
If the replication factor is equal to or greater than 1, you would still be able to query and get all the data even if this node is down.

First, you can delete the index data on that node (this step is optional).
Index data resides at: <DAS-HOME>/repository/data/index_data/

Second, if you want to use the node with another server profile (more info on profiles: [3]), restart the server with the required profile.

Then follow the steps in the above question on "How do I remove a node as an indexing node?"

How do I know all my data are indexed?


The simplest way to do this is to use the Data Explorer in DAS; refer to [4] for more information.

There, for a selected table, run the query “*:*” and it should return all the results along with the total count.

For more information or queries please do drop by the mailing lists[4].


Thursday, June 2, 2016

Incremental Analytics With WSO2 DAS

This is the second blog post on WSO2 Data Analytics Server. The first post can be found at [1].

Introduction

The duration of the batch process is critical in production environments. A product that does not support incremental processing needs to process the whole dataset just to handle the unprocessed data. With incremental processing, the batch job only processes the data partition that is required, not the whole dataset (which has already been processed), which improves the efficiency drastically.

For example let’s say you have a requirement to summarize data for each day. The first time the summarization script is run, it would process the whole data set and summarize the data. That’s where the similarities end between a typical batch process and incremental analytics.

The next day when the script is run, a batch processing system without incremental analytics support would have to summarize the whole dataset in order to get the last day’s summarization. But with incremental processing, you would only process the last day’s worth of data and summarize it, which removes the overhead of processing the already processed data again.

Think of how much this can improve the performance of summarizations covering anything from minutes all the way up to years.

Publishing events
Incremental analytics uses the timestamps of the events sent when retrieving the data for processing. Therefore, when defining streams for incremental analytics, you need to add an extra field to the event payload as _timestamp LONG to facilitate this.

When sending the events, you can either add the timestamp to the _timestamp attribute or set it for each event at event creation.
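For example, a minimal sketch of an event stream definition carrying the extra _timestamp payload attribute; the stream name and the other attributes here are placeholders chosen to match the ORDERS example used later:

{
  "name": "ORDERS_STREAM",
  "version": "1.0.0",
  "payloadData": [
    {"name": "customerID", "type": "STRING"},
    {"name": "phoneType", "type": "STRING"},
    {"name": "OrderID", "type": "STRING"},
    {"name": "cost", "type": "DOUBLE"},
    {"name": "_timestamp", "type": "LONG"}
  ]
}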

Syntax


In DAS, when defining the table in the Spark script, you need to add extra parameters to the table definition for it to support incremental analytics.

If you do not provide these parameters, the table is treated as a typical analytics table, and each query that reads from that table would get the whole table.

The following is an example of defining a Spark table with incremental analytics.

create temporary table orders using CarbonAnalytics options (tableName "ORDERS", schema "customerID STRING, phoneType STRING, OrderID STRING, cost DOUBLE, _timestamp LONG -i", incrementalParams "orders, DAY");

And when you are done with the summarization, you need to commit the status, indicating that the reading of the data was successful. This is done via

INCREMENTAL_TABLE_COMMIT orders;
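Putting it together, a complete incremental summarization script could look something like the following sketch. The summary table, its schema, and the aggregation are made up for illustration; only the incremental table definition and the commit command come from above:

create temporary table orders using CarbonAnalytics options (tableName "ORDERS", schema "customerID STRING, phoneType STRING, OrderID STRING, cost DOUBLE, _timestamp LONG -i", incrementalParams "orders, DAY");

create temporary table ordersSummary using CarbonAnalytics options (tableName "ORDERS_SUMMARY", schema "phoneType STRING, totalCost DOUBLE");

insert into table ordersSummary select phoneType, sum(cost) from orders group by phoneType;

INCREMENTAL_TABLE_COMMIT orders;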

Parameters


incrementalParams has two required parameters and one optional parameter:
incrementalParams "uniqueID, timePeriod, #previousTimePeriods"

uniqueID : REQUIRED
    This is the unique ID of the incremental analytics definition. When committing the change, you need to use this ID in the incremental table commit command as shown above.

timePeriod: REQUIRED (DAY/MONTH/YEAR)
    The duration of the time period that you are processing. Ex: DAY

If you are summarizing per DAY (the specified timePeriod in this case), then DAS has the ability to process the timestamps of the events and get the DAY they belong to.

Consider the situation with the following list of received events. The requirement is that we need to get the total number of orders placed per day.

Customer ID | Phone Type | Order ID   | Cost | _timestamp
1           | Nexus 5x   | 33slsa2s   | 400  | 26th May 2016 12:00:01
12          | Galaxy S7  | kskds221   | 600  | 27th May 2016 02:00:02
43          | iPhone 6s  | sadl3122   | 700  | 27th May 2016 15:32:04
2           | Moto X     | sdda221s   | 350  | 27th May 2016 16:22:10
32          | LG G5      | lka2s24dkQ | 550  | 27th May 2016 19:42:42

And the last processed event is,

12 | Galaxy S7 | kskds221 | 600 | 27th May 2016 15:32:04

In the summarized table, the entry for the day 27th May 2016 would be 2, since when the script last ran there were only two events for that particular time duration; the other events came later.

So when the script runs the next time, it needs to update the value for the time duration for the day of 27th May 2016.

This is where the timePeriod parameter is used. For the last processed event, DAS calculates the “time period” it belongs to and pulls the data from the beginning of that time period onwards.

In this case, the last processed event,

12 | Galaxy S7 | kskds221 | 600 | 27th May 2016 15:32:04

would trigger DAS to pull data from 27th May 2016 00:00:00 onwards.

#previousTimePeriods - Optional (int)
    Specifying this value allows DAS to pull data from previous time periods as well. For example, if you set this parameter to 30, it would fetch the data of the 30 previous time periods in addition.

As per the above example, it would pull from 27th April 2016 00:00:00 onwards.
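For example, the table definition from earlier could be changed to include the optional parameter like this (same hypothetical ORDERS table as before):

create temporary table orders using CarbonAnalytics options (tableName "ORDERS", schema "customerID STRING, phoneType STRING, OrderID STRING, cost DOUBLE, _timestamp LONG -i", incrementalParams "orders, DAY, 30");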

For more information or queries do drop by the mailing lists[2].

[1] http://sachithdhanushka.blogspot.com/2016/04/wso2-data-analytics-server-introduction.html
[2] http://wso2.com/products/data-analytics-server/