[TOC]
KeystoneML Source Code, A KeystoneML Example, Programming Guide
A Pipeline is a dataflow that takes some input data and maps it to some output data through a series of nodes.
package workflow

import org.apache.spark.rdd.RDD

trait Pipeline[A, B] {
  // ...
  def apply(in: A): B
  def apply(in: RDD[A]): RDD[B]
  // ...
  final def andThen[C](next: Pipeline[B, C]): Pipeline[A, C] = // ...
}
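To make the composition rule concrete, here is a minimal local sketch that mirrors the shape of the trait above without Spark. `LocalPipeline`, `LocalPipelineDemo`, and the two toy nodes are hypothetical names used only for illustration; the real `Pipeline` works on RDDs and does considerably more.

```scala
// Simplified, Spark-free stand-in for workflow.Pipeline (illustration only).
trait LocalPipeline[A, B] { self =>
  // Single-item version.
  def apply(in: A): B

  // Batch version: apply the pipeline to every element of a local collection.
  def apply(in: Seq[A]): Seq[B] = in.map(x => apply(x))

  // Chain this pipeline with the next one: the output of this feeds `next`.
  final def andThen[C](next: LocalPipeline[B, C]): LocalPipeline[A, C] =
    new LocalPipeline[A, C] {
      def apply(in: A): C = next.apply(self.apply(in))
    }
}

object LocalPipelineDemo {
  // Toy node: Int => Int.
  val addOne: LocalPipeline[Int, Int] = new LocalPipeline[Int, Int] {
    def apply(in: Int): Int = in + 1
  }

  // Toy node: Int => String.
  val describe: LocalPipeline[Int, String] = new LocalPipeline[Int, String] {
    def apply(in: Int): String = "value=" + in
  }

  // The output type of addOne must match the input type of describe.
  val pipe: LocalPipeline[Int, String] = addOne andThen describe
}
```

The composed `pipe` accepts either a single `Int` or a `Seq[Int]`, which is the same single-item/batch duality that the two `apply` methods of `Pipeline` provide.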
Nodes come in two flavors: Transformers and Estimators.
A Transformer takes an input and deterministically transforms it into an output.
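package workflow

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

abstract class Transformer[A, B : ClassTag] extends TransformerNode[B] with Pipeline[A, B] {
  def apply(in: A): B
  // The RDD version is derived from the single-item version.
  def apply(in: RDD[A]): RDD[B] = in.map(apply)
  // ...
}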
An Estimator takes in training data as an RDD through its fit() method and outputs a Transformer.
package workflow

import org.apache.spark.rdd.RDD

abstract class Estimator[A, B] extends EstimatorNode {
  protected def fit(data: RDD[A]): Transformer[A, B]
  // ...
}
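The split between the two node types can be illustrated with a small local sketch: the estimator sees the training data once, and everything it learned is frozen inside the transformer it returns. `LocalTransformer`, `LocalEstimator`, and `MeanCenterer` are hypothetical names, and `Seq` stands in for `RDD`; this is an illustration under those assumptions, not KeystoneML code.

```scala
// Spark-free stand-ins for Transformer and Estimator (illustration only).
trait LocalTransformer[A, B] {
  def apply(in: A): B
  def apply(in: Seq[A]): Seq[B] = in.map(x => apply(x))
}

trait LocalEstimator[A, B] {
  // Learn parameters from training data and return a fixed Transformer.
  def fit(data: Seq[A]): LocalTransformer[A, B]
}

// Hypothetical estimator: learns the training mean, then subtracts it.
object MeanCenterer extends LocalEstimator[Double, Double] {
  def fit(data: Seq[Double]): LocalTransformer[Double, Double] = {
    require(data.nonEmpty, "training data must not be empty")
    val mean = data.sum / data.size
    // The returned transformer is deterministic: it only captures `mean`.
    new LocalTransformer[Double, Double] {
      def apply(in: Double): Double = in - mean
    }
  }
}
```

After `val centerer = MeanCenterer.fit(Seq(1.0, 2.0, 3.0))`, calling `centerer(5.0)` subtracts the learned mean of 2.0 regardless of what data is passed later.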
// An Estimator is chained into a pipeline together with its training data.
val trainLabels: RDD[Vector[Double]] = //...
val trainImages: RDD[Image] = //...
val pipe = GrayScaler andThen
  ImageVectorizer andThen
  (LinearMapEstimator(), trainImages, trainLabels) andThen
  MaxClassifier
Nodes:
  images: nodes useful for image processing
  learning: estimators for the learning process (extend Estimator)
  stats: nodes useful for statistics
  nlp
  util: provides some utility functions
Loaders: load data (extend Transformer)
Evaluation: some evaluation criteria
Pipelines: the running programs
Utils:
  images: ImageUtils, some utility functions (we should write some functions in this class)
Pipelines:
  images: provides some examples of image classification pipelines
Fft2: breeze.signal.fourierTr
Ifft2: breeze.signal.iFourierTr
...
A DataLoader transformer using the ImageLoaderUtils class, which includes an ImageExtractor and a LabelExtractor
angle: Phase angle of a complex matrix
im2bw: Convert an image to a binary image based on a threshold (should be easy to implement)
bwlabel: Label connected components in a 2-D binary image
regionprops(imageLabel, 'BoundingBox'): Return the smallest rectangle containing each region
ismember(A, B): Array elements that are members of a set array
mat2gray: Convert a matrix to a grayscale image
strel: Create a morphological structuring element
imerode: Erode an image
repmat: Repeat copies of an array
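Several of the helpers listed above are short enough to sketch directly. The following is a rough illustration on plain nested arrays rather than Breeze matrices or KeystoneML `Image` objects; the object name `MatlabLike`, the `Matrix` alias, and the handling of a constant matrix in `mat2gray` are assumptions made for this example.

```scala
object MatlabLike {
  type Matrix = Array[Array[Double]]

  // im2bw: 1.0 where the pixel is strictly above the threshold, else 0.0.
  def im2bw(img: Matrix, threshold: Double): Matrix =
    img.map(_.map(v => if (v > threshold) 1.0 else 0.0))

  // mat2gray: linearly rescale values to [0, 1].
  // Assumption of this sketch: a constant matrix maps to all zeros.
  def mat2gray(m: Matrix): Matrix = {
    val flat = m.flatten
    val (lo, hi) = (flat.min, flat.max)
    if (hi == lo) m.map(_.map(_ => 0.0))
    else m.map(_.map(v => (v - lo) / (hi - lo)))
  }

  // ismember: for each element of a, whether it occurs in b.
  def ismember(a: Seq[Double], b: Seq[Double]): Seq[Boolean] = {
    val members = b.toSet
    a.map(members.contains)
  }

  // repmat: tile the matrix `rows` times vertically and `cols` times horizontally.
  def repmat(m: Matrix, rows: Int, cols: Int): Matrix =
    Array.fill(rows)(m.map(row => Array.fill(cols)(row).flatten)).flatten
}
```

A Breeze-based version would follow the same logic with `DenseMatrix[Double]` in place of the nested arrays.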
In utils, create a WebScaleMLUtl, which should include breezeVectorToTensor and tensorToDenseBreeze.
/* To Do */
There are two pipelines in our project: one for training and the other for inference.
For the training part, the pseudocode is shown in Code 1.1 below.
Code 1.1
object BaogangTraining extends Logging {
  // `a` and `conf` are placeholders that this pseudocode leaves unspecified.
  def run(sc: SparkContext, config: BaogangConfig): Pipeline[Image, Int] = {
    val numClasses = 2

    // Load the training set and split it into images and label vectors.
    val trainData = BaogangLoader(sc, config.trainLocation).cache()
    val trainImages = ImageExtractor(trainData)
    val labelExtractor = LabelExtractor andThen
      ClassLabelIndicatorsFromIntLabels(numClasses) andThen
      new Cacher[DenseVector[Double]]
    val trainLabels = labelExtractor(trainData)

    // Featurize the images, then fit the trainer on (trainImages, trainLabels).
    val predictor = ImageReScaler(a, a) andThen
      GrayScaler andThen
      new Cacher[Image] andThen
      ImageVectorizer andThen
      // (new StandardScaler, trainImages) andThen
      ConvolutionalNormalization andThen
      new Cacher[DenseVector[Double]] andThen
      (ConvolutionalTrainer(conf), trainImages, trainLabels)

    // usefulImageExtractor is the gathered sub-pipeline defined in Code 1.2.
    val testData = ImageExtractor(BaogangLoader(sc, config.trainLocation)).cache()
    val processedTestImage = usefulImageExtractor.apply(testData)
    val testPredicted = predictor(processedTestImage)

    predictor
  }
}
The workflow is: ImageExtractor -> ImageReScaler -> GrayScaler -> ImageVectorizer (-> StandardScaler) -> ConvolutionalNormalization -> ConvolutionalTrainer
The signatures of the corresponding objects are shown in Code 1.3.
For the inference part, the pseudocode is shown in Code 1.2 below. There are two sub-pipelines in the image processing part.
Code 1.2
object BaogangInferrence extends Logging {
  // `k1` and `k2` are placeholders that this pseudocode leaves unspecified.
  def run(sc: SparkContext, config: BaogangConfig): Pipeline[Image, Int] = {
    val testImages = ImageExtractor(BaogangLoader(sc, config.trainLocation)).cache()

    // Sub-pipeline 1: filter, binarize, label regions, then crop the useful boxes.
    val scrapProcessor = new Container(GrayScaler andThen
      new ImFilter("replicate", k1) andThen
      new ImFilter("replicate", k2) andThen
      new BinaryConvertor(0.1) andThen
      BwLabel andThen
      new UsefulBoxExtractor) andThen
      new ImageBoxCropper

    // Sub-pipeline 2: work in the frequency domain, then crop the useful boxes.
    val acidProcessor = new Container(GrayScaler andThen
      ImageReScaler andThen
      ImageMatrixizer andThen
      FFT2 andThen
      ToSaltMaper andThen
      MatToGrayConvertor andThen
      new UsefulBoxExtractor) andThen
      new ImageBoxCropper

    // Run both sub-pipelines on every image and combine their outputs.
    val usefulImageExtractor = Pipeline.gather { scrapProcessor :: acidProcessor :: Nil } andThen Combiner
    val processedTestImage = usefulImageExtractor.apply(testImages)

    val predictor = LoadBaogangPredictor()
    val predict = predictor(processedTestImage)

    // Return the full inference pipeline.
    usefulImageExtractor andThen predictor
  }
}
The workflow is:
p1 = GrayScaler -> ImFilter("replicate", k1) -> ImFilter("replicate", k2) -> BinaryConvertor(0.1) -> BwLabel -> UsefulBoxExtractor -> ImageBoxCropper
p2 = GrayScaler -> ImageReScaler -> ImageMatrixizer -> FFT2 -> ToSaltMaper -> MatToGrayConvertor -> UsefulBoxExtractor -> ImageBoxCropper
ImageExtractor -> p1 + p2 -> Combiner -> LoadBaogangPredictor -> predict
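The fan-out step, where both sub-pipelines run on the same image and their outputs are merged, can be pictured with plain functions. This sketch rests on several assumptions: `gather` here is a local helper rather than KeystoneML's `Pipeline.gather`, strings stand in for images, the two branches are toy functions, and the combiner simply flattens the crops into one list instead of producing a single `Image`.

```scala
object GatherSketch {
  // Run every branch on the same input and collect the outputs in branch order.
  def gather[A, B](branches: Seq[A => B]): A => Seq[B] =
    in => branches.map(branch => branch(in))

  // Toy branches standing in for p1 (scrapProcessor) and p2 (acidProcessor).
  val p1: String => Seq[String] = img => Seq(img + ":scrap-box")
  val p2: String => Seq[String] = img => Seq(img + ":acid-box-1", img + ":acid-box-2")

  // Toy combiner: merge the per-branch crops into one flat list.
  val combine: Seq[Seq[String]] => Seq[String] = _.flatten

  // Same shape as `Pipeline.gather { p1 :: p2 :: Nil } andThen Combiner`.
  val usefulImageExtractor: String => Seq[String] =
    gather(Seq(p1, p2)) andThen combine
}
```

Because every branch receives the same input, the branches must share an input type and an output type, which is why both sub-pipelines end with the same cropping node.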
The signatures of the corresponding objects are shown in Code 1.3.
Code 1.3
ImFilter(imName: String) extends Transformer[Image, Image]
BinaryConvertor(threshold: Double) extends Transformer[Image, Image]
BwLabel extends Transformer[Image, DenseMatrix[Double]]
UsefulBoxExtractor extends Transformer[DenseMatrix[Double], Seq[BoundingBox]] {
  override def apply(labelMatrix: DenseMatrix[Double]): Seq[BoundingBox] =
    boundingBoxGroups(labelMatrix)
  // Smallest rectangle around each labeled region.
  def boundingBoxGroups(labelMatrix: DenseMatrix[Double]): Seq[BoundingBox]
}
FilterSize(length: Int) extends Transformer[Seq[BoundingBox], Seq[BoundingBox]]
ImageBoxCropper extends Transformer[(Image, Seq[BoundingBox]), (Seq[Image], Seq[BoundingBox])] {
  // Crop one sub-image per bounding box and keep the boxes alongside the crops.
  override def apply(in: (Image, Seq[BoundingBox])): (Seq[Image], Seq[BoundingBox]) = {
    val (image, boxes) = in
    (boxes.map(box => cropImage(box, image)), boxes)
  }
  def cropImage(box: BoundingBox, image: Image): Image
}
ImageReScaler extends Transformer[Image, Image]
ImageMatrixizer extends Transformer[Image, DenseMatrix[Double]]
object FFT2 extends Transformer[DenseMatrix[Double], DenseMatrix[Double]]
ToSaltMaper extends Transformer[DenseMatrix[Double], DenseMatrix[Double]]
MatToGrayConvertor extends Transformer[DenseMatrix[Double], Image]
Combiner extends Transformer[Seq[Seq[Image]], Image]
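// Pairs each input with the output of the wrapped pipeline.
class ContainerA[A, B](p1: Pipeline[A, B]) extends Pipeline[A, (A, B)] {
  override def apply(in: A): (A, B) = (in, p1.apply(in))
  override def apply(in: RDD[A]): RDD[(A, B)] = in.zip(p1.apply(in))
}
// Returns the output of the wrapped pipeline together with an attachment.
// `attachment` and the exact result type are not specified yet.
class ContainerB[A, B](p1: Pipeline[A, B]) extends Pipeline {
  override def apply(in: RDD[A]): B = {
    (p1.apply(in), attachment)
  }
}
class ConvolutionalTrainer(Configurations ...) extends LabelEstimator[DenseVector[Double], DenseVector[Double], DenseVector[Double]]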
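The `BwLabel` and `UsefulBoxExtractor.boundingBoxGroups` signatures in Code 1.3 leave the labeling and box extraction unspecified. One possible approach is sketched below on plain arrays; it is only an assumption about how those nodes might work. The sketch uses 4-connectivity with a breadth-first flood fill, integer labels instead of a `DenseMatrix[Double]`, and a hypothetical `BoundingBox` case class with inclusive bounds.

```scala
import scala.collection.mutable

object BoxSketch {
  // Inclusive pixel bounds of one connected region (hypothetical layout).
  final case class BoundingBox(minRow: Int, minCol: Int, maxRow: Int, maxCol: Int)

  // Label 4-connected foreground regions with 1, 2, ...; background stays 0.
  def bwLabel(bw: Array[Array[Boolean]]): Array[Array[Int]] = {
    val rows = bw.length
    val cols = if (rows == 0) 0 else bw(0).length
    val labels = Array.fill(rows, cols)(0)
    var next = 0
    for (r <- 0 until rows; c <- 0 until cols) {
      if (bw(r)(c) && labels(r)(c) == 0) {
        // Found an unlabeled foreground pixel: flood-fill its region.
        next += 1
        labels(r)(c) = next
        val queue = mutable.Queue((r, c))
        while (queue.nonEmpty) {
          val (cr, cc) = queue.dequeue()
          for ((dr, dc) <- Seq((1, 0), (-1, 0), (0, 1), (0, -1))) {
            val (nr, nc) = (cr + dr, cc + dc)
            if (nr >= 0 && nr < rows && nc >= 0 && nc < cols &&
                bw(nr)(nc) && labels(nr)(nc) == 0) {
              labels(nr)(nc) = next
              queue.enqueue((nr, nc))
            }
          }
        }
      }
    }
    labels
  }

  // One bounding box per label, ordered by label value.
  def boundingBoxGroups(labels: Array[Array[Int]]): Seq[BoundingBox] = {
    val cells = for {
      (row, r) <- labels.zipWithIndex.toSeq
      (label, c) <- row.zipWithIndex.toSeq
      if label > 0
    } yield (label, r, c)
    cells.groupBy(_._1).toSeq.sortBy(_._1).map { case (_, pts) =>
      BoundingBox(pts.map(_._2).min, pts.map(_._3).min,
                  pts.map(_._2).max, pts.map(_._3).max)
    }
  }
}
```

Switching to 8-connectivity would only require adding the four diagonal offsets to the neighbor list.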