Last active
August 23, 2024 15:49
-
-
Save ianmcook/f7b76163c69eee151715cb8e0c44d795 to your computer and use it in GitHub Desktop.
Java example to receive Arrow record batches over HTTP and write to file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.arrow.memory.BufferAllocator; | |
import org.apache.arrow.memory.RootAllocator; | |
import org.apache.arrow.vector.VectorSchemaRoot; | |
import org.apache.arrow.vector.VectorUnloader; | |
import org.apache.arrow.vector.types.pojo.Schema; | |
import org.apache.arrow.vector.ipc.ArrowStreamReader; | |
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.net.HttpURLConnection; | |
import java.net.URL; | |
import java.util.List; | |
import java.util.ArrayList; | |
/******** BEGIN IMPORTS ADDED FOR FILE WRITING ********/ | |
import org.apache.arrow.vector.VectorLoader; | |
import org.apache.arrow.vector.ipc.ArrowStreamWriter; | |
import java.io.File; | |
import java.io.FileOutputStream; | |
/******** END IMPORTS ADDED FOR FILE WRITING **********/ | |
public class ArrowHttpClient { | |
public static void main(String[] args) { | |
String serverUrl = "http://localhost:8008"; | |
try { | |
long startTime = System.currentTimeMillis(); | |
URL url = new URL(serverUrl); | |
HttpURLConnection connection = (HttpURLConnection) url.openConnection(); | |
connection.setRequestMethod("GET"); | |
if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) { | |
InputStream inputStream = connection.getInputStream(); | |
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); | |
ArrowStreamReader reader = new ArrowStreamReader(inputStream, allocator); | |
VectorSchemaRoot root = reader.getVectorSchemaRoot(); | |
VectorUnloader unloader = new VectorUnloader(root); | |
ArrowRecordBatch batch; | |
Schema schema = root.getSchema(); | |
List<ArrowRecordBatch> batches = new ArrayList<>(); | |
int numRows = 0; | |
while (reader.loadNextBatch()) { | |
numRows += root.getRowCount(); | |
batch = unloader.getRecordBatch(); | |
batches.add(batch); | |
} | |
long endTime = System.currentTimeMillis(); | |
float execTime = (endTime - startTime) / 1000F; | |
System.out.println(reader.bytesRead() + " bytes received"); | |
System.out.println(numRows + " records received"); | |
System.out.println(batches.size() + " record batches received"); | |
System.out.printf("%.2f seconds elapsed\n", execTime); | |
reader.close(); | |
/******** BEGIN CODE ADDED FOR FILE WRITING ********/ | |
File file = new File("output.arrows"); | |
try ( | |
FileOutputStream fileOutputStream = new FileOutputStream(file); | |
VectorSchemaRoot root2 = VectorSchemaRoot.create(schema, allocator); | |
ArrowStreamWriter writer = new ArrowStreamWriter(root2, /*provider*/null, fileOutputStream.getChannel()); | |
) { | |
VectorLoader loader = new VectorLoader(root2); | |
writer.start(); | |
for (ArrowRecordBatch bat : batches) { | |
loader.load(bat); | |
writer.writeBatch(); | |
} | |
writer.end(); | |
} | |
/******** END CODE ADDED FOR FILE WRITING **********/ | |
} else { | |
System.err.println("Failed with response code: " + connection.getResponseCode()); | |
} | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment