Created
February 2, 2014 17:43
-
-
Save hsyed/8771986 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package edu.hsyed.nlp | |
import edu.hsyed.nlp.pgforumdb.forumstats.MyPostgresDriver | |
import scala.collection.JavaConverters._ | |
import scala.collection.JavaConversions._ | |
import forumavroschema._ | |
import MyPostgresDriver.simple._ | |
import Database.threadLocalSession | |
import edu.hsyed.nlp.pgforumdb.CrawlerModel.DB.{ForumPosts, ForumUsers, ForumTopics} | |
import edu.hsyed.nlp.pgforumdb.DBCrawler | |
import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat} | |
import parquet.avro._ | |
import org.apache.hadoop.fs.Path | |
import edu.hsyed.nlp.pgforumdb.CrawlerModel.Extractor.TopicCrawlState | |
import org.apache.spark.SparkContext | |
import SparkContext._ | |
import org.apache.hadoop.mapreduce.Job | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.Properties | |
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP} | |
import edu.stanford.nlp.trees.TreeCoreAnnotations.TreeAnnotation | |
import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation | |
import org.apache.avro.specific.SpecificData | |
import edu.hsyed.nlp.pgforumdb.CrawlerModel.Extractor.ForumTopic | |
import com.esotericsoftware.kryo.Kryo | |
import org.apache.spark.serializer.KryoRegistrator | |
/**
 * Kryo registrator for the forum record classes (User, Quote, Post, Topic).
 *
 * Spark is pointed at this class by name through the `spark.kryo.registrator`
 * system property set in `genForum.sparkNLPTransformation`.
 */
class MyRegistrator extends KryoRegistrator {

  /** Registers each record class with `kryo`, in the fixed order User, Quote, Post, Topic. */
  override def registerClasses(kryo: Kryo): Unit = {
    // Order is kept stable: Kryo's default registration ids follow registration order.
    val recordClasses: Seq[Class[_]] =
      Seq(classOf[User], classOf[Quote], classOf[Post], classOf[Topic])
    recordClasses.foreach(cls => kryo.register(cls))
  }
}
object genForum {

  /**
   * Loads one forum user from the database and converts it to an Avro `User` record.
   *
   * A missing name or join date is stored as the empty string.
   * NOTE(review): `first()` throws when no matching user row exists, which aborts
   * the whole export; confirm every post/topic author is present in ForumUsers.
   *
   * @param forumId forum the user belongs to
   * @param userId  id of the user within that forum
   * @return the populated `User` record
   */
  private def makeUser(forumId: Int, userId: Int): User = {
    DBCrawler.db.withSession {
      val row = Query(ForumUsers).filter(u => u.forumId === forumId && u.id === userId).first()
      new User(forumId, userId, row.name.getOrElse(""), row.joinDate.map(_.toString).getOrElse(""))
    }
  }

  /**
   * Exports every topic whose crawl state is `Done` to the Parquet file `posts.parq`.
   * Each topic is exported as an Avro `Topic` with its author and all of its posts.
   *
   * Topics are converted in parallel (`par.foreach`), and progress is printed to
   * stdout as a percentage. The writer is always closed (in a `finally`), even when
   * converting a topic throws.
   */
  def genForum: Unit = {
    // One writer is shared by all threads of the parallel loop below, so writes are
    // serialized on the writer instance (presumably AvroParquetWriter is not thread-safe).
    class MyWriter extends AvroParquetWriter[Topic](new Path("posts.parq"), Topic.getClassSchema) {
      override def write(t: Topic): Unit = {
        synchronized {
          super.write(t)
        }
      }
    }

    /** Builds the Avro `Topic` for one crawled topic row, including its author and posts. */
    def makeTopic(x: ForumTopic): Topic = {
      val topic = new Topic()
      topic.setForumId(x.forumId)
      topic.setId(x.id)
      topic.setCategoryId(x.categoryId)
      topic.setLastPostAt(x.lastPostAt.map(_.toString).getOrElse(""))
      topic.setIsSticky(x.isSticky)
      topic.setIsPoll(x.isPoll)
      topic.setHasPrefix(x.hasPrefix)
      topic.setTitle(x.title)
      topic.setUrl("")
      topic.setPostCount(x.postCount)
      topic.setViewCount(x.viewCount)
      topic.setStartedBy(makeUser(x.forumId, x.startedBy))
      topic.setStartedAt(x.startedAt.map(_.toString).getOrElse(""))

      val postRows =
        DBCrawler.db.withSession {
          Query(ForumPosts).filter(col => col.forumId === x.forumId && col.topicId === x.id).list()
        }

      val posts = postRows.map { row =>
        val p = new Post()
        p.setForumId(row.forumId)
        p.setId(row.id)
        p.setTopicId(row.topicId)
        p.setCategoryId(row.categoryId)
        p.setIndex(row.index)
        p.setPostPage(row.postPage)
        p.setPostBy(makeUser(row.forumId, row.userId))
        // NOTE(review): unlike the topic timestamps, this is not unwrapped with
        // map/getOrElse; if postedAt is an Option this stores "Some(...)" — confirm.
        p.setPostedAt(row.postedAt.toString)
        p.setPostText(row.postText)
        // Quotes are only set when present; otherwise the field keeps its default.
        if (row.quotes.nonEmpty) p.setQuotes(row.quotes.map(z =>
          new Quote(z.index, z.startOffset, z.endOffset, z.isDirect, z.quotedId)).asJava)
        p
      }.asJava

      topic.setPosts(posts)
      topic
    }

    // Query before opening the writer, so a failing query never opens (and leaks) it.
    val topics =
      DBCrawler.db.withSession {
        Query(ForumTopics).filter(x => x.crawlState === TopicCrawlState.Done).list()
      }
    val total = topics.size
    val done = new AtomicInteger(0)

    val writer = new MyWriter
    //val writer = new AvroParquetWriter[Topic](new Path("posts.parq"), Topic.getClassSchema, CompressionCodecName.SNAPPY, ParquetWriter.DEFAULT_BLOCK_SIZE / 2, ParquetWriter.DEFAULT_PAGE_SIZE)
    try {
      topics.par.foreach { x =>
        writer.write(makeTopic(x))
        val count = done.incrementAndGet()
        // '\r' rewrites the progress percentage in place on the same console line.
        print(f"\r${count.toFloat * 100 / total}%4.2f%%")
      }
    } finally {
      writer.close()
    }
    // Terminate the in-place progress line.
    println()
  }

  /**
   * Provides one StanfordCoreNLP pipeline per thread, configured with the
   * tokenize, ssplit, pos, lemma and parse annotators.
   *
   * The pipeline is built by `initialValue` the first time a thread calls `get`.
   * It is kept thread-local, presumably because one pipeline should not be shared
   * across concurrent tasks.
   */
  object MyAnnotator extends ThreadLocal[StanfordCoreNLP] {
    val props = new Properties()
    props.put("annotators", "tokenize,ssplit,pos,lemma,parse")
    override def initialValue(): StanfordCoreNLP = new StanfordCoreNLP(props)
  }

  /** Placeholder for a non-Spark version of the NLP pass; not implemented yet. */
  def localNLPTransformation: Unit = {
    //new AvroParquetReader[Topic]()
  }

  /**
   * Runs the NLP annotation pass on a local 8-thread Spark context.
   *
   * Reads `Topic` records from `forum_dataset.parq` and adds sentence parse trees
   * to every post. The annotated topics are written to `annotated_posts.parq`, with
   * Avro-in-Parquet used for both input and output. The SparkContext is always
   * stopped when the pass finishes or fails.
   *
   * NOTE(review): the input path differs from the `posts.parq` written by
   * `genForum`; confirm the intended dataset.
   */
  def sparkNLPTransformation(): Unit = {
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "edu.hsyed.nlp.MyRegistrator")
    val sc = new SparkContext("local[8]", "forumAddNlp")
    try {
      // I/O configuration: read and write Topic records through the Avro Parquet support classes.
      val job = new Job()
      ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[Topic]])
      ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
      AvroParquetOutputFormat.setSchema(job, Topic.getClassSchema)

      // Returns a deep copy of `top`. Each post in the copy gets the parse trees of
      // its sentences; posts without trees keep their original `trees` value.
      def annotatePosts(top: Topic): Topic = {
        val annotatedPosts = top.getPosts.map { post =>
          val doc = new Annotation(post.getPostText.toString)
          MyAnnotator.get.annotate(doc)
          val trees = doc.get(classOf[SentencesAnnotation]).map(_.get(classOf[TreeAnnotation])).toList
          // Copy rather than mutate the input record.
          val copied = SpecificData.get().deepCopy[Post](post.getSchema, post)
          if (trees.nonEmpty) copied.setTrees(trees)
          copied
        }
        val copiedTopic = SpecificData.get().deepCopy[Topic](top.getSchema, top)
        copiedTopic.setPosts(annotatedPosts)
        copiedTopic
      }

      // Transformation: keys are unused (Void), values are the Topic records.
      val ds = sc.newAPIHadoopFile("forum_dataset.parq", classOf[ParquetInputFormat[Topic]], classOf[Void], classOf[Topic], job.getConfiguration).repartition(16)
      val annotated = ds.map(x => (null, annotatePosts(x._2)))
      // annotated.foreach(println(_))
      annotated.saveAsNewAPIHadoopFile("annotated_posts.parq",
        classOf[Void],
        classOf[Topic],
        classOf[ParquetOutputFormat[Topic]],
        job.getConfiguration
      )
    } finally {
      sc.stop()
    }
  }
}
/**
 * Command-line entry point: runs the Spark NLP annotation pass.
 *
 * Defines an explicit `main` instead of extending `scala.App`. Spark's
 * documentation warns that subclasses of `scala.App` may not work correctly.
 */
object Start {
  def main(args: Array[String]): Unit = {
    genForum.sparkNLPTransformation()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment