Skip to content

Instantly share code, notes, and snippets.

@tsaeki-7bk
Last active August 13, 2019 05:40
Show Gist options
  • Save tsaeki-7bk/57c0d8ea41418d16b187467c5cb1c8bf to your computer and use it in GitHub Desktop.
Save tsaeki-7bk/57c0d8ea41418d16b187467c5cb1c8bf to your computer and use it in GitHub Desktop.
Queue Storageの情報からBlob Storageのファイルをダウンロードする
dependencies {
(snip)
implementation("com.microsoft.rest.v2:client-runtime:2.0.0")
implementation("com.microsoft.azure:azure-storage-blob:11.0.1")
implementation("com.microsoft.azure:azure-storage-queue:10.0.1-Preview")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.+")
(snip)
}
/*
* This Kotlin source file was generated by the Gradle 'init' task.
*/
package adaptor
import java.net.URL
import java.util.Base64
import com.fasterxml.jackson.module.kotlin.*
import java.io.File;
import java.io.FileWriter;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
// Queue
import com.microsoft.azure.storage.queue.SharedKeyCredentials as QueueSharedKeyCredentials
import com.microsoft.azure.storage.queue.StorageURL as QueueStorageURL
import com.microsoft.azure.storage.queue.ServiceURL as QueueServiceURL
import com.microsoft.azure.storage.queue.PipelineOptions as QueuePipelineOptions
import com.microsoft.azure.storage.queue.MessagesURL as QueueMessagesURL
import com.microsoft.azure.storage.queue.models.*
// Blob
import com.microsoft.azure.storage.blob.PipelineOptions as BlobPipelineOptions
import com.microsoft.azure.storage.blob.ServiceURL as BlobServiceURL
import com.microsoft.azure.storage.blob.SharedKeyCredentials as BlobSharedKeyCredentials
import com.microsoft.azure.storage.blob.StorageURL as BlobStorageURL
import com.microsoft.azure.storage.blob.TransferManager as BlobTransferManager
import com.microsoft.azure.storage.blob.BlockBlobURL as BlobBlockBlobURL
data class queueMessage (
val topic: String,
val subject: String,
val eventType: String,
val eventTime: String,
val id: String,
val data: queueuMessageData,
val dataVersion: String,
val metadataVersion: String
)
data class queueuMessageData (
val api: String,
val clientRequestId: String,
val requestId: String,
val eTag: String,
val contentType: String,
val contentLength: String,
val blobType: String,
val url: String,
val sequencer: String,
val storageDiagnostics: StorageDiagnostics
)
data class StorageDiagnostics (
val batchId: String
)
class BlobStorage(accessName: String, accessKey: String) {
val bCredentials = BlobSharedKeyCredentials(accessName, accessKey)
var bServiceURL: BlobServiceURL
init {
bServiceURL = BlobServiceURL(
URL("https://$accessName.blob.core.windows.net"),
BlobStorageURL.createPipeline(bCredentials, BlobPipelineOptions())
)
}
/*
donwload a file from Blob Storage
*/
fun downloadBlob(url: URL) {
val containerURL = bServiceURL.createContainerURL("test-eventgrid-blob")
val fileName = Paths.get(url.getPath()).getFileName().toString()
val blobURL = containerURL.createBlockBlobURL(fileName)
var file = File("/Users/tsaeki/Develop/isid/adaptor/logs/$fileName")
var fileChannel = AsynchronousFileChannel.open(file.toPath(),StandardOpenOption.CREATE, StandardOpenOption.WRITE)
BlobTransferManager.downloadBlobToFile(fileChannel, blobURL, null, null).blockingGet()
}
}
/*
decode base64
*/
fun decodeBase64(encodeString: String): String {
var actualByte = Base64.getDecoder().decode(encodeString)
return String(actualByte)
}
/*
delete a queue message
*/
fun deleteMessage(messageURL: QueueMessagesURL, messageId: String, popReceipt: String) {
println("delete message. message id: $messageId, popReceipt: $popReceipt")
messageURL.createMessageIdUrl(messageId).delete(popReceipt).blockingGet()
}
fun main(args: Array<String>) {
println("start ")
val accessName: String = System.getenv("AZURE_STORAGE_ACCOUNT")
val accessKey: String = System.getenv("AZURE_STORAGE_ACCESS_KEY")
val qCredentials = QueueSharedKeyCredentials(accessName, accessKey)
val qServiceURL = QueueServiceURL(
URL("https://$accessName.queue.core.windows.net"),
QueueStorageURL.createPipeline(qCredentials, QueuePipelineOptions())
)
val queueURL = qServiceURL.createQueueUrl("test-eventgrid-queue")
val queueCreateSingle = queueURL.create()
val messageURL = queueURL.createMessagesUrl()
queueCreateSingle
.flatMap {
response ->
messageURL.dequeue(32, 5)
}
.doOnSuccess {
messageDequeueResponse ->
val mapper = jacksonObjectMapper()
for(item in messageDequeueResponse.body()){
val message = mapper.readValue<queueMessage>(decodeBase64(item.messageText()))
// download a file from Blob Storage
val blob = BlobStorage(accessName, accessKey)
blob.downloadBlob(URL(message.data.url))
// delete a message
deleteMessage(messageURL, item.messageId(), item.popReceipt())
}
}
.blockingGet()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment