@davideanastasia
Created June 5, 2018 22:03
Apache Beam Getting Started - #3
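import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Emits one KV per line of each matched file, keyed by (filename, 1-based line number).
// Metadata is assumed to be the author's own key class (defined elsewhere), not Beam's MatchResult.Metadata.
public class ReadFileFn extends DoFn<FileIO.ReadableFile, KV<Metadata, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        FileIO.ReadableFile f = c.element();
        String filename = f.getMetadata().resourceId().getFilename();
        // try-with-resources closes the reader, the stream, and the underlying channel.
        // Assumes UTF-8 input; the original relied on the platform default charset.
        try (ReadableByteChannel rbc = f.open();
             InputStream stream = Channels.newInputStream(rbc);
             BufferedReader br = new BufferedReader(
                 new InputStreamReader(stream, StandardCharsets.UTF_8))) {
            long lineId = 1;
            String line;
            while ((line = br.readLine()) != null) {
                c.output(KV.of(Metadata.of(filename, lineId), line));
                lineId++;
            }
        }
        // An IOException now propagates to the runner instead of being printed and swallowed,
        // so a failed read is not silently dropped.
    }
}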
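
The read loop inside ReadFileFn can be tried without a Beam runner. The following is a minimal plain-JDK sketch of the same line-numbering technique, not part of the gist: the class name LineNumbering, the method numberLines, and the "filename:lineId<TAB>line" output format are all hypothetical, chosen only so the result is easy to inspect. It assumes UTF-8 input.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone helper; not part of the gist or of Apache Beam.
public class LineNumbering {
    // Returns "filename:lineId<TAB>line" for every line read from the channel.
    public static List<String> numberLines(String filename, ReadableByteChannel rbc)
            throws IOException {
        List<String> out = new ArrayList<>();
        // Closing the reader also closes the wrapped stream and channel.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(Channels.newInputStream(rbc), StandardCharsets.UTF_8))) {
            long lineId = 1; // 1-based, matching ReadFileFn
            String line;
            while ((line = br.readLine()) != null) {
                out.add(filename + ":" + lineId + "\t" + line);
                lineId++;
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // In-memory stand-in for the channel that FileIO.ReadableFile.open() would return.
        byte[] data = "alpha\nbeta\n\ngamma".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel rbc = Channels.newChannel(new ByteArrayInputStream(data));
        for (String s : numberLines("sample.txt", rbc)) {
            System.out.println(s);
        }
    }
}
```

Note that an empty line still consumes a line number, so the numbering stays aligned with the positions in the source file.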