MJay

Spark에서 왜 결과값이 16*16이 아니라 4*4 인지 Code를 통해 설명 본문

Cloud Computing/Spark

Spark에서 왜 결과값이 16*16이 아니라 4*4 인지 Code를 통해 설명

MJSon 2017. 10. 2. 16:57
Edit
Spark에서 왜 결과값이 16*16이 아니라 4*4 인지 Code를 통해 설명

private[mllib] object GridPartitioner {
/** Creates a new [[GridPartitioner]] instance. */
def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = {
new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
}
/** Creates a new [[GridPartitioner]] instance with the input suggested number of partitions. */
def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
require(suggestedNumPartitions > 0)
val scale = 1.0 / math.sqrt(suggestedNumPartitions)
val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt
val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt
new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
}
}

def multiply(
other: BlockMatrix,
numMidDimSplits: Int): BlockMatrix = {
require(numCols() == other.numRows(), "The number of columns of A and the number of rows " +
s"of B must be equal. A.numCols: ${numCols()}, B.numRows: ${other.numRows()}. If you " +
"think they should be equal, try setting the dimensions of A and B explicitly while " +
"initializing them.")
require(numMidDimSplits > 0, "numMidDimSplits should be a positive integer.")
if (colsPerBlock == other.rowsPerBlock) {
// resultPartitioner represent only the shpae of the output matrix.
// In case of multipliation of 1*1 and 1*32 matrix, the rp becomes (1, 32, 1, 6) --> distribution of output task
val resultPartitioner = GridPartitioner(numRowBlocks, other.numColBlocks,
math.max(blocks.partitions.length, other.blocks.partitions.length))
// leftDestinations looks like val leftDestinations = List((0,0)->Set(0,4,8,12),(0,1)->Set(0,4,8,12),(1,0)->Set(1,5,9,13),(1,1)->Set(1,5,9,13),(2,0)->Set(2,6,10,14),(2,1)->Set(2,6,10,14),(3,0)->Set(3,7,11,15),(3,1)->Set(3,7,11,15)).toMap
// for left matrix of 4X2 and right matrix of 2X4, output matrix is 4X4, and the leftDestinations contains
// block ID -> Set(the index in output matrix), For example, output is in the column major order, (0,0) block of left matrix is used for output of 0,4,8,12
val (leftDestinations, rightDestinations)
= simulateMultiply(other, resultPartitioner, numMidDimSplits)
// Each block of A must be multiplied with the corresponding blocks in the columns of B.
// flatA looks like the index of output matrix, and the left matrix block index and blocks that exist in the current worker.
// In this step, the shuffle does not happen. The executor simply write the output ID with the corresponding left/right matrix block position and blocks - For 2X2 %*% 2X2 matrix, a block is written twice, and the same block is written twice.
val flatA = blocks.flatMap { case ((blockRowIndex, blockColIndex), block) =>
val destinations = leftDestinations.getOrElse((blockRowIndex, blockColIndex), Set.empty)
destinations.map(j => (j, (blockRowIndex, blockColIndex, block)))
}
// Each block of B must be multiplied with the corresponding blocks in each row of A.
// flatB contains the index of output matrix and the block ID of right matrix and block
val flatB = other.blocks.flatMap { case ((blockRowIndex, blockColIndex), block) =>
val destinations = rightDestinations.getOrElse((blockRowIndex, blockColIndex), Set.empty)
destinations.map(j => (j, (blockRowIndex, blockColIndex, block)))
}
val intermediatePartitioner = new Partitioner {
override def numPartitions: Int = resultPartitioner.numPartitions * numMidDimSplits
override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
// cogroup allow to map the output id with corresponding left and right matrix block
val newBlocks = flatA.cogroup(flatB, intermediatePartitioner).flatMap { case (pId, (a, b)) =>
a.flatMap { case (leftRowIndex, leftColIndex, leftBlock) =>
b.filter(_._1 == leftColIndex).map { case (rightRowIndex, rightColIndex, rightBlock) =>
val C = rightBlock match {
case dense: DenseMatrix => leftBlock.multiply(dense)
case sparse: SparseMatrix => leftBlock.multiply(sparse.toDense)
case _ =>
throw new SparkException(s"Unrecognized matrix type ${rightBlock.getClass}.")
}
((leftRowIndex, rightColIndex), C.asBreeze)
}
}
}.reduceByKey(resultPartitioner, (a, b) => a + b).mapValues(Matrices.fromBreeze)
// TODO: Try to use aggregateByKey instead of reduceByKey to get rid of intermediate matrices
new BlockMatrix(newBlocks, rowsPerBlock, other.colsPerBlock, numRows(), other.numCols())
} else {
throw new SparkException("colsPerBlock of A doesn't match rowsPerBlock of B. " +
s"A.colsPerBlock: $colsPerBlock, B.rowsPerBlock: ${other.rowsPerBlock}")
}
}
}

flatMap

결과물 0번에 들어가는 left part 0,0, 0,1

local directory

cogroup -> 결과값에 들어가갈 인덱스끼리 모은다. 그리고 행렬 곱셈을 한다. 그 다음에 reduceByKey를 통해 알아본다.

%23%23%23%23%23%20Spark%uC5D0%uC11C%20%uC65C%20%uACB0%uACFC%uAC12%uC774%2016*16%uC774%20%uC544%uB2C8%uB77C%204*4%20%uC778%uC9C0%20Code%uB97C%20%uD1B5%uD574%20%uC124%uBA85%0A@%28Marxico%29%0A%0A%60%60%60scala%0Aprivate%5Bmllib%5D%20object%20GridPartitioner%20%7B%0A%0A%20%20/**%20Creates%20a%20new%20%5B%5BGridPartitioner%5D%5D%20instance.%20*/%0A%20%20def%20apply%28rows%3A%20Int%2C%20cols%3A%20Int%2C%20rowsPerPart%3A%20Int%2C%20colsPerPart%3A%20Int%29%3A%20GridPartitioner%20%3D%20%7B%0A%20%20%20%20new%20GridPartitioner%28rows%2C%20cols%2C%20rowsPerPart%2C%20colsPerPart%29%0A%20%20%7D%0A%0A%20%20/**%20Creates%20a%20new%20%5B%5BGridPartitioner%5D%5D%20instance%20with%20the%20input%20suggested%20number%20of%20partitions.%20*/%0A%20%20def%20apply%28rows%3A%20Int%2C%20cols%3A%20Int%2C%20suggestedNumPartitions%3A%20Int%29%3A%20GridPartitioner%20%3D%20%7B%0A%20%20%20%20require%28suggestedNumPartitions%20%3E%200%29%0A%20%20%20%20val%20scale%20%3D%201.0%20/%20math.sqrt%28suggestedNumPartitions%29%0A%20%20%20%20val%20rowsPerPart%20%3D%20math.round%28math.max%28scale%20*%20rows%2C%201.0%29%29.toInt%0A%20%20%20%20val%20colsPerPart%20%3D%20math.round%28math.max%28scale%20*%20cols%2C%201.0%29%29.toInt%0A%20%20%20%20new%20GridPartitioner%28rows%2C%20cols%2C%20rowsPerPart%2C%20colsPerPart%29%0A%20%20%7D%0A%7D%0A%60%60%60%0A%0A%21%5BAlt%20text%5D%28./1506648587497.png%29%0A%0A%0A%60%60%60scala%0Adef%20multiply%28%0A%20%20%20%20%20%20other%3A%20BlockMatrix%2C%0A%20%20%20%20%20%20numMidDimSplits%3A%20Int%29%3A%20BlockMatrix%20%3D%20%7B%0A%20%20%20%20require%28numCols%28%29%20%3D%3D%20other.numRows%28%29%2C%20%22The%20number%20of%20columns%20of%20A%20and%20the%20number%20of%20rows%20%22%20+%0A%20%20%20%20%20%20s%22of%20B%20must%20be%20equal.%20A.numCols%3A%20%24%7BnumCols%28%29%7D%2C%20B.numRows%3A%20%24%7Bother.numRows%28%29%7D.%20If%20you%20%22%20+%0A%20%20%20%20%20%20%22think%20they%20should%20be%20equal%2C%20try%20setting%20the%20dimensions%20of%20A%20and%20B%20explicitly%20while%20%22%20+%0A%20%20%20%20%20%20%22initializing%20them.%22%29%0A%20%20%20%20require%28numMidDimSplits%20%3E%200%2C%20%22numMidDimSplits%20should%20be%20a%20positive%20integer.%22%29%0A%20%20%20%20if%20%28colsPerBlock%20%3D%3D%20other.rowsPerBlock%29%20%7B%0A%20%20%20%20%20%20//%20resultPartitioner%20represent%20only%20the%20shpae%20of%20the%20output%20matrix.%0A%20%20%20%20%20%20//%20In%20case%20of%20multipliation%20of%201*1%20and%201*32%20matrix%2C%20the%20rp%20becomes%20%281%2C%2032%2C%201%2C%206%29%20%20--%3E%20distribution%20of%20output%20task%0A%20%20%20%20%20%20val%20resultPartitioner%20%3D%20GridPartitioner%28numRowBlocks%2C%20other.numColBlocks%2C%0A%20%20%20%20%20%20%20%20math.max%28blocks.partitions.length%2C%20other.blocks.partitions.length%29%29%0A%20%20%20%20%20%20//%20leftDestinations%20looks%20like%20val%20leftDestinations%20%3D%20List%28%280%2C0%29-%3ESet%280%2C4%2C8%2C12%29%2C%280%2C1%29-%3ESet%280%2C4%2C8%2C12%29%2C%281%2C0%29-%3ESet%281%2C5%2C9%2C13%29%2C%281%2C1%29-%3ESet%281%2C5%2C9%2C13%29%2C%282%2C0%29-%3ESet%282%2C6%2C10%2C14%29%2C%282%2C1%29-%3ESet%282%2C6%2C10%2C14%29%2C%283%2C0%29-%3ESet%283%2C7%2C11%2C15%29%2C%283%2C1%29-%3ESet%283%2C7%2C11%2C15%29%29.toMap%0A%20%20%20%20%20%20//%20for%20left%20matrix%20of%204X2%20and%20right%20matrix%20of%202X4%2C%20output%20matrix%20is%204X4%2C%20and%20the%20leftDestinations%20contains%0A%20%20%20%20%20%20//%20block%20ID%20-%3E%20Set%28the%20index%20in%20output%20matrix%29%2C%20For%20example%2C%20output%20is%20in%20the%20column%20major%20order%2C%20%280%2C0%29%20block%20of%20left%20matrix%20is%20used%20for%20output%20of%200%2C4%2C8%2C12%0A%20%20%20%20%20%20val%20%28leftDestinations%2C%20rightDestinations%29%0A%20%20%20%20%20%20%20%20%3D%20simulateMultiply%28other%2C%20resultPartitioner%2C%20numMidDimSplits%29%0A%20%20%20%20%20%20//%20Each%20block%20of%20A%20must%20be%20multiplied%20with%20the%20corresponding%20blocks%20in%20the%20columns%20of%20B.%0A%20%20%20%20%20%20//%20flatA%20looks%20like%20the%20index%20of%20output%20matrix%2C%20and%20the%20left%20matrix%20block%20index%20and%20blocks%20that%20exist%20in%20the%20current%20worker.%0A%20%20%20%20%20%20//%20In%20this%20step%2C%20the%20shuffle%20does%20not%20happen.%20The%20executor%20simply%20write%20the%20output%20ID%20with%20the%20corresponding%20left/right%20matrix%20block%20position%20and%20blocks%20-%20For%202X2%20%25*%25%202X2%20matrix%2C%20a%20block%20is%20written%20twice%2C%20and%20the%20same%20block%20is%20written%20twice.%0A%20%20%20%20%20%20val%20flatA%20%3D%20blocks.flatMap%20%7B%20case%20%28%28blockRowIndex%2C%20blockColIndex%29%2C%20block%29%20%3D%3E%0A%20%20%20%20%20%20%20%20val%20destinations%20%3D%20leftDestinations.getOrElse%28%28blockRowIndex%2C%20blockColIndex%29%2C%20Set.empty%29%0A%20%20%20%20%20%20%20%20destinations.map%28j%20%3D%3E%20%28j%2C%20%28blockRowIndex%2C%20blockColIndex%2C%20block%29%29%29%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20//%20Each%20block%20of%20B%20must%20be%20multiplied%20with%20the%20corresponding%20blocks%20in%20each%20row%20of%20A.%0A%20%20%20%20%20%20//%20flatB%20contains%20the%20index%20of%20output%20matrix%20and%20the%20block%20ID%20of%20right%20matrix%20and%20block%0A%20%20%20%20%20%20val%20flatB%20%3D%20other.blocks.flatMap%20%7B%20case%20%28%28blockRowIndex%2C%20blockColIndex%29%2C%20block%29%20%3D%3E%0A%20%20%20%20%20%20%20%20val%20destinations%20%3D%20rightDestinations.getOrElse%28%28blockRowIndex%2C%20blockColIndex%29%2C%20Set.empty%29%0A%20%20%20%20%20%20%20%20destinations.map%28j%20%3D%3E%20%28j%2C%20%28blockRowIndex%2C%20blockColIndex%2C%20block%29%29%29%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20val%20intermediatePartitioner%20%3D%20new%20Partitioner%20%7B%0A%20%20%20%20%20%20%20%20override%20def%20numPartitions%3A%20Int%20%3D%20resultPartitioner.numPartitions%20*%20numMidDimSplits%0A%20%20%20%20%20%20%20%20override%20def%20getPartition%28key%3A%20Any%29%3A%20Int%20%3D%20key.asInstanceOf%5BInt%5D%0A%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20//%20cogroup%20allow%20to%20map%20the%20output%20id%20with%20corresponding%20left%20and%20right%20matrix%20block%0A%20%20%20%20%20%20val%20newBlocks%20%3D%20flatA.cogroup%28flatB%2C%20intermediatePartitioner%29.flatMap%20%7B%20case%20%28pId%2C%20%28a%2C%20b%29%29%20%3D%3E%0A%20%20%20%20%20%20%20%20a.flatMap%20%7B%20case%20%28leftRowIndex%2C%20leftColIndex%2C%20leftBlock%29%20%3D%3E%0A%20%20%20%20%20%20%20%20%20%20b.filter%28_._1%20%3D%3D%20leftColIndex%29.map%20%7B%20case%20%28rightRowIndex%2C%20rightColIndex%2C%20rightBlock%29%20%3D%3E%0A%20%20%20%20%20%20%20%20%20%20%20%20val%20C%20%3D%20rightBlock%20match%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20case%20dense%3A%20DenseMatrix%20%3D%3E%20leftBlock.multiply%28dense%29%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20case%20sparse%3A%20SparseMatrix%20%3D%3E%20leftBlock.multiply%28sparse.toDense%29%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20case%20_%20%3D%3E%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20throw%20new%20SparkException%28s%22Unrecognized%20matrix%20type%20%24%7BrightBlock.getClass%7D.%22%29%0A%20%20%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%20%20%20%20%28%28leftRowIndex%2C%20rightColIndex%29%2C%20C.asBreeze%29%0A%20%20%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%20%20%7D%0A%20%20%20%20%20%20%7D.reduceByKey%28resultPartitioner%2C%20%28a%2C%20b%29%20%3D%3E%20a%20+%20b%29.mapValues%28Matrices.fromBreeze%29%0A%20%20%20%20%20%20//%20TODO%3A%20Try%20to%20use%20aggregateByKey%20instead%20of%20reduceByKey%20to%20get%20rid%20of%20intermediate%20matrices%0A%20%20%20%20%20%20new%20BlockMatrix%28newBlocks%2C%20rowsPerBlock%2C%20other.colsPerBlock%2C%20numRows%28%29%2C%20other.numCols%28%29%29%0A%20%20%20%20%7D%20else%20%7B%0A%20%20%20%20%20%20throw%20new%20SparkException%28%22colsPerBlock%20of%20A%20doesn%27t%20match%20rowsPerBlock%20of%20B.%20%22%20+%0A%20%20%20%20%20%20%20%20s%22A.colsPerBlock%3A%20%24colsPerBlock%2C%20B.rowsPerBlock%3A%20%24%7Bother.rowsPerBlock%7D%22%29%0A%20%20%20%20%7D%0A%20%20%7D%0A%7D%0A%60%60%60%0A%0AflatMap%20%0A%0A%uACB0%uACFC%uBB3C%200%uBC88%uC5D0%20%uB4E4%uC5B4%uAC00%uB294%20left%20part%200%2C0%2C%200%2C1%20%0A%0Alocal%20directory%20%0A%0Acogroup%20-%3E%20%uACB0%uACFC%uAC12%uC5D0%20%uB4E4%uC5B4%uAC00%uAC08%20%uC778%uB371%uC2A4%uB07C%uB9AC%20%uBAA8%uC740%uB2E4.%20%uADF8%uB9AC%uACE0%20%uD589%uB82C%20%uACF1%uC148%uC744%20%uD55C%uB2E4.%20%uADF8%20%20%uB2E4%uC74C%uC5D0%20reduceByKey%uB97C%20%uD1B5%uD574%20%uC54C%uC544%uBCF8%uB2E4.%0A%0A

'Cloud Computing > Spark' 카테고리의 다른 글

r4 Instance Scalability 조사  (0) 2017.10.02
Spark-EC2 Too large frame  (0) 2017.10.02
Spark Cluster 구조  (0) 2017.10.02
Spark-EC2에서 EBS Instance Storage를 올리는 방법  (1) 2017.10.02
SparkCL 논문을 읽어봄  (0) 2017.09.12