Skip to content

Instantly share code, notes, and snippets.

@ianmcook
Last active March 10, 2024 16:43
Show Gist options
  • Save ianmcook/30b9f58591ff6393d93afa6313ef974c to your computer and use it in GitHub Desktop.
Save ianmcook/30b9f58591ff6393d93afa6313ef974c to your computer and use it in GitHub Desktop.
C GLib example to receive Arrow record batches over HTTP and write to file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <stdlib.h>
#include <arrow-glib/arrow-glib.h>
#include <libsoup/soup.h>
int
main(int argc, char **argv)
{
int exit_code = EXIT_FAILURE;
SoupSession *session = soup_session_new();
SoupMessage *message = soup_message_new(SOUP_METHOD_GET,
"http://localhost:8000");
GTimer *timer = g_timer_new();
GError *error = NULL;
GInputStream *input = soup_session_send(session, message, NULL, &error);
if (error) {
g_printerr("Failed to download: %s\n", error->message);
g_error_free(error);
goto exit;
}
GArrowGIOInputStream *arrow_input = garrow_gio_input_stream_new(input);
GArrowRecordBatchStreamReader *reader =
garrow_record_batch_stream_reader_new(GARROW_INPUT_STREAM(arrow_input),
&error);
if (error) {
g_printerr("Failed to create reader: %s\n", error->message);
g_error_free(error);
g_object_unref(arrow_input);
goto exit;
}
GArrowTable *table =
garrow_record_batch_reader_read_all(GARROW_RECORD_BATCH_READER(reader),
&error);
if (error) {
g_printerr("Failed to read record batches: %s\n", error->message);
g_error_free(error);
g_object_unref(reader);
g_object_unref(arrow_input);
goto exit;
}
GArrowChunkedArray *chunked_array = garrow_table_get_column_data(table, 0);
guint n_received_record_batches =
garrow_chunked_array_get_n_chunks(chunked_array);
g_object_unref(chunked_array);
g_object_unref(reader);
g_object_unref(arrow_input);
g_timer_stop(timer);
g_print("%u record batches received\n", n_received_record_batches);
g_print("%.2f seconds elapsed\n", g_timer_elapsed(timer, NULL));
/******** BEGIN CODE ADDED TO WRITE FILE *************************************/
GArrowFileOutputStream *file_output_stream;
const gchar *path = "output.arrows"; // Output file path
error = NULL;
file_output_stream = garrow_file_output_stream_new(path, FALSE, &error);
if (error) {
g_printerr("Failed to open file for writing: %s\n", error->message);
g_error_free(error);
g_object_unref(table);
goto exit;
}
GArrowRecordBatchStreamWriter *stream_writer;
stream_writer = garrow_record_batch_stream_writer_new(GARROW_OUTPUT_STREAM(file_output_stream),
GARROW_SCHEMA(garrow_table_get_schema(table)),
&error);
if (error) {
g_printerr("Failed to create file writer: %s\n", error->message);
g_error_free(error);
g_object_unref(file_output_stream);
g_object_unref(table);
goto exit;
}
error = NULL;
garrow_record_batch_writer_write_table(GARROW_RECORD_BATCH_WRITER(stream_writer), table, &error);
if (error) {
g_printerr("Failed to write table to file: %s\n", error->message);
g_error_free(error);
g_object_unref(stream_writer);
g_object_unref(file_output_stream);
g_object_unref(table);
goto exit;
}
// Clean up
g_object_unref(stream_writer);
g_object_unref(file_output_stream);
g_object_unref(table);
/******** END CODE ADDED TO WRITE FILE ***************************************/
exit_code = EXIT_SUCCESS;
exit:
g_object_unref(input);
g_timer_destroy(timer);
g_object_unref(message);
g_object_unref(session);
return exit_code;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment