-
-
Save z5ottu/e310ff6ac9b70199d1aa280b36db0421 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%% ------------------------------------------------------------------- | |
%% | |
%% riak_csv_etl: etl tools for csv files in Riak | |
%% | |
%% Copyright (c) 2011 Bradley Taylor. All Rights Reserved. | |
%% | |
%% This file is provided to you under the Apache License, | |
%% Version 2.0 (the "License"); you may not use this file | |
%% except in compliance with the License. You may obtain | |
%% a copy of the License at | |
%% | |
%% http://www.apache.org/licenses/LICENSE-2.0 | |
%% | |
%% Unless required by applicable law or agreed to in writing, | |
%% software distributed under the License is distributed on an | |
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
%% KIND, either express or implied. See the License for the | |
%% specific language governing permissions and limitations | |
%% under the License. | |
%% | |
%% ------------------------------------------------------------------- | |
%% Example usage: | |
%% {ok, C} = riak:local_client(). | |
%% B = <<"HealthScores_CSV">>. | |
%% P = "../resturants.csv". | |
%% riak_csv_etl:load_file(C, B, P). | |
%% F = riak_csv_etl:get_fields(C, B). | |
%% {ok, JList} = C:mapred(B,[{map, {modfun, riak_csv_etl,json_encode_map},F,true}]). | |
%% B2 = <<"HealthScores_JSON">>. | |
%% riak_csv_etl:put_list(C, B2, JList). | |
-module(riak_csv_etl). | |
-export([load_file/3, json_encode_map/3, json_encode_and_put_map/3, put_list/3, put_fields/3, get_fields/2]). | |
%% Loads the contents of a csv file into a bucket.
%%  - generates an md5 key for every row
%%  - assumes the first line contains field names and stashes them in the
%%    bucket props
%%  - stores each value as an erlang list of binary values
%%  - use mapreduce to transform to other formats
%%
%% To get the stashed fields:
%%   F = riak_csv_etl:get_fields(C, B).
%%
%% Crashes with a badmatch when Path cannot be opened. Returns whatever the
%% line loader returns. The file handle is closed on every exit path, whether
%% the load completes or crashes part-way through.
load_file(Client, Bucket, Path) ->
    {ok, File} = file:open(Path, [read]),
    try
        load_line(Client, Bucket, File, first)
    after
        %% Release the handle even if a row fails to parse or store.
        file:close(File)
    end.
%% Map function: renders the value of one stored object as JSON.
%% Value is the riak object handed to the map phase, Fields is the list of
%% field names to pair with the stored row. The key data argument is unused.
%% Returns a one-element list holding {Key, JsonBinary}.
json_encode_map(Value, _KeyData, Fields) ->
    Row = riak_object:get_value(Value),
    Key = riak_object:key(Value),
    [{Key, json_encode(Fields, Row)}].
%% Map function: JSON-encodes the object via json_encode_map/3 and stores the
%% JSON under the same key in Bucket, using a local Riak client.
%% The third argument is the tuple {Fields, Bucket}.
%% Returns a one-element list holding the result of the put.
json_encode_and_put_map(Value, KeyData, {Fields, Bucket}) ->
    [{Key, Json}] = json_encode_map(Value, KeyData, Fields),
    {ok, Client} = riak:local_client(),
    [Client:put(riak_object:new(Bucket, Key, Json))].
%% Stores every {Key, Value} pair of List in bucket B through client C.
%% The results of the individual puts are discarded; returns ok, the result
%% of lists:foreach/2.
put_list(C, B, List) ->
    lists:foreach(
        fun({Key, Val}) ->
            Obj = riak_object:new(B, Key, Val),
            C:put(Obj)
        end,
        List
    ).
%% Fetches the field names stashed under csv_fields in the properties of
%% Bucket. Returns undefined when the property is not present.
get_fields(Client, Bucket) ->
    Props = Client:get_bucket(Bucket),
    proplists:get_value(csv_fields, Props).
%% Stashes Fields under the csv_fields property of Bucket.
%% Returns the result of Client:set_bucket/2.
put_fields(Client, Bucket, Fields) ->
    Props = [{csv_fields, Fields}],
    Client:set_bucket(Bucket, Props).
%% Pairs each field name with the value at the same position and encodes the
%% result as a JSON object, returned as a single binary.
%% Fields and Value must have the same length, as lists:zip/2 requires.
json_encode(Fields, Value) ->
    Pairs = lists:zip(Fields, Value),
    iolist_to_binary(mochijson2:encode({struct, Pairs})).
%% Reads the next line from file F and hands the read result to load_line/4,
%% which decides what to do with it.
load_line(C, B, F) ->
    Next = file:read_line(F),
    load_line(C, B, F, Next).
%% Performs one step of the load, dispatching on the last argument:
%%   first        - read the header line and stash its field names (as atoms)
%%                  in the bucket properties, then continue with the rows;
%%                  the fields are accessed later in map functions
%%   {ok, Line}   - parse the row, store it as a list of binaries under an
%%                  md5-derived key, then continue with the next line
%%   eof          - we're done; returns the atom win
%%   {error, _}   - stop and return the error tuple
load_line(C, B, F, first) ->
    {ok, Data} = file:read_line(F),
    %% Convert the field names to atoms.
    %% NOTE(review): list_to_atom/1 on file content adds to the atom table,
    %% which is never garbage collected - only load headers you trust.
    Fields = [list_to_atom(Field) || Field <- read(Data)],
    put_fields(C, B, Fields),
    load_line(C, B, F);
%% extract and load binary data into Riak
load_line(C, B, F, {ok, D}) ->
    Data = [list_to_binary(V) || V <- read(D)],
    Key = make_key(Data),
    Obj = riak_object:new(B, Key, Data),
    %% The result of the put is not checked.
    C:put(Obj),
    load_line(C, B, F);
%% eof, we're done!
load_line(_, _, _, eof) ->
    win;
%% A read error is returned to the caller. Reading again after an error
%% would spin forever whenever the error is persistent.
load_line(_, _, _, {error, _Reason} = Error) ->
    Error.
%% Generates a cheap md5 key since we don't know what the logical key is.
%% Stuffs is the row as an iolist (here, a list of field binaries); the
%% result is the 16-byte MD5 digest of the concatenated bytes.
%% crypto:md5/1 has been removed from current OTP releases, so the digest
%% is computed with crypto:hash/2.
make_key(Stuffs) ->
    crypto:hash(md5, list_to_binary(Stuffs)).
%% CSV splitting courtesy of TrapExit
%% http://www.trapexit.org/Comma_Separated_Values
%%
%% read/1 parses String and returns only its first line, as a list of items.
read(String) ->
    Lines = read(String, []),
    hd(Lines).

%% read/2 collects parsed lines in reverse order. After the first line,
%% each further line must be introduced by LF or CR LF.
read([], Lines) ->
    lists:reverse(Lines);
read(Input, []) ->
    read_next_line(Input, []);
read([$\n | Input], Lines) ->
    read_next_line(Input, Lines);
read([$\r, $\n | Input], Lines) ->
    read_next_line(Input, Lines).

%% Parses one line from the front of Input and carries on with what is left.
read_next_line(Input, Lines) ->
    {Line, Rest} = read_line(Input),
    read(Rest, [Line | Lines]).
%% Pushes Count space characters onto the front of String.
add_spaces(Count, String) when Count =/= 0 ->
    add_spaces(Count - 1, [$\s | String]);
add_spaces(0, String) ->
    String.
%% read_item/1 reads one item from the front of the input and returns
%% {Item, Rest}. An input starting with a double quote is handed to
%% read_item_quoted/2; anything else is read as an unquoted item.
read_item([$" | Tail]) ->
    read_item_quoted(Tail, []);
read_item(Input) ->
    read_item(Input, 0, []).

%% read_item/3 reads an unquoted item. Pending counts the spaces and tabs
%% seen since the last ordinary character; Acc holds the item reversed.
%% Leading whitespace is skipped, whitespace inside the item is written back
%% as spaces, and whitespace before the terminator is dropped. The terminator
%% (LF, CR LF, comma or end of input) is left at the front of Rest.
read_item([Ws | Tail], 0, []) when Ws =:= $\s; Ws =:= $\t ->
    read_item(Tail, 0, []);
read_item([$\n | _] = Rest, _Pending, Acc) ->
    {lists:reverse(Acc), Rest};
read_item([$\r, $\n | _] = Rest, _Pending, Acc) ->
    {lists:reverse(Acc), Rest};
read_item([$, | _] = Rest, _Pending, Acc) ->
    {lists:reverse(Acc), Rest};
read_item([], _Pending, Acc) ->
    {lists:reverse(Acc), []};
read_item([Ws | Tail], Pending, Acc) when Ws =:= $\t; Ws =:= $\s ->
    read_item(Tail, Pending + 1, Acc);
read_item([Char | Tail], Pending, Acc) ->
    %% An ordinary character: flush the pending whitespace as spaces first.
    read_item(Tail, 0, [Char | add_spaces(Pending, Acc)]).
%% Reads the rest of a quoted item, the opening quote already consumed.
%% A doubled quote stands for one literal quote; a single quote ends the
%% item. Returns {Item, Rest} with Rest starting right after the closing
%% quote. There is no clause for input that ends before the closing quote.
read_item_quoted([$", $" | Rest], Acc) ->
    read_item_quoted(Rest, [$" | Acc]);
read_item_quoted([$" | Rest], Acc) ->
    {lists:reverse(Acc), Rest};
read_item_quoted([Char | Rest], Acc) ->
    read_item_quoted(Rest, [Char | Acc]).
%% read_line/1 splits the first line of String into its items.
%% Returns {Items, Rest}. Rest starts at the line terminator (LF or CR LF),
%% which is left in place, or is [] at end of input.
read_line(String) ->
    read_line(String, []).

%% read_line/2 collects the items of one line in reverse order in Acc.
read_line([$\n | _] = Rest, Acc) ->
    {lists:reverse(Acc), Rest};
%% Hand back the whole CR LF pair. Returning only the CR would leave a
%% remainder that matches none of the line-start clauses of read/2.
read_line([$\r, $\n | _] = Rest, Acc) ->
    {lists:reverse(Acc), Rest};
read_line([], Acc) ->
    {lists:reverse(Acc), []};
%% First item of the line: no separator to consume.
read_line(String, []) ->
    {Item, Rest} = read_item(String),
    read_line(Rest, [Item]);
%% Later items: consume the comma, then read the item.
read_line([$, | String], Acc) ->
    {Item, Rest} = read_item(String),
    read_line(Rest, [Item | Acc]).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment