Created
July 17, 2015 12:15
-
-
Save senz/9b957471a257efc61afe to your computer and use it in GitHub Desktop.
Mixpanel API supporting extension and sending ip address
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mixpanel.mixpanelapi | |
import java.io._ | |
import java.net.{URLEncoder, URLConnection, URL, InetAddress} | |
import scala.collection.JavaConverters._ | |
import org.json.{JSONArray, JSONObject} | |
/**
 * Blocking client that posts Mixpanel messages to an events endpoint and a
 * people endpoint, optionally passing an `ip` query parameter with each request.
 *
 * @param eventsEndpoint URL that event messages are posted to
 * @param peopleEndpoint URL that people messages are posted to
 */
class ExtendedMixpanelAPI(protected final val eventsEndpoint: String, protected final val peopleEndpoint: String) {
  import java.nio.charset.StandardCharsets

  /** Size, in chars, of the buffer used when reading a server response. */
  protected val BufferSize: Int = 256

  /** Read timeout, in milliseconds, applied to every connection opened by `sendData`. */
  protected val ReadTimeoutMs: Int = 120000

  /**
   * Constructs an API object whose endpoints are `Config.BASE_ENDPOINT` + "/track"
   * (events) and `Config.BASE_ENDPOINT` + "/engage" (people).
   */
  def this() = this(Config.BASE_ENDPOINT + "/track", Config.BASE_ENDPOINT + "/engage")

  /**
   * Sends a single message to Mixpanel servers.
   *
   * Each call wraps the message in a fresh [[ClientDelivery]] and hands it to
   * [[deliver]] with the default ip setting, so it blocks on the remote call.
   * To send multiple messages at once, build a [[ClientDelivery]] and call [[deliver]].
   *
   * @param message A JSONObject formatted by MessageBuilder
   * @throws MixpanelMessageException if the given JSONObject is not (apparently) a Mixpanel message.
   *                                  This is a RuntimeException, callers should take care to submit
   *                                  only correctly formatted messages.
   * @throws IOException if the request to the server fails or the server does not accept the message
   */
  @throws(classOf[MixpanelMessageException])
  @throws(classOf[IOException])
  def sendMessage(message: JSONObject): Unit = {
    val delivery = new ClientDelivery
    delivery.addMessage(message)
    deliver(delivery)
  }

  /**
   * Attempts to send a given delivery to the Mixpanel servers. Will block,
   * possibly on multiple server requests. For most applications, this method
   * should be called in a separate thread or in a queue consumer.
   *
   * Event messages are sent first, then people messages; if sending events
   * throws, the people messages are not attempted.
   *
   * @param toSend    a ClientDelivery containing a number of Mixpanel messages
   * @param ipAddress selects the value of the `ip` query parameter appended to both
   *                  endpoint URLs: "0" for IgnoreIp, "1" for UseSenderIp, or the
   *                  (URL-encoded) host address for SpecifiedIp
   * @throws IOException if a request fails or the server does not accept a batch
   * @see ClientDelivery
   */
  @throws(classOf[IOException])
  def deliver(toSend: ClientDelivery, ipAddress: IpAddressParam = IpAddressParam.IgnoreIp): Unit = {
    val ipValue = ipAddress match {
      case IpAddressParam.IgnoreIp => "0"
      case IpAddressParam.UseSenderIp => "1"
      // IPv6 host addresses may carry a "%scope" suffix; encode so the query string stays well-formed.
      // Dotted IPv4 addresses pass through the encoder unchanged.
      case IpAddressParam.SpecifiedIp(ipAddr) => URLEncoder.encode(ipAddr.getHostAddress, "UTF-8")
    }
    val query = "?ip=" + ipValue

    sendMessages(toSend.getEventsMessages.asScala.toList, eventsEndpoint + query)
    sendMessages(toSend.getPeopleMessages.asScala.toList, peopleEndpoint + query)
  }

  /**
   * Posts `dataString` to `endpointUrl` as a form field named `data`
   * (UTF-8 bytes, base64-encoded, then URL-encoded) and reads the response.
   *
   * Package scope for mocking purposes.
   *
   * @param dataString  the payload to send
   * @param endpointUrl the URL to post to
   * @return true only if the response body is exactly "1"
   * @throws IOException if opening the connection, writing the request or reading the response fails
   */
  @throws(classOf[IOException])
  private[mixpanelapi] def sendData(dataString: String, endpointUrl: String): Boolean = {
    val conn: URLConnection = new URL(endpointUrl).openConnection()
    conn.setReadTimeout(ReadTimeoutMs)
    conn.setDoOutput(true)
    conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=utf8")

    val utf8data = dataString.getBytes(StandardCharsets.UTF_8)
    val base64data = new String(Base64Coder.encode(utf8data))
    val encodedQuery = "data=" + URLEncoder.encode(base64data, "utf8")

    val postStream = conn.getOutputStream
    try {
      // Explicit charset: never depend on the platform default for bytes put on the wire.
      postStream.write(encodedQuery.getBytes(StandardCharsets.UTF_8))
    } finally {
      closeQuietly(postStream)
    }

    val responseStream = conn.getInputStream
    val response =
      try slurp(responseStream)
      finally closeQuietly(responseStream)

    response == "1"
  }

  /**
   * Sends `messages` to `endpointUrl` in consecutive batches of at most
   * `Config.MAX_MESSAGE_SIZE` messages, one request per batch.
   *
   * @param messages    the messages to send; an empty list sends nothing
   * @param endpointUrl the URL every batch is posted to
   * @throws MixpanelServerException if `sendData` returns false for a batch; later batches are not sent
   * @throws IOException if a request fails
   */
  @throws(classOf[IOException])
  protected def sendMessages(messages: List[JSONObject], endpointUrl: String): Unit = {
    // grouped walks the list once; indexing/slicing a List per batch would rescan it from the head.
    messages.grouped(Config.MAX_MESSAGE_SIZE).foreach { batch =>
      if (!sendData(dataString(batch), endpointUrl)) {
        throw new MixpanelServerException("Server refused to accept messages, they may be malformed.", batch.asJava)
      }
    }
  }

  /** Serializes the messages as the string form of a single JSONArray, preserving order. */
  private def dataString(messages: List[JSONObject]): String = {
    val array = new JSONArray
    messages.foreach(array.put)
    array.toString
  }

  /**
   * Reads `in` to the end as UTF-8 text. Does not close `in`; the caller owns the stream.
   */
  @throws(classOf[IOException])
  private def slurp(in: InputStream): String = {
    val out = new StringBuilder
    val reader = new InputStreamReader(in, StandardCharsets.UTF_8)
    val readBuffer = new Array[Char](BufferSize)
    var readCount = reader.read(readBuffer)
    while (readCount != -1) {
      if (readCount > 0) {
        out.append(readBuffer, 0, readCount)
      }
      readCount = reader.read(readBuffer)
    }
    out.toString()
  }

  /** Closes `resource`, deliberately ignoring an IOException from close (best-effort cleanup). */
  private def closeQuietly(resource: Closeable): Unit = {
    try resource.close()
    catch {
      case _: IOException => ()
    }
  }
}
/**
 * Chooses what `ExtendedMixpanelAPI.deliver` puts in the `ip` query parameter
 * of its requests.
 *
 * Extends `Product with Serializable` so that collections mixing the variants
 * infer `IpAddressParam` as their element type instead of a widened compound type.
 */
sealed trait IpAddressParam extends Product with Serializable

object IpAddressParam {

  /** Sent as `ip=0`. This is the default for `deliver`. */
  case object IgnoreIp extends IpAddressParam

  /** Sent as `ip=1`. */
  case object UseSenderIp extends IpAddressParam

  /**
   * Sends the host address of `ip` as the parameter value.
   *
   * @param ip the address whose `getHostAddress` is sent
   */
  final case class SpecifiedIp(ip: InetAddress) extends IpAddressParam
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment