Last active
December 15, 2015 05:29
-
-
Save krishnanraman/5209602 to your computer and use it in GitHub Desktop.
Pail example: writejob is a job that creates Pails; readjob is a job that reads back the Pails that writejob created.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.backtype.hadoop.pail.PailStructure | |
import java.util.{ List => JList } | |
import scala.collection.JavaConverters._ | |
import com.twitter.scalding._ | |
import com.twitter.scalding.commons.source.{PailSource,CodecPailStructure} | |
import com.twitter.bijection.{NumericInjections, Injection} | |
/**
 * Scalding job that demonstrates writing to and reading from a Pail.
 *
 * The required `--io` argument selects the mode:
 *  - `write`: writes the integers 1 to 100 into the pail rooted at "pailtest".
 *  - `read`:  reads each of the 14 pail subdirectories and writes every one to
 *             its own Tsv sink named "pailoutput" followed by its index.
 *
 * Any other `--io` value is rejected with an IllegalArgumentException.
 */
class PailTest2Write(args: Args) extends Job(args) {

  // Build the selected pipeline while the job is being constructed.
  args("io") match {
    case "read"  => readjob
    case "write" => writejob
    case other =>
      // Without this case an unexpected mode surfaced as an opaque scala.MatchError.
      throw new IllegalArgumentException(
        "Unknown --io mode '" + other + "'; expected 'read' or 'write'")
  }

  /**
   * Builds the pail structure shared by both modes.
   *
   * Integers below 50 are targeted at "./belowfifty/<n % 7>", all others at
   * "./abovefifty/<n % 7>". Every target is accepted as valid, and values are
   * serialized through the big-endian Int injection.
   */
  def mkPailStructure: MyCodecPailStructure[Int] = {
    val func = (obj: Int) => {
      val bucket = if (obj < 50) "./belowfifty/" else "./abovefifty/"
      List(bucket + (obj % 7))
    }
    val validator = (x: List[String]) => true
    // NOTE(review): classOf[Int] is the primitive class; confirm that is what Pail expects.
    val mytype = classOf[Int]
    val myinjection = new NumericInjections {}.int2BigEndian
    val cps = new MyCodecPailStructure[Int]
    cps.setParams(func, validator, mytype, myinjection)
    cps
  }

  /**
   * Reads every "<bucket>/<remainder>" subdirectory of the "pailtest" pail and
   * writes each one to a separate Tsv sink, "pailoutput0" to "pailoutput13".
   * The subdirectory paths are printed to stdout first.
   */
  def readjob: Unit = {
    val cps = mkPailStructure
    val paths =
      for {
        bucket    <- List("belowfifty", "abovefifty")
        remainder <- 0 to 6
      } yield bucket + "/" + remainder
    paths.foreach(path => println(path))
    val sources = paths.map(path => PailSource.source[Int]("pailtest", cps, Array(List(path))))
    sources.zipWithIndex.foreach {
      case (source, index) => source.read.write(new Tsv("pailoutput" + index))
    }
  }

  /** Writes the integers 1 to 100 into the "pailtest" pail. */
  def writejob = {
    val cps = mkPailStructure
    val pipe = IterableSource((1 to 100), "src").read
    val sink = PailSource.sink[Int]("pailtest", cps)
    pipe.write(sink)
  }
}
/**
 * A PailStructure whose behaviour is supplied after construction via
 * [[setParams]].
 *
 * Until `setParams` is called, `targetFn` sends every object to "test",
 * `validator` accepts every target, and `mytype` / `injection` are null, so
 * `serialize` and `deserialize` must not be used before then.
 *
 * @tparam T the type of the records stored in the pail
 */
class MyCodecPailStructure[T] extends PailStructure[T] {
  var targetFn: T => List[String] = (x: T) => List("test")
  var validator: List[String] => Boolean = (x: List[String]) => true
  var mytype: java.lang.Class[T] = null
  var injection: Injection[T, Array[Byte]] = null

  /**
   * Installs the functions and metadata this structure delegates to.
   *
   * @param targetFn  maps a record to the path components it is stored under
   * @param validator decides whether a list of path components is a valid target
   * @param mytype    runtime class of the records
   * @param injection converts records to bytes and back
   */
  def setParams(targetFn: T => List[String],
                validator: List[String] => Boolean,
                mytype: java.lang.Class[T],
                injection: Injection[T, Array[Byte]]): Unit = {
    this.targetFn = targetFn
    this.validator = validator
    this.mytype = mytype
    this.injection = injection
  }

  override def isValidTarget(paths: String*): Boolean = validator(paths.toList)

  override def getTarget(obj: T): JList[String] = targetFn(obj).asJava

  override def serialize(obj: T): Array[Byte] = injection.apply(obj)

  // `.get` fails if the bytes cannot be inverted back into a T.
  override def deserialize(bytes: Array[Byte]): T = injection.invert(bytes).get

  // Must be a def: a val would be evaluated once in the constructor, while
  // `mytype` is still null, and would never reflect the class given to setParams.
  override def getType: java.lang.Class[T] = mytype
}
WRITEJOB: | |
$ scald --hdfs-local PailTest2.scala --io write | |
READJOB: | |
$ scald --hdfs-local PailTest2.scala --io read | |
RESULT OF WRITE JOB: | |
$ tree pailtest/ | |
pailtest/ | |
├── abovefifty | |
│ ├── 0 | |
│ │ └── part-0000013.pailfile | |
│ ├── 1 | |
│ │ └── part-000007.pailfile | |
│ ├── 2 | |
│ │ └── part-000008.pailfile | |
│ ├── 3 | |
│ │ └── part-000009.pailfile | |
│ ├── 4 | |
│ │ └── part-0000010.pailfile | |
│ ├── 5 | |
│ │ └── part-0000011.pailfile | |
│ └── 6 | |
│ └── part-0000012.pailfile | |
├── belowfifty | |
│ ├── 0 | |
│ │ └── part-000006.pailfile | |
│ ├── 1 | |
│ │ └── part-000000.pailfile | |
│ ├── 2 | |
│ │ └── part-000001.pailfile | |
│ ├── 3 | |
│ │ └── part-000002.pailfile | |
│ ├── 4 | |
│ │ └── part-000003.pailfile | |
│ ├── 5 | |
│ │ └── part-000004.pailfile | |
│ └── 6 | |
│ └── part-000005.pailfile | |
└── pail.meta | |
RESULT OF READ JOB: | |
$ ls pailoutput* | |
pailoutput0: | |
part-00000 | |
pailoutput1: | |
part-00000 | |
pailoutput10: | |
part-00000 | |
pailoutput11: | |
part-00000 | |
pailoutput12: | |
part-00000 | |
pailoutput13: | |
part-00000 | |
pailoutput2: | |
part-00000 | |
pailoutput3: | |
part-00000 | |
pailoutput4: | |
part-00000 | |
pailoutput5: | |
part-00000 | |
pailoutput6: | |
part-00000 | |
pailoutput7: | |
part-00000 | |
pailoutput8: | |
part-00000 | |
pailoutput9: | |
part-00000 | |
$ cat pailoutput*/* | |
7 | |
14 | |
21 | |
28 | |
35 | |
42 | |
49 | |
1 | |
8 | |
15 | |
22 | |
29 | |
36 | |
43 | |
52 | |
59 | |
66 | |
73 | |
80 | |
87 | |
94 | |
53 | |
60 | |
67 | |
74 | |
81 | |
88 | |
95 | |
54 | |
61 | |
68 | |
75 | |
82 | |
89 | |
96 | |
55 | |
62 | |
69 | |
76 | |
83 | |
90 | |
97 | |
2 | |
9 | |
16 | |
23 | |
30 | |
37 | |
44 | |
3 | |
10 | |
17 | |
24 | |
31 | |
38 | |
45 | |
4 | |
11 | |
18 | |
25 | |
32 | |
39 | |
46 | |
5 | |
12 | |
19 | |
26 | |
33 | |
40 | |
47 | |
6 | |
13 | |
20 | |
27 | |
34 | |
41 | |
48 | |
56 | |
63 | |
70 | |
77 | |
84 | |
91 | |
98 | |
50 | |
57 | |
64 | |
71 | |
78 | |
85 | |
92 | |
99 | |
51 | |
58 | |
65 | |
72 | |
79 | |
86 | |
93 | |
100 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment