Last active
March 29, 2017 05:47
-
-
Save erasmas/1e371d8521328fcad114 to your computer and use it in GitHub Desktop.
Cascalog workflow to copy data from CSV to Parquet. How do I fix this so that schema fields are not prefixed with '?'?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
id | airport_ref | airport_ident | type | description | frequency_mhz | |
---|---|---|---|---|---|---|
70518 | 6528 | 00CA | CTAF | CTAF | 122.9 | |
307581 | 6589 | 01FL | ARCAL | 122.9 | ||
75239 | 6589 | 01FL | CTAF | CEDAR KNOLL TRAFFIC | 122.8 | |
60191 | 6756 | 04CA | CTAF | CTAF | 122.9 | |
59287 | 6779 | 04MS | UNIC | UNICOM | 122.8 | |
60682 | 6784 | 04NV | UNIC | UNICOM | 123 | |
60091 | 6812 | 05CL | CTAF | CTAF | 122.9 | |
63835 | 6853 | 05UT | UNIC | UNICOM | 122.8 | |
70676 | 6868 | 06FA | APP | PALM BEACH APP | 124.6 | |
70677 | 6868 | 06FA | GND | GND | 121.65 | |
70678 | 6868 | 06FA | TWR | TWR | 120.4 | |
65868 | 6887 | 06MO | CTAF | CTAF | 122.9 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns cascalog-sandbox.core | |
(:require [cascalog.cascading.tap :as tap] | |
[cascalog.cascading.operations :refer [rename*]] | |
[cascalog.more-taps :refer [lfs-delimited]] | |
[cascalog.api :refer :all]) | |
(:import (parquet.cascading ParquetTupleScheme) | |
(cascading.tuple Fields) | |
(parquet.schema PrimitiveType MessageType PrimitiveType$PrimitiveTypeName Type$Repetition))) | |
(def repititions
  "Maps a repetition keyword (:optional, :repeated, :required) to the
  matching parquet.schema.Type$Repetition constant."
  {:optional Type$Repetition/OPTIONAL
   :repeated Type$Repetition/REPEATED
   :required Type$Repetition/REQUIRED})
(def types
  "Maps a type keyword to the matching
  parquet.schema.PrimitiveType$PrimitiveTypeName constant."
  {:fixed-len-byte-array PrimitiveType$PrimitiveTypeName/FIXED_LEN_BYTE_ARRAY
   :float                PrimitiveType$PrimitiveTypeName/FLOAT
   :double               PrimitiveType$PrimitiveTypeName/DOUBLE
   :boolean              PrimitiveType$PrimitiveTypeName/BOOLEAN
   :binary               PrimitiveType$PrimitiveTypeName/BINARY
   :int32                PrimitiveType$PrimitiveTypeName/INT32
   :int64                PrimitiveType$PrimitiveTypeName/INT64
   :int96                PrimitiveType$PrimitiveTypeName/INT96})
(defn create-primitive-type
  "Constructs a parquet.schema.PrimitiveType called `name`.

  `repitition` is a keyword looked up in `repititions`; `type` is a keyword
  looked up in `types`. The two lookup results and `name` are handed to the
  PrimitiveType constructor in that order."
  [repitition type name]
  (let [repetition-level (repitition repititions)
        primitive-name   (type types)]
    (PrimitiveType. repetition-level primitive-name name)))
(defn create-parquet-schema
  "Returns the string form of a parquet.schema.MessageType called `name`.

  `mappings` is a sequence of maps with the keys :repitition, :type and :name;
  each map is turned into one primitive field via `create-primitive-type`."
  [name mappings]
  (let [schema-fields (for [{:keys [repitition type name]} mappings]
                        (create-primitive-type repitition type name))]
    (str (MessageType. name schema-fields))))
;; Use cascalog.cascading.operations/rename* as suggested here: | |
;; https://github.com/nathanmarz/cascalog/issues/18 | |
;; Copies the airport-frequencies CSV into a Parquet file through a Cascalog
;; query. The query variables are the column names with a "?" prefix; rename*
;; is then asked to map them back to the bare column names before the result
;; is handed to the Parquet sink.
(let [fields         ["id" "airport_ref" "airport_ident" "type" "description" "frequency_mhz"]
      out-fields     (map (fn [field] (str "?" field)) fields)
      parquet-fields (Fields. (into-array String fields))
      ;; Every column in this schema is optional, so only type and name vary.
      optional       (fn [type name] {:repitition :optional :type type :name name})
      schema         (create-parquet-schema
                       "schema"
                       [(optional :int32 "id")
                        (optional :int32 "airport_ref")
                        (optional :binary "airport_ident")
                        (optional :binary "type")
                        (optional :binary "description")
                        (optional :float "frequency_mhz")])
      ;; The same Fields instance is passed as both leading arguments of the scheme.
      scheme         (ParquetTupleScheme. parquet-fields parquet-fields schema)
      sink           (tap/lfs-tap scheme
                                  "/Users/dmi3y/Downloads/airport-frequencies.parquet"
                                  :sinkmode :replace)
      source         (lfs-delimited "/Users/dmi3y/Downloads/airport-frequencies.csv"
                                    :skip-header? true
                                    :delimiter ",")
      query          (<- out-fields (source :>> out-fields))]
  (?- sink (rename* query out-fields fields)))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Here's what the schema of the resulting Parquet file looks like:
$ parquet-schema /data/out/airport-frequencies.parquet/part-00000-m-00000.parquet | |
message schema { | |
optional int32 ?id; | |
optional int32 ?airport_ref; | |
optional binary ?airport_ident; | |
optional binary ?type; | |
optional binary ?description; | |
optional float ?frequency_mhz; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Leiningen project definition for the Cascalog CSV-to-Parquet sandbox.
(defproject cascalog-sandbox "0.1.0-SNAPSHOT"
  :description "Cascalog workflow to copy data from CSV to Parquet"
  ;; NOTE(review): the URL is still the generated placeholder; replace it once the project has a home.
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  ;; Extra repository declared in addition to Leiningen's defaults.
  :repositories [["conjars.org" "http://conjars.org/repo"]]
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [cascalog "2.1.1"]
                 [cascalog-more-taps "0.3.0"]
                 [com.twitter/parquet-cascading "1.5.0"]
                 [com.twitter/parquet-column "1.5.0"]]
  ;; Hadoop artifacts are kept out of the main dependency list and live in the
  ;; :provided and :dev profiles instead.
  :profiles {:uberjar {:aot :all}
             :provided {:dependencies [[org.apache.hadoop/hadoop-client "2.2.0"]
                                       [org.apache.hadoop/hadoop-mapreduce-client-core "2.2.0"]]}
             :dev {:dependencies [[org.apache.hadoop/hadoop-minicluster "2.2.0"]]}})
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
 * Same workflow, but using plain Cascading; here the output fields in the Parquet file are fine and not prefixed with '?'.
*/ | |
package cascading.sandbox; | |
import cascading.flow.Flow; | |
import cascading.flow.FlowDef; | |
import cascading.flow.hadoop.HadoopFlowConnector; | |
import cascading.pipe.Pipe; | |
import cascading.property.AppProps; | |
import cascading.scheme.hadoop.TextDelimited; | |
import cascading.tap.SinkMode; | |
import cascading.tap.Tap; | |
import cascading.tap.hadoop.Lfs; | |
import cascading.tuple.Fields; | |
import org.xml.sax.SAXException; | |
import parquet.cascading.ParquetTupleScheme; | |
import parquet.schema.MessageType; | |
import parquet.schema.PrimitiveType; | |
import javax.xml.parsers.ParserConfigurationException; | |
import java.io.IOException; | |
import java.util.Properties; | |
import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; | |
import static parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; | |
import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; | |
import static parquet.schema.Type.Repetition.OPTIONAL; | |
public class WriteToParquetExample { | |
public static void main(String[] args) throws ParserConfigurationException, SAXException, IOException { | |
final String[] fieldNames = { "id", "airport_ref", "airport_ident", "type", "description", "frequency_mhz" }; | |
final Fields fields = new Fields(fieldNames); | |
PrimitiveType[] types = new PrimitiveType[] { | |
new PrimitiveType(OPTIONAL, INT32, "id"), | |
new PrimitiveType(OPTIONAL, INT32, "airport_ref"), | |
new PrimitiveType(OPTIONAL, BINARY, "airport_ident"), | |
new PrimitiveType(OPTIONAL, BINARY, "type"), | |
new PrimitiveType(OPTIONAL, BINARY, "description"), | |
new PrimitiveType(OPTIONAL, FLOAT, "frequency_mhz") | |
}; | |
final MessageType message = new MessageType("schema", types); | |
final String schema = message.toString(); | |
ParquetTupleScheme scheme = new ParquetTupleScheme(fields, fields, schema); | |
Tap source = new Lfs(new TextDelimited(true, ","), "/data/in/airport-frequencies.csv"); | |
Tap sink = new Lfs(scheme, "/data/out/airport-frequencies.parquet", SinkMode.REPLACE); | |
Pipe copyPipe = new Pipe("copy"); | |
FlowDef flowDef = FlowDef.flowDef() | |
.addSource(copyPipe, source) | |
.addTailSink(copyPipe, sink); | |
final Properties properties = new Properties(); | |
AppProps.setApplicationJarClass(properties, ReadFromParquetExample.class); | |
HadoopFlowConnector flowConnector = new HadoopFlowConnector(); | |
Flow flow = flowConnector.connect(flowDef); | |
flow.complete(); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment