Thursday, January 26, 2017

Apache Storm - Custom Bolt

Apache Storm is an open-source distributed computing system. Storm can be used for streaming data, realtime data processing, distributed messaging, and more.

In one of my assignments I am using Storm for streaming data (one of Storm's main features). The requirement was to stream CDR (Call Data Record) file data to HDFS. As calls originate from different regions, each data record has to go to the HDFS directory corresponding to the region of call origination.
e.g.: call records from state NAME will be written to the directory <PARENT_DIR>/state=NAME in HDFS (so records from Karnataka go to state=KA).

Later I can define a Hive table partitioned by state and fetch the desired records.

To stream data to a specific directory based on record content, I chose to create a custom CDRHDFSBolt by extending the HdfsBolt class provided by Storm. Except for writeTuple(), I kept all methods untouched. I overrode writeTuple() to inspect the tuple content: if it has a value corresponding to the desired state, the tuple is written to HDFS; otherwise it is discarded.

Bolt Class:
import java.io.IOException;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.tuple.Tuple;

public class CDRHDFSBolt extends HdfsBolt {

    @Override
    public void writeTuple(Tuple tuple) throws IOException {
        String record = tuple.getString(0);     // the raw CDR line
        String stateName = tuple.getString(1);  // state the call originated from

        // Write only tuples for the desired state to HDFS; discard the rest.
        if (stateName.equals("NAME"))
            super.writeTuple(tuple);

        this.collector.ack(tuple);
    }
}

Topology Class:


     FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("/prac/storm/callrcds/statename=NAME")
            .withExtension(".txt");
     HdfsBolt bolt = new CDRHDFSBolt()
            .withFileNameFormat(fileNameFormat);

Note: the HDFS connection details also have to be set on the bolt (withFsUrl()), along with a RecordFormat, SyncPolicy and FileRotationPolicy, before it can be used in the topology.


Wednesday, December 28, 2016

Spark - RDD


First, RDD stands for Resilient Distributed Dataset.

A Spark RDD is a distributed collection of data. An RDD is usually created in one of two ways: from external data (a file, data from HDFS) or by distributing a collection of objects (e.g., a List/Set) in the driver program.

Scala code to create RDD:

  1. External data RDD: val lines = sc.textFile("input.txt") 
  2. Distributed collection RDD: val nums = sc.parallelize(List(1, 2, 3, 4))
*sc is the SparkContext object
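
In the spark-shell, sc is already created for you. In a standalone application you have to build it yourself; a minimal sketch (the app name and the local[*] master are example values):

import org.apache.spark.{SparkConf, SparkContext}

// Example setup for running the snippets in this post outside the shell
val conf = new SparkConf().setAppName("rdd-demo").setMaster("local[*]")
val sc   = new SparkContext(conf)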

Now we have RDDs created in our driver program. Once an RDD is created, we perform computations on it.
Two kinds of operations can be performed on an RDD:

  • Transformations: a transformation produces a new RDD. Commonly used transformations (a sketch follows this list):
    • flatMap(): applies a function to each element of the RDD and returns the contents of the returned iterators as a new RDD.
    • filter(): returns an RDD containing only the elements that pass the filter condition.
    • map(): returns a new RDD formed by applying a function to each element of the RDD.
    • distinct(): removes duplicate elements.
    • union(): produces an RDD containing the elements of both RDDs ...
  • Actions: actions return a value to the driver or write data out. Commonly used actions (a second sketch follows this list):
    • collect(): returns all elements of the RDD.
    • count(): returns the number of elements in the RDD.
    • foreach(): iterates over the elements of the RDD.
    • top(num): returns the top num elements of the RDD ...
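
A quick sketch of the transformations above, runnable in the spark-shell (the sample data is made up; sc is the SparkContext as before):

val lines = sc.parallelize(List("hello spark", "hello storm", "hello spark"))

val words    = lines.flatMap(line => line.split(" "))  // one element per word
val hellos   = words.filter(word => word == "hello")   // keep only matching elements
val lengths  = words.map(word => word.length)          // apply a function to each element
val uniques  = words.distinct()                        // remove duplicates
val combined = words.union(hellos)                     // elements of both RDDs

// Transformations are lazy: nothing executes until an action is called.
println(uniques.collect().mkString(", "))              // e.g. hello, spark, storm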
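
And a similar sketch for the actions (again with made-up sample data):

val nums = sc.parallelize(List(4, 1, 3, 2, 4))

nums.collect()                 // Array(4, 1, 3, 2, 4) - all elements
nums.count()                   // 5 - number of elements
nums.foreach(n => println(n))  // print order is not guaranteed
nums.top(2)                    // Array(4, 4) - the two largest elements

Note that collect() pulls the whole RDD into the driver, so it is only safe for small datasets.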