Monday, 2 May 2016

RDD in Spark


The RDD is the main feature of Spark and what sets it apart from other frameworks.
It uses memory and disk space as the workload requires, which lets it
process data typically faster than MapReduce (MR).



Spark’s RDD : 

R for resilient - able to withstand or recover quickly from difficult conditions.

D for distributed - shared out, with a unit of something given to each of a number of recipients.

D for dataset - a collection of related sets of information that is composed of separate elements but can be manipulated as a unit by a computer.

The RDD is the starting point of Spark, i.e. its key component.

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant, immutable, distributed collection of elements that can be operated on in parallel.
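To make "distributed" and "operated on in parallel" concrete, here is a minimal sketch in plain Python. It is a toy model, not Spark: the helper names `split_into_partitions` and `square_partition` are made up for illustration, and local threads stand in for the nodes of a cluster.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(items, num_partitions):
    """Split a list into num_partitions roughly equal, contiguous chunks."""
    size, extra = divmod(len(items), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        # Tuples are used because a partition is never modified in place.
        partitions.append(tuple(items[start:end]))
        start = end
    return partitions


def square_partition(partition):
    """Work done independently on one partition; returns a new tuple."""
    return tuple(x * x for x in partition)


data = [1, 2, 3, 4, 5, 6, 7, 8]
partitions = split_into_partitions(data, 3)

# Each partition is handled by its own worker (hypothetical stand-in for a node).
with ThreadPoolExecutor(max_workers=3) as pool:
    squared_partitions = list(pool.map(square_partition, partitions))

# Gather the per-partition results back into a single list.
result = [x for part in squared_partitions for x in part]

print(partitions)
print(result)
```

The input list is never changed; every step produces new values, which is the same idea behind the immutability of an RDD.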

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

Creation of RDD : 

Start pyspark on your machine and type the commands below to create an RDD.

1.      Create an RDD using the parallelize method:

>>> firstRDD = sc.parallelize([1,2,3,4,'5',6,7,'8'])    # creates an RDD named firstRDD

>>> firstRDD
    ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

   # Evaluating the name shows that the RDD was created and where it was created.

>>> firstRDD.collect()
   o/p : [1, 2, 3, 4, '5', 6, 7, '8']  (collect() is an action that returns all elements of the RDD to the driver.)




2.      Create an RDD from a file in a local or HDFS location:

 >>> secondRDD = sc.textFile('/home/terbit/sample.txt')

16/05/02 10:18:55 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 41.7 KB, free 43.7 KB)
16/05/02 10:18:55 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.2 KB, free 47.8 KB)
16/05/02 10:18:55 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:49315 (size: 4.2 KB, free: 517.4 MB)
16/05/02 10:18:55 INFO SparkContext: Created broadcast 1 from textFile at NativeMethodAccessorImpl.java:-2

  >>> secondRDD

     MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:-2  # Note: the RDD is created.

>>> secondRDD.collect()
[u'Spark learning , this is test file created for spark testing.', u'Spark will treat each line as record.', u'lets see how it work . this is first time I am using file .']

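>>> secondRDD.toDebugString      # without parentheses, Python only shows the bound method
<bound method RDD.toDebugString of MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:-2>

>>> secondRDD.toDebugString()    # call the method to get the description of the RDD's lineage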

Transformation and Action

Spark RDDs support two types of operations.

1.      Transformations - which create a new dataset from an existing one.
2.      Actions - which return a value to the driver program after running a computation on the dataset.



Transformation : 

They are methods that take an RDD as input and return another RDD as output. They do not change the input RDD (RDDs are immutable and cannot be modified), but always return one or more new RDDs by applying the computations they represent.
Transformations are lazy and thus are not executed immediately. Only after calling an action are transformations executed.

Actions : 

They trigger execution of RDD transformations to return values. Simply put, an action evaluates the RDD lineage graph.
You can think of an action as a valve: until an action is fired, the data to be processed is not even in the pipes, i.e. the transformations. Only actions can materialize the entire processing pipeline with real data.
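The difference between lazy transformations and actions can be sketched with a toy model in plain Python. This is not Spark code: `LazyDataset` is a hypothetical class written only for illustration, although its method names mirror the real RDD methods `map`, `filter`, `collect` and `count`. Transformations only record a step and return a new object; the recorded steps run when an action is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: immutable data plus a recorded chain of steps."""

    def __init__(self, source, steps=()):
        self._source = tuple(source)   # never modified
        self._steps = tuple(steps)     # transformations recorded so far (the lineage)

    # Transformations: return a NEW dataset and run nothing.
    def map(self, func):
        return LazyDataset(self._source, self._steps + (("map", func),))

    def filter(self, predicate):
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    # Actions: run the recorded steps and return a value to the caller.
    def collect(self):
        items = iter(self._source)
        for kind, func in self._steps:
            # Inside a method, map/filter refer to the Python builtins.
            items = map(func, items) if kind == "map" else filter(func, items)
        return list(items)

    def count(self):
        return len(self.collect())


calls = []  # records every element the map function actually touches


def double(x):
    calls.append(x)
    return x * 2


numbers = LazyDataset([1, 2, 3, 4, 5])
doubled = numbers.map(double)               # transformation: nothing runs yet
large = doubled.filter(lambda x: x > 4)     # transformation: still nothing runs

calls_before_action = len(calls)            # no action has fired so far
result = large.collect()                    # action: the whole pipeline runs now
calls_after_action = len(calls)

print(calls_before_action, result, calls_after_action)
```

`double` is not called at all until `collect()` fires, and `numbers` still holds its original elements afterwards, because each transformation returned a new object instead of changing its input.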


RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). 

This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. 
Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
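The behaviour described above (mark for persistence, store on first computation, reuse afterwards, recompute when the stored data is lost) can be sketched with a small toy model in plain Python. It is not Spark's implementation: `CachedDataset`, `lose_cache` and `compute_count` are hypothetical names used only for illustration, while `persist`, `unpersist`, `is_cached` and `collect` mirror the names of the real RDD API.

```python
class CachedDataset:
    """Toy stand-in for a persisted RDD; not Spark's implementation."""

    def __init__(self, source, func):
        self._source = tuple(source)
        self._func = func          # the transformation that derives this dataset
        self._persist = False
        self._cache = None
        self.compute_count = 0     # how many times the data was (re)computed

    def persist(self):
        self._persist = True       # only marks the dataset; nothing is computed yet
        return self

    def unpersist(self):
        self._persist = False
        self._cache = None
        return self

    @property
    def is_cached(self):
        return self._persist

    def lose_cache(self):
        """Simulate a failure that drops the stored data."""
        self._cache = None

    def collect(self):
        if self._cache is not None:
            return list(self._cache)            # reuse the stored result
        self.compute_count += 1
        result = [self._func(x) for x in self._source]
        if self._persist:
            self._cache = tuple(result)         # keep it for later actions
        return result


squares = CachedDataset([1, 2, 3], lambda x: x * x).persist()

first = squares.collect()                  # computed and stored
second = squares.collect()                 # served from the cache
count_after_two = squares.compute_count

squares.lose_cache()                       # the cached data disappears
third = squares.collect()                  # rebuilt from the source and the function
count_after_loss = squares.compute_count

print(first, count_after_two, third, count_after_loss)
```

The second `collect()` does no recomputation, and after the simulated loss the result is rebuilt from the original data and the recorded function, which is the idea behind recomputing a lost partition from the transformations that created it.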
 
1.      Cache the RDD.

>>> firstRDD.persist()
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

2.      Check whether it is cached.

>>> firstRDD.is_cached
True

3.      Remove from cache.

>>> firstRDD.unpersist()
16/05/02 11:18:04 INFO ParallelCollectionRDD: Removing RDD 0 from persistence list
16/05/02 11:18:04 INFO BlockManager: Removing RDD 0
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

4.      Check whether it was removed from the cache.

>>> firstRDD.is_cached
False


