Last active
August 13, 2019 05:40
-
-
Save tsaeki-7bk/57c0d8ea41418d16b187467c5cb1c8bf to your computer and use it in GitHub Desktop.
Queue Storageの情報からBlob Storageのファイルをダウンロードする
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
dependencies { | |
(snip) | |
implementation("com.microsoft.rest.v2:client-runtime:2.0.0") | |
implementation("com.microsoft.azure:azure-storage-blob:11.0.1") | |
implementation("com.microsoft.azure:azure-storage-queue:10.0.1-Preview") | |
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.+") | |
(snip) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* This Kotlin source file was generated by the Gradle 'init' task. | |
*/ | |
package adaptor | |
import java.net.URL | |
import java.util.Base64 | |
import com.fasterxml.jackson.module.kotlin.* | |
import java.io.File; | |
import java.io.FileWriter; | |
import java.nio.channels.AsynchronousFileChannel; | |
import java.nio.channels.FileChannel; | |
import java.nio.file.Paths; | |
import java.nio.file.StandardOpenOption; | |
// Queue | |
import com.microsoft.azure.storage.queue.SharedKeyCredentials as QueueSharedKeyCredentials | |
import com.microsoft.azure.storage.queue.StorageURL as QueueStorageURL | |
import com.microsoft.azure.storage.queue.ServiceURL as QueueServiceURL | |
import com.microsoft.azure.storage.queue.PipelineOptions as QueuePipelineOptions | |
import com.microsoft.azure.storage.queue.MessagesURL as QueueMessagesURL | |
import com.microsoft.azure.storage.queue.models.* | |
// Blob | |
import com.microsoft.azure.storage.blob.PipelineOptions as BlobPipelineOptions | |
import com.microsoft.azure.storage.blob.ServiceURL as BlobServiceURL | |
import com.microsoft.azure.storage.blob.SharedKeyCredentials as BlobSharedKeyCredentials | |
import com.microsoft.azure.storage.blob.StorageURL as BlobStorageURL | |
import com.microsoft.azure.storage.blob.TransferManager as BlobTransferManager | |
import com.microsoft.azure.storage.blob.BlockBlobURL as BlobBlockBlobURL | |
/**
 * Envelope of a single queue message after Base64 decoding and JSON parsing.
 *
 * `main` deserializes every dequeued message into this type with Jackson and
 * then reads [data] to find the blob to download.
 * NOTE(review): the field names look like the Azure Event Grid event schema — confirm.
 *
 * @property topic value of the `topic` JSON field.
 * @property subject value of the `subject` JSON field.
 * @property eventType value of the `eventType` JSON field.
 * @property eventTime value of the `eventTime` JSON field, kept as the raw string.
 * @property id value of the `id` JSON field.
 * @property data nested payload describing the blob operation.
 * @property dataVersion value of the `dataVersion` JSON field.
 * @property metadataVersion value of the `metadataVersion` JSON field.
 */
data class queueMessage(
    val topic: String,
    val subject: String,
    val eventType: String,
    val eventTime: String,
    val id: String,
    val data: queueuMessageData,
    val dataVersion: String,
    val metadataVersion: String
)
/**
 * Nested `data` payload of a [queueMessage].
 *
 * Of these fields, only [url] is read by the code in this file: `main` passes
 * it to `BlobStorage.downloadBlob`. All values are kept as strings.
 *
 * @property api value of the `api` JSON field.
 * @property clientRequestId value of the `clientRequestId` JSON field.
 * @property requestId value of the `requestId` JSON field.
 * @property eTag value of the `eTag` JSON field.
 * @property contentType value of the `contentType` JSON field.
 * @property contentLength value of the `contentLength` JSON field, as a string.
 * @property blobType value of the `blobType` JSON field.
 * @property url URL of the blob the message refers to.
 * @property sequencer value of the `sequencer` JSON field.
 * @property storageDiagnostics nested diagnostics object.
 */
data class queueuMessageData(
    val api: String,
    val clientRequestId: String,
    val requestId: String,
    val eTag: String,
    val contentType: String,
    val contentLength: String,
    val blobType: String,
    val url: String,
    val sequencer: String,
    val storageDiagnostics: StorageDiagnostics
)
/**
 * Innermost `storageDiagnostics` object of a queue message payload.
 *
 * @property batchId value of the `batchId` JSON field.
 */
data class StorageDiagnostics(val batchId: String)
/**
 * Small wrapper around the Azure Blob Storage client that downloads blobs
 * into a local directory.
 *
 * @param accessName storage account name; also used to build the
 *   `https://<accessName>.blob.core.windows.net` service endpoint.
 * @param accessKey shared key used to create the credentials.
 * @param containerName container the blobs are read from.
 * @param downloadDir local directory the downloaded files are written into.
 *   It is not created by this class.
 */
class BlobStorage(
    accessName: String,
    accessKey: String,
    private val containerName: String = "test-eventgrid-blob",
    private val downloadDir: String = "/Users/tsaeki/Develop/isid/adaptor/logs"
) {
    val bCredentials = BlobSharedKeyCredentials(accessName, accessKey)

    // Declared as `var` to stay source-compatible with callers that reassign it.
    var bServiceURL: BlobServiceURL = BlobServiceURL(
        URL("https://$accessName.blob.core.windows.net"),
        BlobStorageURL.createPipeline(bCredentials, BlobPipelineOptions())
    )

    /**
     * Downloads a block blob and blocks until the transfer has finished.
     *
     * Only the last path segment of [url] is used: it is both the blob name
     * looked up in the configured container and the local file name.
     *
     * @param url URL of the blob; only its path component is read.
     */
    fun downloadBlob(url: URL) {
        val containerURL = bServiceURL.createContainerURL(containerName)
        val fileName = Paths.get(url.path).fileName.toString()
        val blobURL = containerURL.createBlockBlobURL(fileName)
        val file = File(downloadDir, fileName)

        // TRUNCATE_EXISTING: without it, overwriting a longer file from an earlier
        // download would leave that file's trailing bytes behind the new content.
        AsynchronousFileChannel.open(
            file.toPath(),
            StandardOpenOption.CREATE,
            StandardOpenOption.WRITE,
            StandardOpenOption.TRUNCATE_EXISTING
        ).use { fileChannel ->
            // `use` closes the channel once the blocking download returns or throws,
            // so the file handle is no longer leaked.
            BlobTransferManager.downloadBlobToFile(fileChannel, blobURL, null, null).blockingGet()
        }
    }
}
/**
 * Decodes a Base64 string and returns the decoded bytes as UTF-8 text.
 *
 * @param encodeString Base64 text accepted by the basic `Base64.getDecoder()` decoder.
 * @return the decoded bytes interpreted as a UTF-8 string.
 * @throws IllegalArgumentException if [encodeString] is not valid Base64.
 */
fun decodeBase64(encodeString: String): String =
    Base64.getDecoder().decode(encodeString).toString(Charsets.UTF_8)
/**
 * Deletes one message from the queue and blocks until the call has completed.
 *
 * The message id and pop receipt are printed to standard output first.
 *
 * @param messageURL messages endpoint of the queue the message belongs to.
 * @param messageId id of the message to delete.
 * @param popReceipt pop receipt that was returned when the message was dequeued.
 */
fun deleteMessage(messageURL: QueueMessagesURL, messageId: String, popReceipt: String) {
    println("delete message. message id: $messageId, popReceipt: $popReceipt")
    val messageIdURL = messageURL.createMessageIdUrl(messageId)
    messageIdURL.delete(popReceipt).blockingGet()
}
/**
 * Entry point: creates the `test-eventgrid-queue` queue, dequeues one batch of
 * messages, and for each message downloads the referenced blob and then
 * deletes the message.
 *
 * Credentials are read from the `AZURE_STORAGE_ACCOUNT` and
 * `AZURE_STORAGE_ACCESS_KEY` environment variables.
 *
 * @param args command-line arguments; not used.
 * @throws IllegalStateException if either environment variable is not set.
 */
fun main(args: Array<String>) {
    println("start ")

    // System.getenv returns null for an unset variable; fail with a message that
    // names the missing variable instead of an opaque NullPointerException.
    val accessName: String = System.getenv("AZURE_STORAGE_ACCOUNT")
        ?: error("environment variable AZURE_STORAGE_ACCOUNT is not set")
    val accessKey: String = System.getenv("AZURE_STORAGE_ACCESS_KEY")
        ?: error("environment variable AZURE_STORAGE_ACCESS_KEY is not set")

    val qCredentials = QueueSharedKeyCredentials(accessName, accessKey)
    val qServiceURL = QueueServiceURL(
        URL("https://$accessName.queue.core.windows.net"),
        QueueStorageURL.createPipeline(qCredentials, QueuePipelineOptions())
    )
    val queueURL = qServiceURL.createQueueUrl("test-eventgrid-queue")
    val messageURL = queueURL.createMessagesUrl()

    // Loop-invariant helpers: one blob client and one JSON mapper serve every message.
    val blob = BlobStorage(accessName, accessKey)
    val mapper = jacksonObjectMapper()

    queueURL.create()
        // NOTE(review): presumably (max message count, visibility timeout in seconds) —
        // confirm against the queue SDK; downloads that outlast the timeout could make
        // a message visible again before it is deleted.
        .flatMap { _ -> messageURL.dequeue(32, 5) }
        .doOnSuccess { messageDequeueResponse ->
            for (item in messageDequeueResponse.body()) {
                val message = mapper.readValue<queueMessage>(decodeBase64(item.messageText()))
                // Download the blob the message points at.
                blob.downloadBlob(URL(message.data.url))
                // Delete the message only after its download has finished.
                deleteMessage(messageURL, item.messageId(), item.popReceipt())
            }
        }
        .blockingGet()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment