Created
September 8, 2015 18:23
-
-
Save marmbrus/50da3d78eaa1acbd7bee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.databricks.spark.jira | |
import scala.io.Source | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.sql._ | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql.sources.{TableScan, BaseRelation, RelationProvider} | |
import java.nio.charset.Charset | |
import com.fasterxml.jackson.databind._ | |
import com.fasterxml.jackson.module.scala.DefaultScalaModule | |
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper | |
import org.apache.commons.codec.binary.Base64 | |
import org.apache.http.client.methods._ | |
import org.apache.http.entity.StringEntity | |
import org.apache.http.impl.client.DefaultHttpClient | |
/**
 * A simple Spark SQL Data Source for JIRA.
 *
 * This `RelationProvider` turns the option map it is given into a
 * [[JiraRelation]]:
 *  - `url`, `user` and `password` are looked up directly, so a missing key
 *    fails the map lookup (a `NoSuchElementException` for a plain `Map`);
 *  - `query` is optional and falls back to the empty string.
 */
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // Required options, read in the same order as the relation's constructor.
    val endpoint = parameters("url")
    val username = parameters("user")
    val secret = parameters("password")
    // Optional query text.
    val jql = parameters.get("query") match {
      case Some(text) => text
      case None => ""
    }
    JiraRelation(endpoint, username, secret, jql)(sqlContext)
  }
}
/**
 * A relation whose rows are the issues returned by a JIRA query.
 *
 * The whole result set is fetched eagerly, page by page, while the relation is
 * being constructed (`baseData` is a strict `val`). Each page is the raw JSON
 * text returned by [[JiraRestClient.getIssues]]; paging stops at the first page
 * whose `issues` array is empty.
 *
 * @param url        endpoint handed to [[JiraRestClient]]
 * @param user       username handed to [[JiraRestClient]]
 * @param password   password handed to [[JiraRestClient]]
 * @param query      query text sent with every page request
 * @param sqlContext context used to parse the JSON pages and build the result
 */
case class JiraRelation(url: String, user: String, password: String, query: String)(
    @transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  import sqlContext.implicits._

  val client = new JiraRestClient(url, user, password)

  /** One row per issue, in a single column named `issue`. */
  val baseData: DataFrame = {
    // Fetches the page starting at `startAt` and returns its raw JSON (as a
    // one-element RDD) together with the number of issues it contains.
    // The count is computed once; every `count()` call runs a Spark job.
    def fetchPage(startAt: Long): (RDD[String], Long) = {
      val pageRDD =
        sqlContext.sparkContext.parallelize(client.getIssues(startAt, query) :: Nil)
      val issueCount = sqlContext.jsonRDD(pageRDD).select(explode($"issues")).count()
      (pageRDD, issueCount)
    }

    // Collects pages until an empty one is seen. The trailing empty page is
    // dropped, except when it is the only page: keeping it guarantees the
    // result is never empty, so the `reduceLeft` below cannot fail when the
    // query matches no issues.
    @scala.annotation.tailrec
    def fetchAll(startAt: Long, pages: Vector[RDD[String]]): Vector[RDD[String]] = {
      val (pageRDD, issueCount) = fetchPage(startAt)
      if (issueCount > 0) {
        val nextStart = startAt + issueCount
        println(nextStart) // NOLINT
        fetchAll(nextStart, pages :+ pageRDD)
      } else if (pages.isEmpty) {
        Vector(pageRDD)
      } else {
        pages
      }
    }

    val allDFs = sqlContext.jsonRDD(fetchAll(0L, Vector.empty).reduceLeft(_ union _))
    allDFs.printSchema()
    allDFs.select(explode('issues).as('issue))
  }

  def schema = baseData.schema

  def buildScan(): RDD[Row] = baseData.rdd
}
/**
 * Minimal view of an issue: its key and its summary text.
 *
 * Not referenced by the other definitions visible in this file.
 */
case class JiraIssue(
    key: String,
    summary: String)
/**
 * A homebrewed JIRA client that talks JSON over HTTP with basic authentication.
 *
 * @param url      endpoint that queries are POSTed to
 * @param user     username to authenticate with
 * @param password password to authenticate with
 */
class JiraRestClient(url: String, user: String, password: String) {

  /** Base for request payloads; `toJson` serializes the payload with Jackson. */
  trait JiraQuery {
    def toJson: String = mapper.writeValueAsString(this)
  }

  /**
   * Payload for a paged query; it is serialized to JSON as the request body.
   * NOTE(review): the field names presumably mirror JIRA's search request
   * body - confirm against the JIRA REST API documentation.
   */
  case class JqlQuery(
      jql: String,
      startAt: Int = 0,
      maxResults: Int = 10,
      validateQuery: Boolean = false,
      fields: List[String] = List.empty) extends JiraQuery

  private val mapper = new ObjectMapper with ScalaObjectMapper {
    registerModule(DefaultScalaModule)
    configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  }

  /**
   * Runs a query and returns the raw JSON text of one page of results.
   *
   * @param startAt offset of the first result to return; narrowed to `Int`
   *                before being sent
   * @param query   the query text to run
   * @return the response body with line breaks removed, or `""` when the
   *         response carries no body
   */
  def getIssues(startAt: Long, query: String): String = {
    val post = new HttpPost(url)
    // Encode the body explicitly as UTF-8 so non-ASCII query text is not
    // mangled by StringEntity's default charset.
    post.setEntity(
      new StringEntity(JqlQuery(query, startAt = startAt.toInt).toJson, "UTF-8"))
    getRestContent(post)
  }

  private val base64Auth =
    Base64.encodeBase64String(s"$user:$password".getBytes(Charset.forName("UTF-8")))

  /** Issues an authenticated GET against `url` and returns the response body. */
  private def getContent(url: String): String = {
    val request = new HttpGet(url)
    getRestContent(request)
  }

  /**
   * Executes `request` with the authentication and content-type headers set and
   * returns the response body as a single string (line breaks removed), or `""`
   * when the response has no entity.
   *
   * NOTE(review): the HTTP status code is not inspected, so the body of an
   * error response is returned like any other.
   */
  private def getRestContent(request: HttpUriRequest): String = {
    request.setHeader("Authorization", s"Basic $base64Auth")
    request.setHeader("Content-Type", "application/json")
    val httpClient = new DefaultHttpClient()
    try {
      val httpResponse = httpClient.execute(request)
      Option(httpResponse.getEntity()).map { entity =>
        val inputStream = entity.getContent()
        try {
          // Decode as UTF-8 rather than the platform default charset.
          Source.fromInputStream(inputStream, "UTF-8").getLines().mkString
        } finally {
          inputStream.close()
        }
      }.getOrElse("")
    } finally {
      // Release the connection on every path, including a failed request or a
      // response without an entity.
      httpClient.getConnectionManager().shutdown()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment