Skip to content

Instantly share code, notes, and snippets.

@krishnanraman
Last active December 15, 2015 05:29
Show Gist options
  • Save krishnanraman/5209602 to your computer and use it in GitHub Desktop.
Save krishnanraman/5209602 to your computer and use it in GitHub Desktop.
Pail example: `writejob` is a job that creates a Pail, and `readjob` is a job that reads back the Pail created by `writejob`.
import com.backtype.hadoop.pail.PailStructure
import java.util.{ List => JList }
import scala.collection.JavaConverters._
import com.twitter.scalding._
import com.twitter.scalding.commons.source.{PailSource,CodecPailStructure}
import com.twitter.bijection.{NumericInjections, Injection}
/**
 * Scalding job demonstrating Pail I/O; the `--io` argument selects which flow is built.
 *
 *  - `--io write`: writes the integers 1 to 100 into the pail rooted at
 *    [[PailTest2Write.PailRoot]], bucketed by [[PailTest2Write.targetFor]].
 *  - `--io read`: reads every bucket listed in [[PailTest2Write.readPaths]] separately and
 *    writes bucket number `i` (in that list's order) to the Tsv `pailoutput<i>`.
 *
 * Any other `--io` value fails fast with an `IllegalArgumentException`.
 */
class PailTest2Write(args : Args) extends Job(args) {
  import PailTest2Write._

  // Runs during construction: the selected method registers its pipes with this job's flow.
  args("io") match {
    case "read"  => readjob
    case "write" => writejob
    case other =>
      throw new IllegalArgumentException(s"--io must be 'read' or 'write', got '$other'")
  }

  /**
   * Builds the pail structure shared by both flows: `Int` records, encoded with the
   * big-endian integer injection, bucketed by [[PailTest2Write.targetFor]], and accepting
   * every target path.
   */
  def mkPailStructure: MyCodecPailStructure[Int] = {
    // Only refers to the companion object, so the (serialized) closure does not capture this Job.
    val func = (obj: Int) => targetFor(obj)
    val validator = (x: List[String]) => true
    val mytype = classOf[Int]
    val myinjection = new NumericInjections {}.int2BigEndian
    val cps = new MyCodecPailStructure[Int]
    cps.setParams(func, validator, mytype, myinjection)
    cps
  }

  /** Reads each bucket of the pail into its own Tsv, `pailoutput0` .. `pailoutput<n-1>`. */
  def readjob = {
    val cps = mkPailStructure
    readPaths.foreach(println)
    val sources = readPaths.map(path => PailSource.source[Int](PailRoot, cps, Array(List(path))))
    sources.zipWithIndex.foreach { case (source, idx) =>
      source.read.write(new Tsv(OutputPrefix + idx))
    }
  }

  /** Writes the integers 1 to 100 into the pail. */
  def writejob = {
    val cps = mkPailStructure
    val pipe = IterableSource((1 to 100), "src").read
    val sink = PailSource.sink[Int](PailRoot, cps)
    pipe.write(sink)
  }
}

/**
 * Layout shared by the write and read flows of [[PailTest2Write]]. It is kept in one place
 * so that the buckets the read job asks for always match the buckets the write job creates.
 */
object PailTest2Write {
  /** Root directory of the pail, relative to the working directory. */
  val PailRoot = "pailtest"
  /** Prefix of the per-bucket Tsv outputs written by the read flow. */
  val OutputPrefix = "pailoutput"
  /** Records strictly below this value go under [[BelowDir]]; the rest go under [[AboveDir]]. */
  val Threshold = 50
  /** Number of modulo buckets inside each top-level directory. */
  val Buckets = 7
  val BelowDir = "belowfifty"
  val AboveDir = "abovefifty"

  /**
   * Pail target for a record: `./<BelowDir|AboveDir>/<obj % Buckets>`.
   * Negative inputs would produce negative bucket numbers, which [[readPaths]] does not list.
   */
  def targetFor(obj: Int): List[String] = {
    val dir = if (obj < Threshold) BelowDir else AboveDir
    List(s"./$dir/${obj % Buckets}")
  }

  /** Every bucket sub-path (relative to [[PailRoot]]): all of [[BelowDir]] first, then [[AboveDir]]. */
  val readPaths: List[String] =
    for {
      dir    <- List(BelowDir, AboveDir)
      bucket <- (0 until Buckets).toList
    } yield s"$dir/$bucket"
}
/**
 * A `PailStructure` whose bucketing, target validation, record type and serialization are
 * supplied at runtime through [[setParams]], rather than hard-coded in a subclass.
 *
 * The configuration lives in `var`s so that an instance can be created with the no-argument
 * constructor and configured afterwards. Until `setParams` is called:
 *  - `getType` returns `null`;
 *  - `serialize` and `deserialize` fail with a `NullPointerException`, because `injection` is `null`;
 *  - `getTarget` returns `["test"]` and `isValidTarget` accepts everything.
 *
 * @tparam T record type stored in the pail
 */
class MyCodecPailStructure[T] extends PailStructure[T] {
  /** Maps a record to its target sub-path components. */
  var targetFn: T => List[String] = ((x: T) => List("test"))
  /** Decides whether a list of sub-path components is an acceptable target. */
  var validator: List[String] => Boolean = ((x: List[String]) => true)
  /** Runtime class of `T`, reported by `getType`. */
  var mytype: java.lang.Class[T] = null
  /** Codec that converts records to bytes and back. */
  var injection: Injection[T, Array[Byte]] = null

  /** Installs all four pieces of configuration at once, replacing any previous values. */
  def setParams(targetFn: T => List[String],
                validator: List[String] => Boolean,
                mytype: java.lang.Class[T],
                injection: Injection[T, Array[Byte]]): Unit = {
    this.targetFn = targetFn
    this.validator = validator
    this.mytype = mytype
    this.injection = injection
  }

  override def isValidTarget(paths: String*): Boolean = validator(paths.toList)

  override def getTarget(obj: T): JList[String] = targetFn(obj).asJava

  override def serialize(obj: T): Array[Byte] = injection(obj)

  // `invert` returns Option or Try depending on the bijection version; `.get` throws if the
  // bytes cannot be decoded.
  override def deserialize(bytes: Array[Byte]): T = injection.invert(bytes).get

  // Must be a def: a `val` would be fixed at construction time, while `mytype` is still
  // null and before `setParams` runs, so it would always report null.
  override def getType: java.lang.Class[T] = mytype
}
WRITEJOB:
$ scald --hdfs-local PailTest2.scala --io write
READJOB:
$ scald --hdfs-local PailTest2.scala --io read
RESULT OF WRITE JOB:
$ tree pailtest/
pailtest/
├── abovefifty
│   ├── 0
│   │   └── part-0000013.pailfile
│   ├── 1
│   │   └── part-000007.pailfile
│   ├── 2
│   │   └── part-000008.pailfile
│   ├── 3
│   │   └── part-000009.pailfile
│   ├── 4
│   │   └── part-0000010.pailfile
│   ├── 5
│   │   └── part-0000011.pailfile
│   └── 6
│       └── part-0000012.pailfile
├── belowfifty
│   ├── 0
│   │   └── part-000006.pailfile
│   ├── 1
│   │   └── part-000000.pailfile
│   ├── 2
│   │   └── part-000001.pailfile
│   ├── 3
│   │   └── part-000002.pailfile
│   ├── 4
│   │   └── part-000003.pailfile
│   ├── 5
│   │   └── part-000004.pailfile
│   └── 6
│       └── part-000005.pailfile
└── pail.meta
RESULT OF READ JOB:
$ ls pailoutput*
pailoutput0:
part-00000
pailoutput1:
part-00000
pailoutput10:
part-00000
pailoutput11:
part-00000
pailoutput12:
part-00000
pailoutput13:
part-00000
pailoutput2:
part-00000
pailoutput3:
part-00000
pailoutput4:
part-00000
pailoutput5:
part-00000
pailoutput6:
part-00000
pailoutput7:
part-00000
pailoutput8:
part-00000
pailoutput9:
part-00000
$ cat pailoutput*/*
7
14
21
28
35
42
49
1
8
15
22
29
36
43
52
59
66
73
80
87
94
53
60
67
74
81
88
95
54
61
68
75
82
89
96
55
62
69
76
83
90
97
2
9
16
23
30
37
44
3
10
17
24
31
38
45
4
11
18
25
32
39
46
5
12
19
26
33
40
47
6
13
20
27
34
41
48
56
63
70
77
84
91
98
50
57
64
71
78
85
92
99
51
58
65
72
79
86
93
100
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment