TreeReduce and TreeAggregate Demystified

Introduction

With the regular **reduce** and **aggregate** functions in Spark (and in the original MapReduce), every partition has to send its partial result to the driver machine, and the driver spends time linear in the number of partitions merging them (due to the CPU cost of combining partial results and the network bandwidth limit). This becomes a **bottleneck** [13] when there are many partitions and the data from each partition is large.

Spark 1.1 [20] introduced a new aggregation communication pattern based on multi-level aggregation trees. In this setup, data are partially combined on a small set of executors before being sent to the driver, which dramatically reduces the load the driver has to handle. Tests showed that these functions reduce aggregation time by an order of magnitude, especially on datasets with a large number of partitions.

So, in **treeReduce** and **treeAggregate**, the partitions talk to each other in a logarithmic number of rounds.

In the case of **treeAggregate**, imagine an n-ary tree with all the partitions at its leaves and the final reduced value at its root. This way there is no single bottleneck machine.
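To make the pattern concrete, here is a minimal plain-Java sketch (not Spark code; `treeMerge` and `fanIn` are illustrative names) of merging per-partition partial results in rounds of small groups, so no single machine ever combines more than `fanIn` values at once and the number of rounds grows logarithmically with the number of partitions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Sketch of multi-level (tree) aggregation: partial results are merged in
// rounds of small groups, so the "driver" only ever combines a handful of
// values, and the number of rounds is O(log(#partitions)).
public class TreeMergeSketch {

    static <T> T treeMerge(List<T> partials, BinaryOperator<T> comb, int fanIn) {
        List<T> level = partials;
        while (level.size() > 1) {
            List<T> next = new ArrayList<>();
            // each group of up to fanIn partial results is merged by one "executor"
            for (int i = 0; i < level.size(); i += fanIn) {
                T acc = level.get(i);
                for (int j = i + 1; j < Math.min(i + fanIn, level.size()); j++) {
                    acc = comb.apply(acc, level.get(j));
                }
                next.add(acc);
            }
            level = next;
        }
        return level.get(0);
    }

    public static void main(String[] args) {
        // 8 partition sums merged pairwise: 3 rounds instead of one 8-way merge
        List<Integer> partials = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(treeMerge(partials, Integer::sum, 2)); // prints 36
    }
}
```

With `fanIn = 2` and 8 partials, the levels shrink 8 → 4 → 2 → 1, which is the logarithmic number of rounds described above.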

Differences between reduceByKey and treeReduce

Differences between aggregate and treeAggregate

**treeAggregate** [19] is a specialized implementation of **aggregate** that iteratively applies the combine function to a subset of partitions. This prevents all partial results from being returned to the driver, where a single-pass reduce would take place as in the classic **aggregate**.
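The two functions share the same contract: `seqOp` folds the elements of one partition into the zero value, and `combOp` merges the per-partition results; `treeAggregate` only changes *where* the `combOp` calls run. A plain-Java sketch of that contract (the partition lists and the squared-value step are made up for illustration):

```java
import java.util.List;

// Plain-Java sketch of the aggregate contract (not Spark): seqOp folds each
// partition's elements into the zero value, combOp merges the per-partition
// accumulators. treeAggregate uses the same two functions and differs only in
// running combOp in a tree of executors instead of entirely at the driver.
public class AggregateContractSketch {

    static int aggregate(List<List<Integer>> partitions) {
        return partitions.stream()
                // seqOp: fold each element's square into the partition's accumulator
                .map(p -> p.stream().reduce(0, (acc, v) -> acc + v * v))
                // combOp: merge the per-partition accumulators
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = List.of(List.of(1, 2), List.of(3, 4), List.of(5));
        System.out.println(aggregate(partitions)); // 1 + 4 + 9 + 16 + 25 = 55
    }
}
```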

Why you should use TreeReduce/TreeAggregate

Many of **MLlib**'s algorithms use **treeAggregate**. In the case of **GaussianMixture** (https://tinyurl.com/n3l68a8), using **treeAggregate** rather than **aggregate** increased performance by about **20%**, while **Online Variational Bayes for LDA** (https://tinyurl.com/kt6kty6) uses **treeAggregate** instead of **reduce** to aggregate the expected word-topic count matrix (potentially a very large matrix) without scalability issues. **MLlib**'s implementation of **Gradient Descent** also uses **treeAggregate** (https://tinyurl.com/l6q5nn7).

Curious about this, I decided to use **treeAggregate** instead of **reduce** to compute the **gradient** in my implementation of **back-propagation**. In my test on a dataset with 100 features and 10M instances split into 96 partitions, run on a cluster of 3 worker nodes and one application-master node (each with 16 CPUs and 52 GB of memory), the **neural network** completed 100 epochs in only 36 minutes instead of hours.
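In that setting the values being combined are whole gradient vectors, and the combine step is just element-wise addition, exactly the kind of large, associative merge the tree pattern helps with. A hypothetical plain-Java sketch (the vector size and values are made up; this is not the back-propagation code itself):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: when tree-aggregating gradients, the combine function is
// element-wise vector addition, so large per-partition gradient vectors can be
// merged on executors instead of all being shipped to the driver at once.
public class GradientCombineSketch {

    // the combOp: element-wise sum of two gradient vectors
    static double[] addVectors(double[] a, double[] b) {
        double[] out = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = a[i] + b[i];
        }
        return out;
    }

    public static void main(String[] args) {
        // three per-partition gradients over a 3-parameter model
        List<double[]> partials = List.of(
                new double[]{1, 2, 3},
                new double[]{4, 5, 6},
                new double[]{2, 1, 0});
        double[] total = partials.stream()
                .reduce(new double[]{0, 0, 0}, GradientCombineSketch::addVectors);
        System.out.println(Arrays.toString(total)); // [7.0, 8.0, 9.0]
    }
}
```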

Code examples (Scala)

The following **Scala** code generates two random double **RDD**s containing **1 million values** each and calculates the **Euclidean distance** between them using the **map-reduce pattern**, **treeReduce**, and **treeAggregate**:

```scala
import org.apache.spark.mllib.random.RandomRDDs._
import org.apache.spark.{SparkConf, SparkContext}

import scala.math.sqrt

object Test {

  def main(args: Array[String]): Unit = {

    val mapReduceTimeArr: Array[Double] = Array.ofDim(20)
    val treeReduceTimeArr: Array[Double] = Array.ofDim(20)
    val treeAggregateTimeArr: Array[Double] = Array.ofDim(20)

    // Spark setup
    val config = new SparkConf().setAppName("TestStack")
    val sc: SparkContext = new SparkContext(config)

    // Generate two random double RDDs, each containing 1 million i.i.d. values
    // drawn from the standard normal distribution `N(0, 1)`,
    // evenly distributed in 5 partitions.
    val input1 = normalRDD(sc, 1000000L, 5)
    val input2 = normalRDD(sc, 1000000L, 5)

    val xy = input1.zip(input2).cache()
    // Materialize the RDD
    xy.count()

    for (i <- 0 until 20) {
      val t1 = System.nanoTime()
      val euclideanDistanceMapRed = sqrt(xy.map { case (v1, v2) => (v1 - v2) * (v1 - v2) }.reduce(_ + _))
      val t11 = System.nanoTime()
      println("Map-Reduce - Euclidean Distance " + euclideanDistanceMapRed)
      mapReduceTimeArr(i) = (t11 - t1) / 1000000.0
      println("Map-Reduce - Elapsed time: " + (t11 - t1) / 1000000.0 + "ms")
    }

    for (i <- 0 until 20) {
      val t2 = System.nanoTime()
      val euclideanDistanceTreeRed = sqrt(xy.map { case (v1, v2) => (v1 - v2) * (v1 - v2) }.treeReduce(_ + _))
      val t22 = System.nanoTime()
      println("TreeReduce - Euclidean Distance " + euclideanDistanceTreeRed)
      treeReduceTimeArr(i) = (t22 - t2) / 1000000.0
      println("TreeReduce - Elapsed time: " + (t22 - t2) / 1000000.0 + "ms")
    }

    for (i <- 0 until 20) {
      val t3 = System.nanoTime()
      val euclideanDistanceTreeAggr = sqrt(xy.treeAggregate(0.0)(
        seqOp = (c, v) => c + (v._1 - v._2) * (v._1 - v._2),
        combOp = (c1, c2) => c1 + c2))
      val t33 = System.nanoTime()
      println("TreeAggregate - Euclidean Distance " + euclideanDistanceTreeAggr)
      treeAggregateTimeArr(i) = (t33 - t3) / 1000000.0
      println("TreeAggregate - Elapsed time: " + (t33 - t3) / 1000000.0 + "ms")
    }

    val mapReduceAvgTime = mapReduceTimeArr.sum / mapReduceTimeArr.length
    val treeReduceAvgTime = treeReduceTimeArr.sum / treeReduceTimeArr.length
    val treeAggregateAvgTime = treeAggregateTimeArr.sum / treeAggregateTimeArr.length

    val mapReduceMinTime = mapReduceTimeArr.min
    val treeReduceMinTime = treeReduceTimeArr.min
    val treeAggregateMinTime = treeAggregateTimeArr.min

    val mapReduceMaxTime = mapReduceTimeArr.max
    val treeReduceMaxTime = treeReduceTimeArr.max
    val treeAggregateMaxTime = treeAggregateTimeArr.max

    println("Map-Reduce - Avg:" + mapReduceAvgTime + "ms Max:" + mapReduceMaxTime + "ms Min:" + mapReduceMinTime + "ms")
    println("TreeReduce - Avg:" + treeReduceAvgTime + "ms Max:" + treeReduceMaxTime + "ms Min:" + treeReduceMinTime + "ms")
    println("TreeAggregate - Avg:" + treeAggregateAvgTime + "ms Max:" + treeAggregateMaxTime + "ms Min:" + treeAggregateMinTime + "ms")
  }
}
```


To see **treeReduce/treeAggregate** shine, this code should be run on a cluster with a large number of partitions.

Code examples (Java)

Basic Java treeReduce and treeAggregate examples

```java
public void treeReduce() {
  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
  Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) {
      return a + b;
    }
  };
  for (int depth = 1; depth <= 10; depth++) {
    int sum = rdd.treeReduce(add, depth);
    assertEquals(-5, sum);
  }
}

public void treeAggregate() {
  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
  Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) {
      return a + b;
    }
  };
  for (int depth = 1; depth <= 10; depth++) {
    int sum = rdd.treeAggregate(0, add, add, depth);
    assertEquals(-5, sum);
  }
}
```
