課程名稱︰巨量資料系統
課程性質︰選修
課程教師:廖世偉
開課學院:電資學院
開課系所︰資工所、網媒所
考試日期(年月日)︰2013.11.10
考試時限(分鐘):
試題 :
Big Data System Midterm Exam
November 10, 2013
1 (20%) Term Explanation
1. (5%) What is combiner? How can it help with the performance of MapReduce
under the scenario of big-data?
Ans:
a. A pass executed between map and reduce(after map), to compress/trim
the output from the mappers in 1 machine.
b. Reduce communication overhead over the network between machines,
reduce data amount ….
2. (5%) In Spark, what's the difference between Transformation and Action?
Ans:
a. Transformation => create new RDD, only add record to lineage and no
computation. (Like Mapper)
b. Action => results will be computed and sent back to master
(Like Reducer)
3. (5%) What is aggregator in Pregel? How can it be utilized? Please give at
least 2 examples.
Ans:
a. A conponent that computes and maintains shared global variables
derived from values provided by vertices.
b. global communication, state monitoring, execution flow control ... etc.
4. (5%) What does "defined" means in GFS? (Hint: compare with the definition
of "consistent" in GFS)
Ans:
Consistent, (the order of all modifications on primary and replica are
the same), and all writes will be in the same order as issued by clients
(serial)
primary ■■■■■ ■■■■■ ■■■■■
replica ■■■■■ ■■■■■ ■■■■■
defined consistent inconsistent
2 (20%) Map-Reduce
In statistics, logistic regression a type of probabilistic classification
model. Given large scale of data point, the format of each data point is as
below. Given the original Logistic Regression and MapReduce Example
(WordCount). Please rewrite the logistic regression algorithm into MapReduce.
╭ X_N1 Y_1 ╮
│ ﹒ │
│ ︰ │
m data point < X_Ni Y_i > (X_N is N-Dimensional Vector) (1)
│ ﹒ │ (Y is Label)
│ ︰ │
╰ X_Nm Y_m ╯
┌────────────────────────────┐
1│w = Vector.Random(N) // N-dimensional vector │
2│for i from 1 to ITERATIONS do{ │
3│ // Compute gradient │
4│ g = Vector.Zero(N) // N-dimensional zero vector │
5│ for every data point (XN, Y) do { │
6│ // XN is a vector, Y is +1 or 0 │
7│ g += (Y * XN) / (1 + exp(Y * MatrixMulti (w * XN))) │
8│ } │
9│ w -= LEARNING_RATE * g │
10│} │
└────────────────────────────┘
Listing 1: Logistic Regression Algorithm
┌───────────────┐
1│main() │
2│{ │
3│ runJob() │
4│} │
5│ │
6│map(key, value) │
7│{ │
8│ // key: document name │
9│ // value: document content │
10│ for each word w in value: │
11│ EmitIntermediate(w, "1") │
12│} │
13│ │
14│reduce(key, value) │
15│{ │
16│ // key: a word │
17│ // values: a list of counts │
18│ int result = 0; │
19│ for each v in values: │
20│ result += ParseInt(v); │
21│ Emit( AsString(result) ); │
22│} │
└───────────────┘
Listing 2: WordCount in MapReduce
Ans:
┌───────────────────────────┐
│main(){ │
│ w = Vector.Random(N) │
│ for i from 1 to ITERATIONS do{ │
│ runJob(); │
│ } │
│ Output(w) │
│} │
│ │
│map(key, value){ │
│ g = Vector.Zero(N) │
│ // key: document name, value: document content │
│ for each line L in value │
│ { │
│ XN = value.parseX() │
│ Y = value.parseY() │
│ g += (Y * XN) / (1 + exp(Y * MatrixMulti(w * XN)))│
│ } │
│ EmitIntermediate(constant_key, g); │
│} │
│ │
│reduce(key, value){ │
│ g = Vector.Zero(N) │
│ for each v in value │
│ { │
│ g += value │
│ } │
│ w -= LEARNING_RATE * g │
│} │
└───────────────────────────┘
3 (20%) Google File System
1. (10%) Please give 2 brief descriptions of the trade-off of chunk size
in GFS.
Ans:
The following descriptions are the advantages.
● Large
○ Reduce client's need to interact with the master because reads and
writes on the same chunk require only one initial request to the
master for chunk location information.
○ Reduce network overhad by keeping a persistent TCP connection to
the chunkserver over an extended period of time. (Since a large
chunk, a client is more likely to perform many operations on a
given chunk.)
○ Reduce the size of the metadata stored on the master. This allows
master to keep the metadata in memory.
● Small
○ More likely to avoid a chunk becoming hot spot.
○ Less re-replication overhead when the number of available replicas
falls below a user-specified goal.
2. (10%) Please give a brief description of how GFS achieve fault tolerance
(both high availability and data integrity).
Ans:
Reference to p.37&38 in slides of "Google File System".
● High availability (5%)
○ Fast recovery
Both the master and the chunkserver are designed to restore their
state and start in seconds no matter how they terminated. We do not
distinguish between normal and abnormal termination.
○ Chunk replication
Each chunk is replicated on multiple chunkservers on different
racks. (Default is 3)
○ Master replication
■ The master state is replicated for reliability. Its operation
log and checkpoints are replicated on multiple machines.
■ Shadow masters provide read-only access to the file system even
when the primary master is down.
● Data integrity (5%)
○ Checksum
A chunk is broken up into 64 KB blocks. Each has a corresponding
32 bit checksum.
○ Verify at read & write times
For reads, the chunkserver verifies the checksum of data blocks
that overlap the read range before returning any data to the
requester, whether a client or another chunkserver. Therefore
chunkservers will not propagate corruptions to other machines.
○ Background scan for rarely used data
During idle periods, chunkservers can scan and verify the contents
of inactive chunks.
4 (20%) Spark
Most MapReduce systems are built around an acyclic data ow model that is not
suitable for iterative applications. Therefore, Spark propose an in-memory
computing framework that supports these applications while retaining the
scalability and fault tolerance of MapReduce.
1. (5%) Please explain why MapReduce framework is infeasible when it comes
to iterative computations.
Ans: MapReduce framework needs disk I/O after map phase and reduce phase.
When doing each iteration, it writes files into disk and reads files
from disk.
2. (5%) Does Spark always outperform Hadoop MapReduce framework? If no,
please give an example and briefly explain it.
Ans: If we just need one iteration or the input data size doesnt fit in
memory.
3. (10%) What's the advantage of Lineage when it comes to fault tolerance on
big data computation. (Hint: compare with traditional approaches like
logging / checkpoint)
Ans: Lineage consists of operations of RDDs. It store less information
and easy to know which operation needs to be recomputed. When the
rdds are lost, it can be reconstruct according to the lineage. In
constrast, checkpoint and loggings need to store lots of imformation
and when the data is lost, it needs to fall back to last checkpoint.
5 (20%) Bulk Synchrounous Parallel
1. (6%) Please describe what the Bulk Synchronous Parallel (BSP) model is in
detail, i.e. you must give detail explanation of what each phase is doing.
Ans:
BSP includes three phase:
a. (2%) computation - perform computation in parallel in different
processing node.
b. (2%) communication - processor nodes perform communication with each
node.
c. (2%) barrier synchronization - when the process finish communicating,
it waits until all process finishes the previous phase.
2. (8%) Describe how graph algorithms can be mapped to the BSP model,
including how the computation terminates.
Ans:
a. Each vertex can be mapped to a processing node, which can be compute
in parallel. (2%)
b. After computation, each vertex send messages to other vertices
c. The vertex may vote to halt depending on the graph algorithm and
become inactive. (3%)
d. Another BSP iteration is performed until the terminating condition
e. The BSP iteration terminates when all node is inactive
(vote to halt) (1%) and no message is in transit (2%).
3. (6%) Single source shortest path is an important problem in graph
computing. Given a graph and a starting vertex, we want to find the
shortest path to every node other than the starting node. Please devise
an algorithm using the BSP model that calculates the shortest "distance"
to every node other than the starting node. The graph has only positive
edges and edges might have different values. You have to describe your
algorithm in pseudo code. Note that we only care about the shortest
distance so you don't have to find the actual path.
Ans:
Won't halt (-3%)
min_dis = 0 if isStartingNode() else INT_MAX
for m in all messages {
if m.value < min_dis {
min_dis = m.value
}
}
if min_dis < self.value {
self.value = min_dis
for e in all edges {
sendMessage(e, min_dis + e.value)
}
}
voteToHalt()
6 (20%)(Bonus) K-Means in Spark
In machine learning, the K-means algorithm is very useful for solving
unsupervised clustering problem, which tries to partition a set of data
points into K groups where K is a given number. The algorithm is simplified
as following:
1. Given K, the number of groups we'd like to partition the D-dimensional
data point set X into.
2. Randomly generate K points, C_(0~(K-1)), as centroids of the groups.
3. Each point in X take the id of it's nearest centroid as it's group
label.
4. Update C_i as the arithmetic mean of all points taking C_i as it's
centroid.
5. Repeat 3 & 4 until iteration limit exceeds or convergence (centroids
not changing much).
圖解:http://i.imgur.com/9B5WpbT.png
However, the computation on finding centroid would be a great bottleneck
when we have enormous amount of data points, and the iterative intrinsic is
make it infeasible for using MapReduce framework. So in this section, you're
asked to
┌─────┐ ┌───┐ ┌─────┐
│input file│─→│links │ │ ranks_0 │
└─────┘map └───┘ └─────┘
│ ↓
│ join ┌─────┐
├────→│contribs_0│
│ └─────┘
│ ↓ reduce + map
│ ┌─────┐
│ │ ranks_1 │
│ └─────┘
│ ↓
│ ┌─────┐
├────→│contribs_1│
│ └─────┘
│ ↓
│ ┌─────┐
│ │ ranks_2 │
│ └─────┘
│ ↓
│ ┌─────┐
└────→│contribs_2│
└─────┘
↓
…
Figure 2: PageRank Lineage
1. (5%) Draw out the Lineage, like something in Figure 2.
Ans:
┌─────┐ ┌────┐ ┌──────┐
│input file│─→│points X│──→ │ C_0 │
└─────┘map └────┘sample └──────┘
│ ↓
│ ┌──────┐
├─────→│ (C_0, X) │
│ map └──────┘
│ ↓ group by key
│ ┌──────┐
│ │Group Result│
│ └──────┘
│ ↓ mapValues + map
│ ┌──────┐
│ │ C_1 │
│ └──────┘
│ ↓
│ ┌──────┐
└─────→│ (C_1, X) │
map └──────┘
↓
┌──────┐
│Group Result│
└──────┘
↓
…
2. (15%) Write Spark Driver Program psuedo code for K-means here and briefly
explain your design.
Ans:
def main() {
// Initiate data
var data = parse_points_from_file("data.txt").cache()
var centroids = data.sample(K) // sample K points from data set
var idx = range(0, K)
var grouped_points
// Iterate until convergence or limit exceeds
for ( i <- 0 until ITERATION_LIMIT ) {
grouped_points = data.map(p => (closest(p, centroids), p))
.groupByKey()
var new_centroids = grouped_points
.mapValues(p_in_group => avg(p_in_group))
.map(x => x._2)
centroids = new_centroids;
if (converged(centroids, new_centroids))
break
}
// Print results
grouped_points.collect()
for ( key <- grouped_points.keys() ){
for ( point <- grouped_points[key] ){
print(key, point)
}
}
}
Again, PSEUDO-CODE is enough, but your answer must be clearly commented and
preserve critical properties of Spark. (Operation granularity, RDD creation,
Broadcast variable)
Here are some assumptions:
1. For simplicity, just print out the clustering result as
[LABEL POINT CONTENT]. e.g.
1 0.882 0.24 0.25213 ...
1 0.1238 1.8102 1230.33 ...
2 3.212 223.1 120.12
...
2. Data is provided as a file: each line, consisting of D fields of real
numbers corresponding to the value of the a coordinate respectively in
order, represents a data point.
3. K, D, and ITERATION_LIMIT are predefined, meaning that you can use them
directly.
4. Use Euclidean distance as your distance measure.
5. euc_dist(a, b), avg(point_lists), closest(centroids_list, p) are built
in; use them directly.
6. Mark each cluster with integer IDs.
7. Although in Spark you can use "for" with slightly different syntax to
apply operations on an object parallelly, please use for_each( data ) ...
instead for such case. It's helpful if you don't know names of certain
built-in operations, but use it carefully. In fact, you can finish this
section without it.
8. Make your own trivial assumptions (ex: file name)
Hints
1. You can invoke operations on RDDs in chain. For example:
var a = read_from_file("sample.txt") // [1,2,3,4,5]
var b = a.map(i => i*2).map(i => i*4) // [8,16,24,32,40]
2. K is very small, say 5 or 10.
3. In Spark, if you have some information that should be shared among
workers, say, a look up table for example, you should make it a Broadcast
variable. e.g.
var lookup_table = parse_graph_from_file("graph.txt")
lookup_table = spark.braodcast(lookup_table)
4. groupByKey() is helpful.
5. Figure 3 is a table of transformations and actions of RDD. Again,
pseudo code is fine.
┌───────────────────────────────────┐
│ Transformations │
├───────────────────────────────────┤
│ map(f: T => U) : RDD[T] => RDD[U] │
│ filter(f: T => Bool) : RDD[T] => RDD[T] │
│ flatMap(f: T => Seq[U]) : RDD[T] => RDD[U] │
│ sample(fraction: Float) : RDD[T] => RDD[T] │
│ (Deterministic sampling)│
│ groupByKey() : RDD[(K, V)] =RDD[(K, Seq[V])] │
│ reduceByKey(f: (V, V) => V) : RDD[(K, V)] => RDD[(K, V)] │
│ union() : (RDD[T], RDD[T]) => RDD[T] │
│ join() : (RDD[(K, V)], RDD[(K, W)]) = │
│ RDD[(K, (V, W))]│
│ cogroup() : (RDD[(K, V)], RDD[(K, W)]) => │
│ RDD[(K, (Seq[V], Seq[W]))]│
│ crossProduct() : (RDD[T], RDD[U]) => RDD[(T, U)] │
│ mapValues(f: V => W) : RDD[(K, V)] =RDD[(K, W)] │
│ (Preserves partitioning)│
│ sort(c: Comparator[K]) : RDD[(K, V)] => RDD[(K, V)] │
│partitionBy(p: Partitioner[K]) : RDD[(K, V)] => RDD[(K. V)] │
└───────────────────────────────────┘
┌───────────────────────────────────┐
│ Actions │
├───────────────────────────────────┤
│ count() : RDD[T] => Long │
│ collect() : RDD[T] => Seq[T] │
│reduce(f: (T, T) => T) : RDD[T] => T │
│ lookup(k: K) : RDD[(K, V)] => Seq[V] │
│ (On hash/range partitioned RDDs)│
│ save(path: String) : Outputs RDD to a storage system, e.g., HDFS│
└───────────────────────────────────┘
Figure 3: RDD Transformation and Actions