Created
November 8, 2016 23:18
-
-
Save BryanCutler/930ecd1de1c6d9484931505dcb6bb321 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import struct | |
import sys | |
from pyarrow.ipc import ArrowFileReader | |
def _read_int(stream): | |
length = stream.read(4) | |
if not length: | |
raise EOFError | |
return struct.unpack("!i", length)[0] | |
def _read_with_length(stream):
    """Read one length-prefixed frame from ``stream``.

    The frame is a 4-byte big-endian signed length (decoded by ``_read_int``)
    followed by that many payload bytes.

    :param stream: binary file-like object exposing ``read(n)``.
    :return: the payload bytes.
    :raises EOFError: if the stream ends before the full payload is read
        (or while ``_read_int`` is reading the length prefix).
    :raises ValueError: if the length prefix is negative.
    """
    length = _read_int(stream)
    # read() interprets a negative size as "read until EOF", and the short-read
    # check below can never fire for a negative length, so a corrupt prefix
    # would silently return the rest of the stream. Reject it explicitly.
    if length < 0:
        raise ValueError("invalid negative frame length: %d" % length)
    payload = stream.read(length)
    if len(payload) < length:
        raise EOFError
    return payload
def _read_record_batch(obj):
    """Open ``obj`` with ``ArrowFileReader`` and return its first record batch.

    Prints the number of record batches the reader reports before fetching
    batch 0.

    :param obj: source handed unchanged to ``ArrowFileReader``.
    :return: the result of ``get_record_batch(0)`` on that reader.
    """
    arrow_file = ArrowFileReader(obj)
    batch_count = arrow_file.num_record_batches
    print("ArrowFileReader, num batches %d" % batch_count)
    first_batch = arrow_file.get_record_batch(0)
    return first_batch
if __name__ == '__main__':
    # Entry point: connect to localhost:<PORT>, read one length-prefixed frame
    # from the socket and decode it with _read_record_batch.
    if len(sys.argv) != 2:
        print("Usage: arrow_client_reader.py <PORT>")
        # Stop here: falling through would index a missing argument (or
        # silently ignore extra ones).
        sys.exit(1)
    port = int(sys.argv[1])
    sock = None
    # Support for both IPv4 and IPv6.
    # On most of IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        try:
            sock = socket.socket(af, socktype, proto)
        except socket.error:
            # This address family may be unavailable on the host; try the next
            # candidate instead of aborting the whole loop.
            sock = None
            continue
        try:
            sock.connect(sa)
        except socket.error:
            sock.close()
            sock = None
            continue
        break
    if not sock:
        raise Exception("could not open socket on port %d" % port)
    try:
        rf = sock.makefile("rb", 65536)
        try:
            obj = _read_with_length(rf)
            batch = _read_record_batch(obj)
        finally:
            # The buffered file wrapper is a separate object from the socket
            # and needs its own close.
            rf.close()
    finally:
        sock.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.buffer.ArrowBuf; | |
import org.apache.arrow.memory.BufferAllocator; | |
import org.apache.arrow.memory.RootAllocator; | |
import org.apache.arrow.vector.file.ArrowWriter; | |
import org.apache.arrow.vector.schema.ArrowFieldNode; | |
import org.apache.arrow.vector.schema.ArrowRecordBatch; | |
import org.apache.arrow.vector.types.pojo.Field; | |
import org.apache.arrow.vector.types.pojo.ArrowType; | |
import org.apache.arrow.vector.types.pojo.Schema; | |
import static java.util.Arrays.asList; | |
import java.io.*; | |
import java.net.ServerSocket; | |
import java.net.Socket; | |
import java.nio.channels.Channels; | |
import java.util.Collections; | |
import java.util.concurrent.CountDownLatch; | |
public class ArrowJavaToPython { | |
public final static int PORT = 8080; | |
static ArrowBuf buf(byte[] bytes, BufferAllocator allocator) { | |
ArrowBuf buffer = allocator.buffer(bytes.length); | |
buffer.writeBytes(bytes); | |
return buffer; | |
} | |
public static void writeBatchToStream(OutputStream out, BufferAllocator allocator) throws IOException { | |
Schema schema = new Schema(asList(new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList()))); | |
byte[] validity = new byte[] { (byte)255, 0}; | |
// second half is "undefined" | |
byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; | |
try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), schema)) { | |
ArrowBuf validityb = buf(validity, allocator); | |
ArrowBuf valuesb = buf(values, allocator); | |
writer.writeRecordBatch(new ArrowRecordBatch(16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))); | |
writer.close(); | |
} | |
} | |
public static void main(String[] args) { | |
CountDownLatch latch = new CountDownLatch(1); | |
Thread serverThread = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
try { | |
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); | |
// Listen on port | |
ServerSocket serverSocket = new ServerSocket(PORT); | |
System.out.println("Waiting for clients to connect..."); | |
latch.countDown(); | |
Socket clientSocket = serverSocket.accept(); | |
System.out.println("... Client Connected"); | |
// Write record batch to ByteArray | |
ByteArrayOutputStream arr = new ByteArrayOutputStream(); | |
writeBatchToStream(arr, allocator); | |
// Send framed-ByteArray over socket | |
DataOutputStream dataOut = new DataOutputStream(new BufferedOutputStream(clientSocket.getOutputStream())); | |
dataOut.writeInt(arr.size()); | |
dataOut.write(arr.toByteArray()); | |
dataOut.close(); | |
} catch (IOException e) { | |
System.err.println("Unable to process client request"); | |
} | |
} | |
}); | |
serverThread.start(); | |
Thread pyThread = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
ProcessBuilder builder = new ProcessBuilder("/usr/bin/python", "arrow_client_reader.py", Integer.toString(PORT)); | |
builder.environment().putAll(System.getenv()); | |
Process process = null; | |
try { | |
builder.inheritIO(); | |
latch.await(); | |
System.out.println("Starting python process"); | |
process = builder.start(); | |
int exitCode = process.waitFor(); | |
if (exitCode != 0) { | |
throw new RuntimeException("failed with exit code: " + exitCode); | |
} | |
} catch (Exception e) { | |
System.err.println("Python process exception: " + e); | |
} finally { | |
if (process != null) { | |
process.destroy(); | |
} | |
} | |
} | |
}); | |
pyThread.start(); | |
try { | |
pyThread.join(); | |
serverThread.join(); | |
} catch (InterruptedException e) {} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment