Friday, June 24, 2016

Simple Indexing Guide for WSO2 Data Analytics Server

Interactive search functionality in WSO2 Data Analytics Server is powered by Apache Lucene [1], a powerful, high-performance, full-text search engine.

Lucene index data was kept in a database in the first version of the Data Analytics Server [2] (3.0.0), but from DAS 3.0.1 onwards, Lucene indexes are maintained in the local filesystem.
Data Analytics Server (DAS) also has a separate server profile which enables it to run as a dedicated indexing node.

When a DAS node is started with indexing enabled (disableIndexing=false), there are quite a few things going on. But first, let's get the meta-information out of the way.

Configuration file locations:

Local shard config: <DAS-HOME>/repository/conf/analytics/local-shard-allocation-config.conf
Analytics config: <DAS-HOME>/repository/conf/analytics/analytics-config.xml
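
For reference, indexing on a node is switched on or off with the disableIndexing system property at startup. A minimal sketch, assuming the standard Carbon startup script that ships with DAS:

sh <DAS-HOME>/bin/wso2server.sh -DdisableIndexing=false    # start as an indexing node (the default)
sh <DAS-HOME>/bin/wso2server.sh -DdisableIndexing=true     # start with indexing switched off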

What is a Shard?

All the index data is partitioned into units called shards (the default is 6). The partitioned data can be observed by browsing <DAS-HOME>/repository/data/index_data/
Each record belongs to exactly one shard.

Digging in ...

Clustering:

In standalone mode, DAS behaves as a capable indexer on its own, but it truly shines when it's clustered.

A cluster of DAS indexer nodes behaves as one giant distributed search engine. Search queries are distributed across all the nodes, resulting in lightning-fast result retrieval. And since the indexes themselves are distributed among the nodes, indexing incoming data in the cluster is very fast as well.

In clustered mode, the aforementioned shards are distributed across the indexing nodes (nodes on which indexing is enabled). For example, in a cluster of 3 indexing nodes and 6 shards, each indexing node would be assigned two shards (unless replication is enabled; more on that later).

The shard allocation is DYNAMIC! If a new node starts up as an indexing node, the shard allocations change to hand some of the shards over to the newly spawned indexing node. This is controlled through a global shard allocation mechanism.

Replication factor:
The replication factor can be configured in the analytics config file as indexReplicationFactor; it decides how many replicas of each record (or shard) are kept. The default is 1.
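
For example, in analytics-config.xml the relevant element would look roughly like this (a minimal sketch; the surrounding elements are omitted):

<indexReplicationFactor>1</indexReplicationFactor>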

Manual Shard configuration:


There are 3 modes when configuring the local shard allocations of a node. They are,
  1. NORMAL
  2. INIT
  3. RESTORE

NORMAL means the data for that particular shard already resides on that node.
For example, upon starting up a single indexing node, the shard configuration would look as follows:
0, NORMAL
1, NORMAL
2, NORMAL
3, NORMAL
4, NORMAL
5, NORMAL

This means all the shards (0 through 5) have been indexed successfully on that indexer node.

INIT tells the indexing node to index that particular shard.
If you restart the server after adding a shard as INIT, that shard will be (re)indexed on that node.
Ex: if the shard allocations are

1, NORMAL
2, NORMAL

and we add the line 4, INIT and restart the server,

1, NORMAL
2, NORMAL
4, INIT

This would index the data for the 4th shard on that indexing node, and you will see the entry return to the NORMAL state once indexing of the 4th shard is done.

RESTORE lets you copy already-indexed data from a different node (or from a backup) onto a node and reuse it. This avoids re-indexing by reusing the index data that is already available. After a successful restore, that node is able to serve queries on the corresponding shard as well.

Ex: for the same shard allocation above, if we copy the indexed data for the 5th Shard and add the line,
5, RESTORE
and restart the server, the 5th shard would then be allocated to that node (and used for both searching and indexing).

After restoring, that node will also index the incoming data for that shard.
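
For instance, once the restore is done and the server restarted, the local shard allocation file for that node would presumably look like the following (assuming RESTORE, like INIT, reverts to NORMAL once the shard is active):

1, NORMAL
2, NORMAL
4, NORMAL
5, NORMAL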

FAQ!


How do I remove a node as an indexing node?


If you want to remove a node as an indexing node, restart that particular node with indexing disabled (disableIndexing=true). This triggers the global shard allocations to be re-allocated, removing that node from the indexing cluster.

Then you have the option of restarting the other indexing nodes, which will automatically distribute the shards among the available nodes again.

Alternatively, you can assign the shards manually. In that case, you need to make sure the shards that were held by the removed node are indexed on other indexing nodes as required.
You can view the shard list for that particular node in its local shard allocation config (see the file locations above).

Then you can use either the INIT or RESTORE method to index those shards on other nodes (refer to the Manual Shard configuration section).

You must restart all the indexing servers for them to pick up the updated shard allocations after the indexing node has left.

Mistakenly started up an indexing node?


If you have mistakenly started up an indexing node, it will change the global shard allocations, and this has to be undone manually.
If the replication factor is equal to or greater than 1, you will still be able to query and get all the data even while this node is down.

First of all, when removing a node as an indexing node, you can delete its local index data (this step is optional).
The index data resides at: <DAS-HOME>/repository/data/indexing_data/

Second, if you want to use the node in another server profile (more info on profiles: [3]), restart the server with the required profile.

Then follow the steps in the above question on "How do I remove a node as an indexing node?"

How do I know all my data are indexed?


The simplest way to do this would be to use the Data Explorer in DAS, refer to [4] for more information.

There, for a selected table, run the query “*:*” and it should return all the results with the total count.

For more information or queries, please do drop by the mailing lists [4].


Thursday, June 2, 2016

Incremental Analytics With WSO2 DAS

This is the second blog post on WSO2 Data Analytics Server. The first post can be found at [1].

Introduction

The duration of the batch process is critical in production environments. A product that does not support incremental processing needs to go through the whole dataset in order to process even the unprocessed portion of the data. With incremental processing, the batch job only processes the data partition that actually requires processing, not the whole (already processed) dataset, which improves efficiency drastically.

For example, let's say you have a requirement to summarize data for each day. The first time the summarization script runs, it processes the whole dataset and summarizes the data. That's where the similarities between a typical batch process and incremental analytics end.

The next day when the script runs, a batch processing system without incremental analytics support would have to summarize the whole dataset again in order to produce the last day's summarization. With incremental processing, you only process the last day's worth of data, which removes the overhead of processing already processed data again.

Think of how much this improves performance for summarizations spanning anything from minutes all the way to years.

Publishing events
Incremental analytics uses the timestamps of the events sent when retrieving the data for processing. Therefore, when defining streams for incremental analytics, you need to add an extra field to the event payload as _timestamp LONG to facilitate this.

When sending the events, you can either populate the _timestamp attribute yourself or set the timestamp for each event at event creation.
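
As a rough sketch, the payload of a stream definition for the ORDERS example used later in this post could look something like the following (the exact stream definition format is described in the DAS documentation; the field names here simply mirror that example):

{
  "name": "ORDERS",
  "version": "1.0.0",
  "payloadData": [
    {"name": "customerID", "type": "STRING"},
    {"name": "phoneType", "type": "STRING"},
    {"name": "OrderID", "type": "STRING"},
    {"name": "cost", "type": "DOUBLE"},
    {"name": "_timestamp", "type": "LONG"}
  ]
}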

Syntax


In DAS, when defining a table in the Spark script, you need to add extra parameters to the table definition for it to support incremental analytics.

If you do not provide these parameters, the table is treated as a typical analytics table, and each query that reads from it gets the whole table.

The following is an example of defining a Spark table with incremental analytics.

create temporary table orders using CarbonAnalytics options (tableName "ORDERS", schema "customerID STRING, phoneType STRING, OrderID STRING, cost DOUBLE, _timestamp LONG -i", incrementalParams "orders, DAY");

When you are done with the summarization, you need to commit the status, indicating that the data was read successfully. This is done via:

INCREMENTAL_TABLE_COMMIT orders;
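
Putting the two together, a summarization script could look roughly like the following sketch. The summary table ORDERS_SUMMARY and its columns are hypothetical, added here purely for illustration (in a real script you would typically merge the new partial results into the existing summary rather than blindly appending):

create temporary table orders_summary using CarbonAnalytics options (tableName "ORDERS_SUMMARY", schema "phoneType STRING, totalCost DOUBLE");

INSERT INTO TABLE orders_summary SELECT phoneType, sum(cost) FROM orders GROUP BY phoneType;

INCREMENTAL_TABLE_COMMIT orders;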

Parameters


incrementalParams has two required parameters and one optional parameter:
incrementalParams "uniqueID, timePeriod, #previousTimePeriods"

uniqueID : REQUIRED
    This is the unique ID of the incremental analytics definition. When committing the change, you need to use this ID in the incremental table commit command as shown above.

timePeriod: REQUIRED (DAY/MONTH/YEAR)
    The duration of the time period that you are processing. Ex: DAY

If you are summarizing per DAY (the specified timePeriod in this case), then DAS has the ability to process the timestamp of the events and get the DAY they belong to.

Consider a situation with the following list of received events. The requirement is to get the total number of orders placed per day.

Customer ID | Phone Type | Order ID   | Cost | _timestamp
1           | Nexus 5x   | 33slsa2s   | 400  | 26th May 2016 12:00:01
12          | Galaxy S7  | kskds221   | 600  | 27th May 2016 02:00:02
43          | iPhone 6s  | sadl3122   | 700  | 27th May 2016 15:32:04
2           | Moto X     | sdda221s   | 350  | 27th May 2016 16:22:10
32          | LG G5      | lka2s24dkQ | 550  | 27th May 2016 19:42:42

And the last processed event is,

12          | Galaxy S7  | kskds221   | 600  | 27th May 2016 15:32:04

In the summarized table, the count for the day 27th May 2016 would be 2, since when the script last ran there were only two events within that time period; the other events came in later.

So when the script runs the next time, it needs to update the value for the day of 27th May 2016.

This is where the timePeriod parameter is used. For the last processed event, DAS calculates the “time period” it belongs to and pulls the data from the beginning of that time period onwards.

In this case, the last processed event,

12          | Galaxy S7  | kskds221   | 600  | 27th May 2016 15:32:04

would trigger DAS to pull data from 27th May 2016 00:00:00 onwards.

#previousTimePeriods - Optional (int)
    Specifying this value allows DAS to pull data from previous time periods as well. For example, if you set this parameter to 30, it would fetch 30 more time periods' worth of data.

As per the above example, it would pull from 27th April 2016 00:00:00 onwards.

For more information or queries, do drop by the mailing lists [2].

[1] http://sachithdhanushka.blogspot.com/2016/04/wso2-data-analytics-server-introduction.html
[2] http://wso2.com/products/data-analytics-server/

Friday, April 29, 2016

WSO2 Data Analytics Server: Introduction

WSO2 Data Analytics Server (DAS) is a full-blown analytics platform which fuses batch analytics, stream analytics, predictive analytics and interactive analytics into a single package (how convenient is that?).

This post will (hopefully) act as an FAQ list for DAS newbies!

Oh, by the way, WSO2 DAS is completely open source AND free (free as in speech and as in free beer :) ). So please be sure to give it a go and play around with the code! And do drop by our mailing lists [2]; we are an open community at WSO2!

Introduction:


The image below summarizes WSO2 DAS elegantly.

[Image: Analytics-diagram-v2.jpg — overview of WSO2 DAS]

In the simplest terms, DAS can:
1. Collect data from various sources (HTTP, Thrift, external databases, even the WSO2 ESB! and much more)
2. Run various types of analytics on the data
3. Communicate the results in various ways (alerts, dashboards, etc.)


So What's underneath?


For batch analytics, we use Apache Spark [3], a clustered large-scale computing framework with up to 100x the performance of Apache Hadoop.

WSO2 Siddhi is one of the most powerful complex event processors around (even Uber is using Siddhi underneath ;) ). Siddhi powers the real-time analytics in DAS.

Apache Lucene, a high-performance full-text search engine [4], powers our interactive analytics.

Predictive analytics is powered by immensely powerful algorithms that extend Apache Spark's MLlib package. It helps you build models of the data which can then be used to run analytics against!

Okay! What are the "other" advantages?


I'm glad you asked!

  • Single API for all kinds of processing
    You just want to publish data once and get it over with? Yep, that's how we do it.
  • Pluggable data storage support
    Want to run on HDFS? no problem!
    Already have a Cassandra cluster? we've got that covered!
  • Extremely fast writes!
    The asynchronous, non-blocking nature of our publishing allows extremely fast data writes!
  • Multiple data publishing methods
    Publish data from PHP, Python, Java, C++, etc.
    JMX publishers, log publishers and many more!

What can I do with the analyzed data?

Well, simply put, a lot!

We have built-in support for interactive dashboards. You can view and analyze the data in hundreds of ways.

You can also send alerts!! 

Want to send SMSs or emails for a particular type of occurrence? IT'S BUILT IN!

Want more?
Couple this with WSO2 ESB[5] and the options are limitless.
You can even send pager alerts, or better yet, trigger a physical alarm!! How cool is that?


WSO2 Data Analytics Server is an extremely powerful tool, and I hope this gave a brief but fairly comprehensive introduction to it.

Refer to the documentation for more details [6].

[1] http://wso2.com/products/data-analytics-server/
[2] http://wso2.com/mail/
[3] http://spark.apache.org/
[4] https://lucene.apache.org/core/
[5] http://wso2.com/products/enterprise-service-bus/
[6] https://docs.wso2.com/display/DAS301/WSO2+Data+Analytics+Server+Documentation

Monday, March 14, 2016

Open Water Diving ...!!!

It's been a while since I've posted any material on my blog, and I figured it's time. This time I have something I'm REALLY excited about: SCUBA DIVING!

OH YES!! For a while now, the first spot on my very short bucket list has been occupied by 'Diving with the Great White Sharks' (followed by petting a Lion/Tiger ;) ). I truly love nature (especially predatory creatures, as you might have guessed :) ) and have wanted to get my scuba diving licence for a while now. FINALLY, I've completed the first stage in getting there.

I'm a PADI certified Open Water Diver now!! Basically, what it means is that I can dive down to 18 meters with regular scuba equipment. Next up is the Advanced Open Water Diver course, which would let me dive down to 40m!

It was truly an eye-opening experience; the colors and the amazing variety of life hidden beneath the surface are mind-blowing. The weightlessness adds to the fantastic sensation, and gliding past shoals of small fish with a pair of Batfish following you around is a once-in-a-lifetime experience.

Among the creatures I've encountered (so far), the Octopus, the Lion Fish and the Box Fish top the list. But Nudibranchs are my favorites so far; those small creatures are a treat to look at, so colorful and glowing (seen in the picture below).



Apart from this magnificent sea life, it was disappointing to witness the human hand in destroying these environments: plastic bags and yogurt cups can be seen drifting around the corals. Not all of the environmental pollution is visible from above sea level; down there, it's a mess. It truly makes you question humanity's place in nature.

So far I've dived at Palagala and the T.Sierra wreck, both off the coast of Mount Lavinia in Sri Lanka, mainly because I got my licence from IslandScuba [1], the best place to get your PADI licence in Sri Lanka. Shout out to Naren, James and Jehan for all the support and training, and for being patient in getting us there :).

T.Sierra was by far my favorite of the two dive sites, mainly due to the high visibility :). We even made a video of the dive [2]; I encourage you all to watch it in 720p for the full experience :).

This is a very short post describing my first experiences underwater, hopefully there will be more to come.

Here's the bunch I dove with, with me giving the OK sign :).




[1] http://www.tripadvisor.com/Attraction_Review-g293962-d3793273-Reviews-Island_Scuba-Colombo_Western_Province.html
[2] https://www.youtube.com/watch?v=8joz4m91JTM


Wednesday, April 23, 2014

Kerberos Java Client: Adding Multi user support

I've written two blog posts on connecting to Kerberized servers from Java using the JCraft JSch library.
In those posts, I only allowed single-user support, by using a static login configuration file (jaas.conf).

To allow multi-user support, you have to provide the login configuration to Java programmatically, instead of pointing to it via a system property.
To do this, we need a configuration object with the settings that were previously in the jaas.conf file. We create that object by extending the javax.security.auth.login.Configuration class.

Here's an example Java class:

import java.util.HashMap;
import java.util.Map;

import javax.security.auth.login.AppConfigurationEntry;

public class JaaSConfiguration extends javax.security.auth.login.Configuration {

    // Options handed to the Krb5LoginModule (the same settings that used to live in jaas.conf)
    private final Map<String, String> userKerberosOptions = new HashMap<String, String>();

    private final String ticketCache;

    // provide the user's ticket cache location in the constructor
    public JaaSConfiguration(String ticketCache) {
        this.ticketCache = ticketCache;
        System.out.println("TicketCache: " + ticketCache);
        init();
    }

    private void init() {
        userKerberosOptions.put("useDefaultCache", "true");
        userKerberosOptions.put("doNotPrompt", "true");
        userKerberosOptions.put("useTicketCache", "true");
        userKerberosOptions.put("debug", "true");
        // point the login module at this particular user's ticket cache
        userKerberosOptions.put("ticketCache", ticketCache);
        userKerberosOptions.put("renewTGT", "true");
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        // use the Krb5LoginModule with the per-user options built above
        AppConfigurationEntry userKerberosLogin =
                new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                        userKerberosOptions);
        return new AppConfigurationEntry[]{userKerberosLogin};
    }
}

Now that you have the extended class, before you start the session you have to tell Java where to look for the login configuration (instead of setting it as a system property):


javax.security.auth.login.Configuration.setConfiguration(new JaaSConfiguration(ticketCache));

Notice that, in the constructor, you have to provide the ticket cache location of each user.
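
For example, a small helper for handling per-user connections could look roughly like this minimal sketch (the class name MultiUserKerberosConnector, the openSessionFor helper and the way the ticket cache path is obtained are all hypothetical; the connection settings are taken from the earlier posts):

import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;

import java.util.Properties;

public class MultiUserKerberosConnector {

    // Hypothetical helper: opens a session for the given user using that user's ticket cache.
    public static Session openSessionFor(String userName, String ticketCache, String host) throws Exception {
        // point JAAS at this particular user's ticket cache before the session is created
        javax.security.auth.login.Configuration.setConfiguration(new JaaSConfiguration(ticketCache));

        JSch jsch = new JSch();
        Session session = jsch.getSession(userName, host, 22);

        Properties config = new Properties();
        config.put("StrictHostKeyChecking", "no");
        config.put("PreferredAuthentications", "gssapi-with-mic");
        session.setConfig(config);
        session.connect(20000);

        // ... open an exec channel and run commands as in the previous post, then disconnect
        return session;
    }
}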

After that you are good to go.

Tuesday, March 11, 2014

SSH Agent Forwarding using Java

SSH agent forwarding is a really useful way to run a chain of ssh commands without configuring public keys for each and every machine in the chain.
You just have to add your public key to all the nodes in the chain, and it then allows you to ssh to any node while you are on another node.
Here's a very good illustration of how it works [1].

First, you should test it from the terminal.
You can do this by editing the config file inside the ~/.ssh directory (if you don't have one, create it):

vi ~/.ssh/config

Example config file:
Host beast
   HostName 196.96.179.220
   User swithana
   IdentityFile ~/.ssh/id_rsa
   ForwardAgent yes

Host iu10
   HostName xx.xxx.xsede.org
   User swithana
   ForwardAgent yes
   IdentityFile ~/.ssh/id_rsa

With this file configured, you can ssh to the beast host machine and then ssh from inside beast to iu10, without having to add the beast machine's public key to iu10. It can also be done the other way around (through iu10 to beast).

Example commands
$swithana:~ ssh beast
$swithana@beast:~ ssh iu10

This would work because you've set the SSH Agent Forwarding to true.

Here's the Java code to copy a file from one host to another using SSH agent forwarding (it uses scp).
I have used the JCraft JSch library [2].

import com.jcraft.jsch.*;

import java.util.Properties;

public class SSHCommandExecutor {
    public static void main(String[] args) {

        String host = "196.36.199.10";
        String user = "swithana";

        // your private key passphrase here
        String passphrase = "***********";

        String command = "scp test.txt swithana@xx.xx.xsede.org:/home/swithana/test_2.txt";

        // private key location
        String privateKey = "/Users/swithana/.ssh/id_rsa";

        JSch jsch = new JSch();
        jsch.setLogger(new MyLogger());


        try {
            jsch.addIdentity(privateKey, passphrase);

            Session session = jsch.getSession(user, host, 22);
            Properties config = new java.util.Properties();
            config.put("StrictHostKeyChecking", "no");
            config.put("PreferredAuthentications",
                    "publickey");

            session.setConfig(config);
            session.connect(20000);

            Channel channel = session.openChannel("exec");

            ((ChannelExec) channel).setCommand(command);

            // this is the key line that sets AgentForwading to true
            ((ChannelExec) channel).setAgentForwarding(true);

            channel.setInputStream(null);
            ((ChannelExec) channel).setErrStream(System.err);

            channel.connect();
            Thread.sleep(1000);

            channel.disconnect();
            session.disconnect();
            System.out.println("DONE");

        } catch (JSchException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static class MyLogger implements com.jcraft.jsch.Logger {
        static java.util.Hashtable name = new java.util.Hashtable();

        static {
            name.put(new Integer(DEBUG), "DEBUG: ");
            name.put(new Integer(INFO), "INFO: ");
            name.put(new Integer(WARN), "WARN: ");
            name.put(new Integer(ERROR), "ERROR: ");
            name.put(new Integer(FATAL), "FATAL: ");
        }

        public boolean isEnabled(int level) {
            return true;
        }

        public void log(int level, String message) {
            System.err.print(name.get(new Integer(level)));
            System.err.println(message);
        }
    }
}



[1] http://www.unixwiz.net/techtips/ssh-agent-forwarding.html
[2] http://www.jcraft.com/

Wednesday, February 26, 2014

Kerberos Java Client: Code

This is the second post of the Kerberos Java Client series.
The first post can be found here [1].

This post is on the Java code used to connect to a Kerberized server over ssh, execute a command ('ls -ltr') there, and read the results from the output stream.

You need to provide the locations of the Kerberos configuration file and the JAAS configuration file as system properties. I've done it inside the class itself.

The code is pretty much self-explanatory.

import com.jcraft.jsch.*;


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class JSCHKerberosConnector {
    public static void main(String[] args) {

        String host = "test.xsede.org";
        String user = "sachith";
        String  command = "ls -ltr";

        JSch jsch = new JSch();
        jsch.setLogger(new MyLogger());

        System.setProperty("java.security.krb5.conf", );
        System.setProperty("java.security.auth.login.config", );
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
       
        //to enable kerberos debugging mode
        System.setProperty("sun.security.krb5.debug", "true");

        try {

            Session session = jsch.getSession(user, host, 22);
            Properties config = new java.util.Properties();
            config.put("StrictHostKeyChecking", "no");
            config.put("PreferredAuthentications",
                    "gssapi-with-mic");

            session.setConfig(config);
            session.connect(20000);

            Channel channel = session.openChannel("exec");
            ((ChannelExec) channel).setCommand( command);
            channel.setInputStream(null);
            ((ChannelExec) channel).setErrStream(System.err);

            InputStream in = channel.getInputStream();
            channel.connect();
            byte[] tmp = new byte[1024];
               while (true) {
                while (in.available() > 0) {
                    int i = in.read(tmp, 0, 1024);
                    if (i < 0) break;
                    System.out.print(new String(tmp, 0, i));
                }
                if (channel.isClosed()) {
                    System.out.println("exit-status: " + channel.getExitStatus());
                    break;
                }
                try {
                    Thread.sleep(1000);
                } catch (Exception ee) {
                }
            }
            channel.disconnect();
            session.disconnect();
            System.out.println("DONE");

        } catch (JSchException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //to log the jsch activity
    public static class MyLogger implements com.jcraft.jsch.Logger {
        static java.util.Hashtable name=new java.util.Hashtable();
        static{
            name.put(new Integer(DEBUG), "DEBUG: ");
            name.put(new Integer(INFO), "INFO: ");
            name.put(new Integer(WARN), "WARN: ");
            name.put(new Integer(ERROR), "ERROR: ");
            name.put(new Integer(FATAL), "FATAL: ");
        }
        public boolean isEnabled(int level){
            return true;
        }
        public void log(int level, String message){
            System.err.print(name.get(new Integer(level)));
            System.err.println(message);
        }
    }
}




[1] http://sachithdhanushka.blogspot.com/2014/02/kerberos-java-client-configuration.html