Thursday, March 26, 2015

YARN: Yet Another Resource Negotiator

In the last post here, we discussed the map reduce paradigm initially proposed by Google. However, as noted there, Google never made its implementation public. Folks at Yahoo came up with an implementation and released it under Apache; that project is called Hadoop.

Hadoop implements the map reduce paradigm on top of a distributed file system called the Hadoop Distributed File System (HDFS).
In this post we will look at the internals of the map reduce implementation and at the resource allocation and scheduling mechanism called Yet Another Resource Negotiator (YARN).
There are four basic functions which need to be performed in order to implement the concept, as listed below:
1. Parallelize map function
2. Transfer the data from map function output to reduce input
3. Parallelize reduce function
4. Storage for : Map input, map output, reduce input and reduce output.

As we understood earlier, map functions (mappers) work on data independently of each other, so they are very easy to parallelize. The next step is to transfer the mapper output to the reducers. For reducers to work efficiently, records with the same key are sent to the same reducer. As we will see later, these records may be local to the server where the reducer is running or remote to it. A common way to distribute the mapper output is to use a partitioning method such as hashing.
Now, since the input required by a reducer is available to it independently of other reducers, reducers can also be parallelized very easily.
To implement storage for the input and output of the various stages:
Map input : distributed file system
Map output : local file system
Reduce input : local file system
Reduce output : distributed file system

The local file system can be a normal operating system file system such as the Linux FS. Examples of distributed file systems are the Google File System (GFS) and the Hadoop Distributed File System (HDFS).
The internal working described above can be understood from the figure below:

Hadoop internal
In the figure above, we can see that mappers and reducers run on servers (physical computing machines). Deciding how these servers are assigned, which application maps to which server, and how many instances of these processes to run requires a robust resource allocation and scheduling mechanism. In the following text we will talk specifically about Hadoop's implementation of the map reduce paradigm.

Hadoop is used nowadays in industry for distributed processing of humongous amounts of data. It can scale from a single node or machine to virtually thousands of nodes. In earlier versions of Hadoop, job scheduling and resource allocation were done by a single daemon. A Hadoop cluster can be divided into two major blocks: the map reduce implementation and, as said above, a distributed file system. The figure shows the high-level architecture of a Hadoop cluster.

Whenever a job is submitted to the cluster, the job tracker selects the server node which can most efficiently process the data, one of the criteria being the proximity of the server to the data on which the mapper or reducer has to work. The selected node is called a slave node and runs a task tracker, which has multiple slots to accommodate the processes assigned to it.
The task tracker notifies the job tracker when a task assigned to it has been completed. Once all such task trackers have reported completion to the job tracker, it informs the client that the job is done.
From the above description it is clear that every job has to go through a single daemon called the job tracker, which becomes a bottleneck, dampens scalability and limits processing speed.

To solve this scalability problem, Hadoop came up with a new implementation called YARN (Yet Another Resource Negotiator), also known as MRv2. So how does it overcome the shortcomings of the earlier implementation? This is what is mentioned on the official Apache Hadoop website:
The fundamental idea of MRv2 is to split up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring, into separate daemons. The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job in the classical sense of Map-Reduce jobs or a DAG of jobs. 
The pre-YARN implementation was more like an operating system which could run only one kind of application, that application being map reduce. YARN is a kind of distributed operating system which can run various kinds of applications, a property also called multi-tenancy. Although YARN maintains backward compatibility with MRv1 by supporting the map reduce APIs, it can do much more than that: you can develop your distributed applications in any language you want and run them on YARN. It gets rid of the need to keep map reduce separate from other complex distributed application frameworks.

Below is a figure of the YARN architecture, with each component explained.

yarn architecture

The resource manager and the node managers form the data-computation part of the framework. The resource manager arbitrates resources among all stakeholders. It has two parts: the scheduler and the application manager. The scheduler allocates resources to the various applications based on the capacity and limitations of the system; it also implements queues and other required mechanisms.
The application manager accepts jobs submitted by various clients and negotiates containers to execute those jobs (a container here means a slice of a machine with processing power and storage).

The node manager is a machine-level entity which monitors machine-level resources such as CPU and memory, as well as the containers, and reports them back to the resource manager.

The application master is a per-application library which requests resources from the resource manager and works with the node managers to execute and monitor tasks. It also detects task failures within its job.

I have tried to explain the concept of YARN; please share your views if something is missing or wrong.
And sharing is caring :)

Ford-Fulkerson Method for Maximum Flow

Before jumping directly to the Ford-Fulkerson algorithm, we have to understand the concepts of network flow. So we will start with some definitions, state some important properties, and then move on to the Ford-Fulkerson algorithm for maximum flow.
This topic requires prior knowledge of graph theory, so it is recommended to study the basic algorithms of graph theory first.

Network Flow
Let us consider a situation where we have to transfer water from a source to a sink through a network of pipes. Figure 1 shows the situation. Each pipe has a capacity associated with it, and the flow through a pipe cannot exceed its capacity. The source 's' supplies water and the sink 't' consumes it. We want to transfer the maximum amount of water through the network from the source to the sink.

Ford Fulkerson algorithm for max flow

Now, talking in terms of graph theory, we are given a network: a directed graph in which every edge has a capacity 'c' associated with it, a starting vertex, the source 's', and an ending vertex, the sink 't'.
We have to associate a flow 'f' with the edges such that f ≤ c for every edge. Note that every vertex other than the source and the sink must have the sum of the flow entering the vertex equal to the sum of the flow leaving it, i.e. no net flow comes out of any node and no net flow is consumed by any node other than the source and the sink. The flow coming out of the source and the flow going into the sink must be greater than zero. Figure 2 shows the flow network.
Mathematically : 
f(x, y) ≤ c(x, y) ∀ (x, y)∈E
∑y∈V f(x, y) = ∑y∈V f(y, x) ∀ x∈V-{s,t}         where 'V' is the set of all vertices
f(s, y) ≥ 0 ∀ y∈V
f(y, t) ≥ 0 ∀ y∈V

There is a notion of residual capacity. Suppose there is an edge from node x to node y with capacity 10 and flow 7, as shown in figure 2, and an edge in the opposite direction from y to x with capacity 20 and flow 0. Here we can still add 3 units of flow in the direction from x to y, so the residual capacity from x to y is 3. In the direction from y to x we have capacity 20, but the 7 units flowing from x to y can also be cancelled, so they too can be added in the direction from y to x, which makes the net residual capacity from y to x 27. With the skew-symmetry convention f(y, x) = -f(x, y), we define the residual capacity Cf as:
Cf(x, y) = C(x, y) - f(x, y)
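Assuming flows are stored per directed edge, the rule above (together with the skew-symmetry convention) can be checked with a small Python sketch of the worked example; the names are illustrative:

```python
def residual_capacity(C, f, x, y):
    """Residual capacity Cf(x, y) = C(x, y) - f(x, y), using the
    skew-symmetry convention f(y, x) = -f(x, y) for missing entries."""
    flow_xy = f[(x, y)] if (x, y) in f else -f.get((y, x), 0)
    return C.get((x, y), 0) - flow_xy

# The example from the text: capacity 10 and flow 7 from x to y,
# capacity 20 and flow 0 from y to x.
C = {("x", "y"): 10, ("y", "x"): 20}
f = {("x", "y"): 7}

print(residual_capacity(C, f, "x", "y"))  # 3
print(residual_capacity(C, f, "y", "x"))  # 27
```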

If Cf(x, y) = 0 then we say that edge (x, y) is saturated, because we cannot send more flow through this edge. Let U and W be subsets of V; then the flow f(U, W) is denoted by the sum of the flows on all edges with one vertex in U and the other in W:
f(U, W) = ∑x∈U ∑y∈W f(x, y)

The figure shows the graph of the network flow. Note that the flow at every vertex other than the source and the sink is conserved, i.e. the net flow going into a vertex is equal to the net flow coming out of it.

A cut is basically a partition of the vertices of a graph into two subsets such that the source and the sink are in different subsets.
So, a cut is defined as a tuple (S, V-S), where S is a set of vertices such that the source s∈S and t∈(V-S). Therefore
C = (S, V-S) such that s∈S, t∈(V-S)

The edge set E of a cut is the set of all edges such that one vertex of the edge is in S and the other is in V-S. It contains the edges that point from the set containing the source to the set containing the sink. Therefore
E(S, V-S) = {(x, y) | x∈S, y∈(V-S)}
The capacity C of a cut (S, V-S) is the sum of the capacities of all the edges in the edge set of the cut:
C(S, V-S) = ∑ C(x, y) | x∈S, y∈(V-S)
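This definition can be sketched in a few lines of Python; the three-vertex network below is a hypothetical example, not one of the figures in the post:

```python
def cut_capacity(C, S, V):
    """Capacity of the cut (S, V-S): sum of the capacities of all
    edges pointing from S into V-S."""
    T = V - S
    return sum(c for (x, y), c in C.items() if x in S and y in T)

# Hypothetical network: s -> a (4), a -> t (3), s -> t (5).
C = {("s", "a"): 4, ("a", "t"): 3, ("s", "t"): 5}
V = {"s", "a", "t"}

print(cut_capacity(C, {"s"}, V))       # 9  (edges s->a and s->t cross the cut)
print(cut_capacity(C, {"s", "a"}, V))  # 8  (edges a->t and s->t cross the cut)
```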

Let us understand a very important property (let us call it Property A), which states:
Property A : An edge set E'⊂E (where E is the complete set of edges) is the edge set of a cut if and only if E' is a minimal set with the property that any path from source 's' to sink 't' passes through at least one edge of E'. Here, by "minimal" I mean that this property breaks down if even one edge of E' is withdrawn.
Proof : Let us say E' is the edge set of some cut (S, V-S). Now, let us take an arbitrary path from source s to sink t. The figure below shows the path from s to t.

So, if we travel along any path from s to t, there has to be an edge (Vi, Vi+1) with vertex Vi in set S and vertex Vi+1 in set (V-S). Therefore, if Vi+1 is the first vertex from the left that belongs to set (V-S), then vertex Vi∈S and edge (Vi, Vi+1) belongs to the edge set E(S, V-S).
Now we have to prove that this is a minimal such set. Let us take some edge (x, y) out of this set and call the new set (after taking the edge out of E') E''. So
E'' = E' - {(x, y)}                 where x∈S and y∈(V-S)
Now consider a path s, x, y, t. In this path, the edge (s, x) has both endpoints in S and the edge (y, t) has both endpoints in (V-S), so neither belongs to the edge set of the cut; and since the edge (x, y) no longer belongs to E'', the edge set E'' fails to satisfy the property, because there now exists a path that does not pass through any edge of E''. Hence, E' is a minimal set having Property A.

The converse of this property is also true: if E' is a minimal set that satisfies Property A, then E' has to be the edge set E(S, V-S) of some cut.
Define S as the set of vertices reachable from the source 's' without passing through any edge of E'; clearly s∈S, and since E' satisfies Property A, t∉S. So
S = {x | x is reachable from source 's' without passing through any edge of E'}

Now let us suppose there is an edge (x, y) that belongs to the edge set E(S, V-S) but does not belong to E', where x∈S and y∈(V-S). Mathematically:
(x, y)∈E(S, V-S)-E' where x∈S and y∈(V-S) .....................(1)

So we must have a path from source 's' to vertex 'x' that does not pass through any edge of E'; and because we have assumed that the edge (x, y) does not belong to E', that path can be extended to y, which makes 'y' also a member of set 'S', contradicting our assumption in (1). This means there is no edge that belongs to the edge set E(S, V-S) but not to E'. Therefore E' contains every edge of the edge set of the cut:
E(S, V-S) ⊆ E'.

This proves that E(S, V-S) also satisfies Property A, and if E(S, V-S) were strictly smaller than E', then E' could not be a minimal set with Property A. Therefore E' must be equal to E(S, V-S). Mathematically
E' = E(S, V-S)
Hence the converse is also proved: a minimal set satisfying Property A must be the edge set of a cut. That also means that no proper subset of E' satisfies Property A.

Net Flow
The net flow of a flow network is defined as the total flow supplied by the source 's'; it is also equal to the total flow consumed by the sink 't'. It is denoted by |f|. Therefore,
|f| = ∑ f(s, y)   where y∈V.
Also, for any cut (S, V-S), the capacity of the cut is greater than or equal to the flow across that cut, which is equal to the net flow. Mathematically
C(S, V-S) ≥ f(S, V-S) = |f|
The above result states that whatever net flow emerges out of the source 's' is equal to the net flow that goes from S to (V-S).

We are given a flow network with a source 's' and a sink 't'; we have to compute a flow 'f' such that |f| is maximum.
To establish this result we will need the max flow min cut theorem.

Max flow Min Cut Theorem : This theorem states that if 'f' is a maximum flow for a given network, then the value of the flow |f| is the minimum cut capacity.
So, the value of |f| is equal to the minimum of the capacities over all cuts. Therefore
|f| = min{C(S, V-S)} over all the cuts.

Now, let us consider a flow 'f' and the edge set E1 = {(x, y) | (x, y) is saturated in f}. If f is maximum, then E1 has to satisfy Property A. For if it did not, there would exist a path P = s, x1, x2, ..., xk, t such that none of the edges of E1 is on the path, i.e. every edge on P has residual capacity greater than zero. Let 'r' be the minimum of the residual capacities along P. Then the flow on every edge of P can be increased by 'r' to give a new flow f':
f'(x, y) = f(x, y) + r.
The above expression also holds for the first edge of the path, the one leaving the source 's', which means that the net flow |f| also increases by 'r' to |f'|. This is absurd, because we assumed that |f| was maximum. Hence E1 has to satisfy Property A.
Now, let E1' ⊆ E1 be a minimal subset that still satisfies Property A. Every edge of E1' is saturated, because E1' is a subset of E1, which contains only saturated edges. Since E1' satisfies Property A and is minimal, it is the edge set of a cut all of whose edges are saturated, so the capacity of that cut equals the net flow.
This tells us that as long as there is room for improvement in the flow, i.e. the flow is not maximum, there exists a path from source 's' to sink 't'. Note that by a path from source 's' to sink 't' we mean a path with no saturated edge on it.

Residual Graph
In a residual graph, each edge (x, y) carries a value equal to the residual capacity of that edge. The residual graph is derived from the original graph and does not contain edges whose residual capacity is zero. The figure below shows a residual graph.

We always have to find a path in the original graph along which the flow can be improved. Since the residual graph contains only edges with positive residual capacity, we simply have to find a path in the residual graph and update the residual capacities, which are the weights of its edges. Such a path is called an "augmenting path". So the flow is maximum if and only if the residual graph has no path from source 's' to sink 't'.
So, according to the theorem: 'f' is a maximum flow if and only if the residual graph has no path from source 's' to sink 't'.

Ford-Fulkerson Method

The Ford-Fulkerson method proceeds as follows:
1. Initialize the flow on every edge to zero.
2. While there is an augmenting path from 's' to 't' in the residual graph, find the minimum residual capacity along it and push that much flow along the path.
3. When no augmenting path remains, the current flow is maximum.
This method is guaranteed to terminate only for integer capacities.

The augmenting path in step 2 can be found using breadth-first search (BFS) or depth-first search (DFS). If the former is used, the algorithm is called Edmonds-Karp.
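The post does not include the code, so here is a minimal Python sketch of the method using BFS to find augmenting paths (the Edmonds-Karp variant). It assumes integer capacities given as an adjacency matrix; this is one common way to code it, not the only one:

```python
from collections import deque

def max_flow(capacity, n, s, t):
    """Ford-Fulkerson with BFS augmenting paths (Edmonds-Karp).
    `capacity` is an n x n matrix of integer capacities."""
    flow = [[0] * n for _ in range(n)]

    def find_augmenting_path():
        # BFS over the residual graph: edge u -> v is usable
        # while capacity[u][v] - flow[u][v] > 0.
        parent = [-1] * n
        parent[s] = s
        queue = deque([s])
        while queue:
            u = queue.popleft()
            for v in range(n):
                if parent[v] == -1 and capacity[u][v] - flow[u][v] > 0:
                    parent[v] = u
                    queue.append(v)
        return parent if parent[t] != -1 else None

    total = 0
    while (parent := find_augmenting_path()) is not None:
        # Bottleneck: minimum residual capacity along the path.
        r, v = float("inf"), t
        while v != s:
            r = min(r, capacity[parent[v]][v] - flow[parent[v]][v])
            v = parent[v]
        # Augment: push r forward, cancel r backward.
        v = t
        while v != s:
            flow[parent[v]][v] += r
            flow[v][parent[v]] -= r
            v = parent[v]
        total += r
    return total

# A classic 6-vertex example network with maximum flow 23.
cap = [[0, 16, 13, 0, 0, 0],
       [0, 0, 10, 12, 0, 0],
       [0, 4, 0, 0, 14, 0],
       [0, 0, 9, 0, 0, 20],
       [0, 0, 0, 7, 0, 4],
       [0, 0, 0, 0, 0, 0]]
print(max_flow(cap, 6, 0, 5))  # 23
```

Note that storing the flow skew-symmetrically (`flow[v][u] -= r`) is what lets a later BFS cancel flow along a backward edge, exactly as in the residual-capacity discussion above.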

Complexity Analysis

By repeatedly adding the flow of an augmenting path to the existing flow in the graph, the maximum flow is reached when no more augmenting paths can be found. However, termination is not guaranteed in general: if the capacities are irrational, the flow might not even converge to the maximum flow.
The complexity of the Ford-Fulkerson algorithm (using depth-first search) is O(E*f), where 'E' is the number of edges in the graph and 'f' is the maximum flow. This is because each augmenting path can be found in O(E) time and increases the flow by an integer amount of at least 1.
A variation of the Ford-Fulkerson method is the Edmonds-Karp algorithm, which has complexity O(VE²).

The code of the algorithm will be added later.

Monday, March 23, 2015

Map Reduce : Cloud computing paradigm

A very important concept in cloud computing is map reduce. This paradigm originated at Google, which published a paper on it but never released its implementation. Map reduce has two parts: map and reduce.

Mapper function
The map function performs some processing on data sets where the processing of each data item is independent of the others. For example, if the sum of the squares of a list of numbers is to be found, the map function finds the square of each number in the list. Here, finding the square of one number is completely independent of the others.

Reducer function
The reduce function works on the output generated by the map function and combines it to produce the final result. A significant difference between the map function and the reduce function is that reduce may or may not process data items independently. So in the above example, the reduce function takes the output of the map function, that is, the square of each number, and adds them.
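The sum-of-squares example above can be sketched with Python's built-in `map` and `functools.reduce`; this is only an illustration of the paradigm on one machine, not distributed Hadoop code:

```python
from functools import reduce

numbers = [1, 2, 3, 4]

# Map phase: square each number independently (each item could be
# handled by a different mapper instance).
squares = list(map(lambda n: n * n, numbers))

# Reduce phase: combine all mapper outputs into the final sum.
total = reduce(lambda a, b: a + b, squares)

print(squares)  # [1, 4, 9, 16]
print(total)    # 30
```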

Since the map function works independently on data items, it is a very good candidate for parallelism. Multiple map functions can run on multiple servers over independent data sets and generate key-value pairs, which are then consumed by the reduce function for further processing.
In the same way, more than one reduce function can be invoked on more than one server, each running on a batch of inputs (outputs from map) which are related to each other. In our example, since all the outputs are needed in order to compute the sum, only one reduce function is possible.
It is important to know that map and reduce functions do not talk to each other, hence no data sharing can happen between any two instances of them.
Let's go to the programming part and see how these two concepts work. Take a problem called wordcount: given a huge block of text, like Wikipedia or the works of Shakespeare, find the count of each word occurring in that text. In our small example, let's take the file below as our input, and the output of the program should be something like the output shown. How does map reduce work here? The first part of the problem is to read the input and find all the words which occur in the block of text. The map function goes through the complete text and generates output in <key, count> format, where the key is a word and the count is 1 (I will explain why the count is always 1 in this function). The output of the map function will be as below.
Now, the reduce function takes the output of map, adds up the counts of the same word, and produces the output shown above. A high-level description of the algorithm follows. In the example above, only one instance of the map function is used. Can we use more?
Why not? There can be two map functions which process different parts of the file. And that's why the count is always one: one map function instance may not be aware that the same word has also been encountered by another instance. Its responsibility is to count, not to aggregate; aggregation has to be done by the reduce function. Multiple instances are shown in the figure.

Map reduce paradigm

There can also be multiple instances of the reduce function. How are output values sent to a particular reduce function? A mapping function can be chosen which sends similar or dependent values to the same reduce function. A simple mapping function is hashing: hash the key and then take the modulo with the number of instances (taking the modulo ensures load balancing), reduce# = hash(key) % no. of instances, and based on the result send the value to the appropriate reduce function, as shown in the figure above. We can use as many instances of each function as needed.
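The reduce# = hash(key) % instances rule can be sketched as below; this is an illustrative toy shuffle, not Hadoop's actual partitioner, and the function names are made up:

```python
def pick_reducer(key, num_reducers):
    # reduce# = hash(key) % number of instances: the same key
    # always goes to the same reducer instance.
    return hash(key) % num_reducers

def shuffle(mapper_output, num_reducers):
    """Group the <key, count> pairs from all mappers by target reducer."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in mapper_output:
        buckets[pick_reducer(key, num_reducers)].append((key, value))
    return buckets

pairs = [("apple", 1), ("banana", 1), ("apple", 1)]
buckets = shuffle(pairs, 3)
# Both ("apple", 1) records land in the same bucket.
print(buckets[pick_reducer("apple", 3)].count(("apple", 1)))  # 2
```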
That's a lot of talk; let's see some code. The map function is implemented as below.

Reduce function is implemented as follows:
The driver function which connects these two worlds is
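The original post embedded the Java code from the Hadoop tutorial, which is not reproduced here. As a stand-in, here is a minimal Python sketch of the same map, reduce, and driver logic (plain Python, not the Hadoop API):

```python
from collections import defaultdict

def map_words(text):
    # Mapper: emit <word, 1> for every word. It only counts;
    # aggregation is left to the reducer.
    for word in text.split():
        yield (word, 1)

def reduce_counts(pairs):
    # Reducer: sum the counts of identical keys from all mappers.
    counts = defaultdict(int)
    for word, count in pairs:
        counts[word] += count
    return dict(counts)

# Driver: wire the mapper output into the reducer.
result = reduce_counts(map_words("to be or not to be"))
print(result)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```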

The source of the above three pieces of code is the Hadoop tutorial.

This is the first post on the map reduce paradigm, and we will talk more about it in the next couple of posts. Please share your views in the comments, and subscribe to read many more such articles.