Explaining through code why the result in Spark is 4*4 and not 16*16
private[mllib] object GridPartitioner {

  /** Creates a new [[GridPartitioner]] instance. */
  def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = {
    new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
  }

  /** Creates a new [[GridPartitioner]] instance with the input suggested number of partitions. */
  def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
    require(suggestedNumPartitions > 0)
    val scale = 1.0 / math.sqrt(suggestedNumPartitions)
    val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt
    val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt
    new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
  }
}
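As a quick sanity check of the second apply, here is a hedged worked example (my own, not from the Spark source); the suggestedNumPartitions value of 32 is an assumption chosen so that it reproduces the (1, 32, 1, 6) partitioner mentioned in the multiply comments below.

val rows = 1                      // 1x32 grid of blocks
val cols = 32
val suggestedNumPartitions = 32   // assumed value
val scale = 1.0 / math.sqrt(suggestedNumPartitions)             // ~= 0.177
val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt // max(0.177, 1.0) = 1.0 -> 1
val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt // 0.177 * 32 ~= 5.66  -> 6
// => GridPartitioner(1, 32, 1, 6), the (1, 32, 1, 6) shape referenced in the multiply comments.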
  def multiply(
      other: BlockMatrix,
      numMidDimSplits: Int): BlockMatrix = {
    require(numCols() == other.numRows(), "The number of columns of A and the number of rows " +
      s"of B must be equal. A.numCols: ${numCols()}, B.numRows: ${other.numRows()}. If you " +
      "think they should be equal, try setting the dimensions of A and B explicitly while " +
      "initializing them.")
    require(numMidDimSplits > 0, "numMidDimSplits should be a positive integer.")
    if (colsPerBlock == other.rowsPerBlock) {
      // resultPartitioner represents only the shape of the output matrix.
      // For a multiplication of a 1x1-block and a 1x32-block matrix, the resultPartitioner
      // becomes GridPartitioner(1, 32, 1, 6) --> it decides how the output blocks are
      // distributed over tasks.
      val resultPartitioner = GridPartitioner(numRowBlocks, other.numColBlocks,
        math.max(blocks.partitions.length, other.blocks.partitions.length))
      // For a 4x2-block left matrix and a 2x4-block right matrix, the output is 4x4 blocks, and
      // leftDestinations maps each left block ID to the set of output partition ids that need it.
      // The output partitions are numbered in column-major order, so block (0,0) of the left
      // matrix is needed by output partitions 0, 4, 8 and 12:
      // leftDestinations = Map((0,0)->Set(0,4,8,12), (0,1)->Set(0,4,8,12),
      //   (1,0)->Set(1,5,9,13), (1,1)->Set(1,5,9,13), (2,0)->Set(2,6,10,14),
      //   (2,1)->Set(2,6,10,14), (3,0)->Set(3,7,11,15), (3,1)->Set(3,7,11,15))
      val (leftDestinations, rightDestinations)
        = simulateMultiply(other, resultPartitioner, numMidDimSplits)
      // Each block of A must be multiplied with the corresponding blocks in the columns of B.
      // flatA pairs each output partition id with the left block position and the block itself,
      // for the blocks that live on the current worker.
      // No shuffle happens in this step: the executor simply writes out the output partition id
      // together with the corresponding left/right block position and block. For a 2x2 %*% 2x2
      // block multiplication, each block is written twice (once per destination output partition).
      val flatA = blocks.flatMap { case ((blockRowIndex, blockColIndex), block) =>
        val destinations = leftDestinations.getOrElse((blockRowIndex, blockColIndex), Set.empty)
        destinations.map(j => (j, (blockRowIndex, blockColIndex, block)))
      }
      // Each block of B must be multiplied with the corresponding blocks in each row of A.
      // flatB pairs each output partition id with the right block position and the block itself.
      val flatB = other.blocks.flatMap { case ((blockRowIndex, blockColIndex), block) =>
        val destinations = rightDestinations.getOrElse((blockRowIndex, blockColIndex), Set.empty)
        destinations.map(j => (j, (blockRowIndex, blockColIndex, block)))
      }
      val intermediatePartitioner = new Partitioner {
        override def numPartitions: Int = resultPartitioner.numPartitions * numMidDimSplits
        override def getPartition(key: Any): Int = key.asInstanceOf[Int]
      }
      // cogroup brings together, per output partition id, the corresponding left and right blocks.
      val newBlocks = flatA.cogroup(flatB, intermediatePartitioner).flatMap { case (pId, (a, b)) =>
        a.flatMap { case (leftRowIndex, leftColIndex, leftBlock) =>
          b.filter(_._1 == leftColIndex).map { case (rightRowIndex, rightColIndex, rightBlock) =>
            val C = rightBlock match {
              case dense: DenseMatrix => leftBlock.multiply(dense)
              case sparse: SparseMatrix => leftBlock.multiply(sparse.toDense)
              case _ =>
                throw new SparkException(s"Unrecognized matrix type ${rightBlock.getClass}.")
            }
            ((leftRowIndex, rightColIndex), C.asBreeze)
          }
        }
      }.reduceByKey(resultPartitioner, (a, b) => a + b).mapValues(Matrices.fromBreeze)
      // TODO: Try to use aggregateByKey instead of reduceByKey to get rid of intermediate matrices
      new BlockMatrix(newBlocks, rowsPerBlock, other.colsPerBlock, numRows(), other.numCols())
    } else {
      throw new SparkException("colsPerBlock of A doesn't match rowsPerBlock of B. " +
        s"A.colsPerBlock: $colsPerBlock, B.rowsPerBlock: ${other.rowsPerBlock}")
    }
  }
}
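To see the 4*4 (not 16*16) block grid of the result, here is a minimal usage sketch (my own, assuming a local SparkContext named sc and 2x2 element blocks): a 4x2-block matrix multiplied by a 2x4-block matrix yields a 4x4-block result.

import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Left matrix A: 4x2 grid of 2x2 blocks (8x4 elements); right matrix B: 2x4 grid (4x8 elements).
val blocksA = Seq.tabulate(4, 2) { (i, j) => ((i, j), DenseMatrix.ones(2, 2): Matrix) }.flatten
val blocksB = Seq.tabulate(2, 4) { (i, j) => ((i, j), DenseMatrix.ones(2, 2): Matrix) }.flatten

val A = new BlockMatrix(sc.parallelize(blocksA), 2, 2)
val B = new BlockMatrix(sc.parallelize(blocksB), 2, 2)

val C = A.multiply(B)
// C.numRowBlocks == 4 and C.numColBlocks == 4: the output grid is 4x4 blocks, not 16x16.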
flatMap: the left blocks that go into output 0 are (0,0) and (0,1); each executor writes these pairs to its local directory (no shuffle yet).
cogroup: gathers, per output index, the left and right blocks that go into the same result block, and the block multiplications are performed there; the partial products for each output block are then summed with reduceByKey.
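As an illustration of the leftDestinations map quoted in the comments above, here is a hedged sketch (my own derivation, not Spark code) for the 4x2-block left matrix and 2x4-block right matrix, assuming one output partition per output block, numbered in column-major order.

val numRowBlocksA = 4   // left matrix: 4x2 blocks
val numColBlocksA = 2
val numColBlocksB = 4   // right matrix: 2x4 blocks

val leftDestinations =
  (for {
    i <- 0 until numRowBlocksA   // block row index in A
    k <- 0 until numColBlocksA   // block col index in A (the middle dimension)
  } yield {
    // A(i, k) contributes to every output block (i, j); in column-major order the
    // partition id of output block (i, j) is j * numRowBlocksA + i.
    (i, k) -> (0 until numColBlocksB).map(j => j * numRowBlocksA + i).toSet
  }).toMap

// leftDestinations((0, 0)) == Set(0, 4, 8, 12), matching the map in the comments above.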