Created
August 18, 2020 13:51
-
-
Save Partyschaum/ad03a973dd689d7b6f7320ce4015a9f9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class THttpClientOfferingRawRequest : TTransport { | |
private var url: URL? = null | |
private val requestBuffer = ByteArrayOutputStream() | |
private var inputStream: InputStream? = null | |
private var connectTimeout = 0 | |
private var readTimeout = 0 | |
private var customHeaders: MutableMap<String, String>? = null | |
private var host: HttpHost? = null | |
private var client: HttpClient? = null | |
lateinit var data: ByteArray | |
fun rawRequest(): String = String(data) | |
class Factory : TTransportFactory { | |
private val url: String | |
private val client: HttpClient? | |
constructor(url: String) { | |
this.url = url | |
this.client = null | |
} | |
constructor(url: String, client: HttpClient?) { | |
this.url = url | |
this.client = client | |
} | |
override fun getTransport(trans: TTransport): TTransport? { | |
return try { | |
if (null != client) { | |
org.apache.thrift.transport.THttpClient(url, client) | |
} else { | |
org.apache.thrift.transport.THttpClient(url) | |
} | |
} catch (tte: TTransportException) { | |
null | |
} | |
} | |
} | |
constructor(url: String?) { | |
try { | |
this.url = URL(url) | |
this.client = null | |
host = null | |
} catch (iox: IOException) { | |
throw TTransportException(iox) | |
} | |
} | |
constructor(url: String?, client: HttpClient?) { | |
try { | |
this.url = URL(url) | |
this.client = client | |
host = HttpHost( | |
this.url!!.host, | |
if (-1 == this.url!!.port) this.url!!.defaultPort else this.url!!.port, | |
this.url!!.protocol | |
) | |
} catch (iox: IOException) { | |
throw TTransportException(iox) | |
} | |
} | |
fun setConnectTimeout(timeout: Int) { | |
connectTimeout = timeout | |
// WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the | |
// same HttpClient is used for something else. | |
this.client?.params?.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, connectTimeout) | |
} | |
fun setReadTimeout(timeout: Int) { | |
readTimeout = timeout | |
client?.params?.setParameter(CoreConnectionPNames.SO_TIMEOUT, readTimeout) | |
} | |
fun setCustomHeaders(headers: MutableMap<String, String>?) { | |
customHeaders = headers | |
} | |
fun setCustomHeader(key: String, value: String) { | |
if (customHeaders == null) { | |
customHeaders = HashMap() | |
} | |
customHeaders!![key] = value | |
} | |
override fun open() {} | |
override fun close() { | |
if (null != inputStream) { | |
try { | |
inputStream!!.close() | |
} catch (ioe: IOException) { | |
} | |
inputStream = null | |
} | |
} | |
override fun isOpen(): Boolean { | |
return true | |
} | |
@Throws(TTransportException::class) | |
override fun read(buf: ByteArray, off: Int, len: Int): Int { | |
if (inputStream == null) { | |
throw TTransportException("Response buffer is empty, no request.") | |
} | |
return try { | |
val ret = inputStream!!.read(buf, off, len) | |
if (ret == -1) { | |
throw TTransportException("No more data available.") | |
} | |
ret | |
} catch (iox: IOException) { | |
throw TTransportException(iox) | |
} | |
} | |
override fun write(buf: ByteArray, off: Int, len: Int) { | |
requestBuffer.write(buf, off, len) | |
} | |
@Throws(TTransportException::class) | |
private fun flushUsingHttpClient() { | |
if (null == this.client) { | |
throw TTransportException("Null HttpClient, aborting.") | |
} | |
// Extract request and reset buffer | |
val data = requestBuffer.toByteArray() | |
requestBuffer.reset() | |
var post: HttpPost? = null | |
var `is`: InputStream? = null | |
try { | |
// Set request to path + query string | |
post = HttpPost(url!!.file) | |
// | |
// Headers are added to the HttpPost instance, not | |
// to HttpClient. | |
// | |
post.setHeader("Content-Type", "application/x-thrift") | |
post.setHeader("Accept", "application/x-thrift") | |
post.setHeader("User-Agent", "Java/THttpClient/HC") | |
if (null != customHeaders) { | |
for ((key, value) in customHeaders!!) { | |
post.setHeader(key, value) | |
} | |
} | |
post.entity = ByteArrayEntity(data) | |
val response = this.client!!.execute(host, post) | |
val responseCode = response.statusLine.statusCode | |
// | |
// Retrieve the inputstream BEFORE checking the status code so | |
// resources get freed in the finally clause. | |
// | |
`is` = response.entity.content | |
if (responseCode != HttpStatus.SC_OK) { | |
throw TTransportException("HTTP Response code: $responseCode") | |
} | |
// Read the responses into a byte array so we can release the connection | |
// early. This implies that the whole content will have to be read in | |
// memory, and that momentarily we might use up twice the memory (while the | |
// thrift struct is being read up the chain). | |
// Proceeding differently might lead to exhaustion of connections and thus | |
// to app failure. | |
val buf = ByteArray(1024) | |
val baos = ByteArrayOutputStream() | |
var len = 0 | |
do { | |
len = `is`.read(buf) | |
if (len > 0) { | |
baos.write(buf, 0, len) | |
} | |
} while (-1 != len) | |
try { | |
// Indicate we're done with the content. | |
consume(response.entity) | |
} catch (ioe: IOException) { | |
// We ignore this exception, it might only mean the server has no | |
// keep-alive capability. | |
} | |
inputStream = ByteArrayInputStream(baos.toByteArray()) | |
} catch (ioe: IOException) { | |
// Abort method so the connection gets released back to the connection manager | |
post?.abort() | |
throw TTransportException(ioe) | |
} finally { | |
if (null != `is`) { | |
// Close the entity's input stream, this will release the underlying connection | |
try { | |
`is`.close() | |
} catch (ioe: IOException) { | |
throw TTransportException(ioe) | |
} | |
} | |
post?.releaseConnection() | |
} | |
} | |
@Throws(TTransportException::class) | |
override fun flush() { | |
if (null != this.client) { | |
flushUsingHttpClient() | |
return | |
} | |
// Extract request and reset buffer | |
data = requestBuffer.toByteArray() | |
requestBuffer.reset() | |
try { | |
// Create connection object | |
val connection = url!!.openConnection() as HttpURLConnection | |
// Timeouts, only if explicitly set | |
if (connectTimeout > 0) { | |
connection.connectTimeout = connectTimeout | |
} | |
if (readTimeout > 0) { | |
connection.readTimeout = readTimeout | |
} | |
// Make the request | |
connection.requestMethod = "POST" | |
connection.setRequestProperty("Content-Type", "application/x-thrift") | |
connection.setRequestProperty("Accept", "application/x-thrift") | |
connection.setRequestProperty("User-Agent", "Java/THttpClient") | |
if (customHeaders != null) { | |
for ((key, value) in customHeaders!!) { | |
connection.setRequestProperty(key, value) | |
} | |
} | |
connection.doOutput = true | |
connection.connect() | |
connection.outputStream.write(data) | |
val responseCode = connection.responseCode | |
if (responseCode != HttpURLConnection.HTTP_OK) { | |
throw TTransportException("HTTP Response code: $responseCode") | |
} | |
// Read the responses | |
inputStream = connection.inputStream | |
} catch (iox: IOException) { | |
throw TTransportException(iox) | |
} | |
} | |
companion object { | |
/** | |
* copy from org.apache.http.util.EntityUtils#consume. Android has it's own httpcore | |
* that doesn't have a consume. | |
*/ | |
@Throws(IOException::class) | |
private fun consume(entity: HttpEntity?) { | |
if (entity == null) { | |
return | |
} | |
if (entity.isStreaming) { | |
val instream = entity.content | |
instream?.close() | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment