-
-
Save lehoff/f44b776e734fa59058da3c016cd9ede9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(mrts). | |
-compile([export_all]). | |
%-----------------------------------------------------------------------
% Buckets we know about
%
% The one-argument forms take an access method:
%   mr -> a {Name, Name} tuple (presumably {BucketType, Bucket})
%   lk -> the bare Name binary
% The zero-argument forms use mr.
%-----------------------------------------------------------------------
bucket() ->
    bucket_ref(mr, <<"ts_weather_demo">>).

locationbucket() ->
    locationbucket(mr).

locationbucket(Method) ->
    bucket_ref(Method, <<"locationupdateevents">>).

geobucket() ->
    geobucket(mr).

geobucket(Method) ->
    bucket_ref(Method, <<"GeoCheckin">>).

%% Shape a bucket name for the requested access method.
bucket_ref(mr, Name) -> {Name, Name};
bucket_ref(lk, Name) -> Name.
%-----------------------------------------------------------------------
% Count the objects in a bucket with a map/reduce job on node DevNo.
% count/1 targets the GeoCheckin bucket.
%-----------------------------------------------------------------------
count(DevNo) ->
    Bucket = geobucket(),
    count(Bucket, DevNo).

count(Bucket, DevNo) ->
    mrtsTest(Bucket, count, DevNo).
%-----------------------------------------------------------------------
% Run a map/reduce job with an empty phase list over the GeoCheckin
% bucket on dev1 (127.0.0.1:10017).
% NOTE(review): with no phases this presumably returns the bucket/key
% inputs, i.e. all keys — confirm against the server.
%-----------------------------------------------------------------------
anonkeys() ->
    Riak = pb_socket(1),
    riakc_pb_socket:mapred_bucket(Riak, geobucket(), []).
%% keys(Method, DevNo): list the keys of the GeoCheckin bucket on node
%% DevNo, either via map/reduce (Method = mr) or via the TS streaming
%% list-keys call (Method = lk).
keys(Method, DevNo) ->
    keys(Method, geobucket(Method), DevNo).

keys(mr, Bucket, DevNo) ->
    mrtsTest(Bucket, keys, DevNo);
keys(lk, Bucket, DevNo) ->
    Riak = pb_socket(DevNo),
    try
        %% Only consume stream messages tagged with this request's id, so
        %% leftovers from an earlier (aborted) stream cannot leak in.
        {ok, ReqId} = riakc_ts:stream_list_keys(Riak, Bucket, []),
        receive_keys(ReqId, [[]])
    after
        riakc_pb_socket:stop(Riak)
    end.

%% receive_keys(Keys): collect streamed key chunks from any request and
%% return Keys followed by every received key, in arrival order.
%% Kept for callers that do not know the request id.
receive_keys(Keys) ->
    receive_keys(any, [Keys]).

%% receive_keys(ReqId, Chunks): Chunks holds key lists newest-first. The
%% chunks are reversed and appended only once, at the end, which avoids
%% the O(n^2) cost of Keys ++ KeyList per message.
%% Raises {list_keys_failed, Reason} on a stream error, and
%% list_keys_timeout when no message arrives in time.
receive_keys(ReqId, Chunks) ->
    receive
        {Id, {keys, KeyList}} when ReqId =:= any; Id =:= ReqId ->
            receive_keys(ReqId, [KeyList | Chunks]);
        {Id, done} when ReqId =:= any; Id =:= ReqId ->
            lists:append(lists:reverse(Chunks));
        {Id, {error, Reason}} when ReqId =:= any; Id =:= ReqId ->
            erlang:error({list_keys_failed, Reason})
    after list_keys_timeout() ->
        erlang:error(list_keys_timeout)
    end.

%% Maximum idle time (ms) between two stream messages.
list_keys_timeout() ->
    60000.
%-----------------------------------------------------------------------
% General MR request
%
% mrtsTest(Bucket, Stat, DevNo) runs a map/reduce job over Bucket on
% node DevNo and closes the connection afterwards.
%   Stat = keys  -> values returned by the map_object_value phase
%                   ([] when the bucket yields no results)
%   Stat = count -> number of objects (each mapped to 1, then summed;
%                   0 when the job yields no results)
%-----------------------------------------------------------------------
mrtsTest(Bucket, Stat, DevNo) ->
    Riak = pb_socket(DevNo),
    try
        run_mr(Riak, Bucket, Stat)
    after
        riakc_pb_socket:stop(Riak)
    end.

%% Issue the map/reduce query for one statistic on an open connection.
run_mr(Riak, Bucket, keys) ->
    Query = [{map, {modfun, riak_kv_mapreduce, map_object_value}, none, true}],
    case riakc_pb_socket:mapred_bucket(Riak, Bucket, Query) of
        {ok, [{0, Ret}]} -> Ret;
        {ok, []} -> []
    end;
run_mr(Riak, Bucket, count) ->
    Query = [{map, {qfun, fun(_, _, _) -> [1] end}, none, false},
             {reduce, {modfun, riak_kv_mapreduce, reduce_sum}, none, true}],
    case riakc_pb_socket:mapred_bucket(Riak, Bucket, Query) of
        {ok, [{1, [NKeys]}]} -> NKeys;
        {ok, []} -> 0
    end.
%% Open (and link to) a PB connection to 127.0.0.1 on the port for node
%% DevNo. Crashes with badmatch if the connection cannot be started.
pb_socket(DevNo) ->
    Port = dev_port(DevNo),
    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", Port),
    Pid.

%% Port numbering: DevNo 1 -> 10017, each further node adds 10.
dev_port(DevNo) ->
    BasePort = 10017,
    Stride = 10,
    BasePort + Stride * (DevNo - 1).
%-----------------------------------------------------------------------
% Build (but do not run) a ts_weather_demo query string.
%
% Tmin, Tmax: integer time bounds (inclusive).
% Weather, Fam: strings (character lists) spliced inside single quotes
% without escaping.
% Returns the query as a flat string.
%-----------------------------------------------------------------------
query(Tmin, Tmax, Weather, Fam) ->
    "SELECT * FROM ts_weather_demo "
    "WHERE time >= " ++ integer_to_list(Tmin) ++
    " AND time <= " ++ integer_to_list(Tmax) ++
    %% Leading space is required: without it the upper bound and the
    %% next AND run together (e.g. "400AND").
    " AND weather = '" ++ Weather ++ "' " ++
    "AND family = '" ++ Fam ++ "' ".
%-----------------------------------------------------------------------
% Issue an arbitrary query
%
% query(Name): Name in {location, default, native, nonnative} runs one of
% the canned queries below. Any other argument is printed and passed
% verbatim to riakc_ts:query/2 on a fresh dev1 connection, which is
% closed once the query returns.
% query(C, Query): print and run Query on an existing connection C.
%-----------------------------------------------------------------------
query(Name) when Name =:= location; Name =:= default;
                 Name =:= native; Name =:= nonnative ->
    query(canned_query(Name));
query(Query) ->
    io:format("Executing query: ~p~n", [Query]),
    C = pb_socket(1),
    try
        riakc_ts:query(C, Query)
    after
        riakc_pb_socket:stop(C)
    end.

query(C, Query) ->
    io:format("Executing query: ~p~n", [Query]),
    riakc_ts:query(C, Query).

%% Text of the named canned queries.
canned_query(location) ->
    "SELECT * FROM locationupdateevents "
    "WHERE tstamp >= 1416321188291 AND tstamp <= 1416321228319 "
    "AND userid = 'anon:' "
    "AND eventid = 'a11fbcae-23bf-4535-b3f8-e5013b35e366' ";
canned_query(default) ->
    weather_query("crap");
canned_query(native) ->
    weather_query("native");
canned_query(nonnative) ->
    weather_query("non-native").

%% ts_weather_demo query for time 0..400, family 'family' and the given
%% weather value.
weather_query(Weather) ->
    "SELECT * FROM ts_weather_demo "
    "WHERE time >= 0 AND time <= 400 "
    "AND weather = '" ++ Weather ++ "' "
    "AND family = 'family' ".
%-----------------------------------------------------------------------
% List buckets on the node listening on 127.0.0.1:8087.
%-----------------------------------------------------------------------
listBuckets() ->
    Port = 8087,
    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", Port),
    riakc_pb_socket:list_buckets(Pid).
%% getClient(): PB client for dev1, leaving the encoding at whatever the
%% connection defaults to. This arity was called elsewhere in the module
%% without being defined, which kept the module from compiling.
getClient() ->
    pb_socket(1).

%% getClient(UseNativeEncoding): PB client for dev1 with native encoding
%% switched on or off via riakc_pb_socket:use_native_encoding/2.
getClient(UseNativeEncoding) ->
    C = pb_socket(1),
    riakc_pb_socket:use_native_encoding(C, UseNativeEncoding),
    C.
%% Write a single hard-coded row into ts_weather_demo using client C.
putC(C) ->
    Row = [<<"test">>, 54.0, <<"family">>, 0.2, 100, 500.0],
    riakc_ts:put(C, <<"ts_weather_demo">>, [Row]).
%% Connect to 127.0.0.1:8087, set native encoding as requested and write
%% one ts_weather_demo row whose weather column records the encoding
%% used (<<"native">> when true, <<"non-native">> otherwise).
putTs(UseNativeEncoding) ->
    {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    riakc_pb_socket:use_native_encoding(C, UseNativeEncoding),
    Weather = case UseNativeEncoding of
                  true -> <<"native">>;
                  _ -> <<"non-native">>
              end,
    Row = [Weather, <<"family">>, 100, 54.0, 0.2, 500.0],
    riakc_ts:put(C, <<"ts_weather_demo">>, [Row]).
%% Store myKey1..myKey3 (values val1..val3) in TestBucket on the node at
%% 127.0.0.1:8087. Returns the result of the last put.
putTestKeys() ->
    {ok, Riak} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    Pairs = [{<<"myKey1">>, <<"val1">>},
             {<<"myKey2">>, <<"val2">>},
             {<<"myKey3">>, <<"val3">>}],
    Results = [riakc_pb_socket:put(Riak, riakc_obj:new(<<"TestBucket">>, K, V))
               || {K, V} <- Pairs],
    lists:last(Results).
%% listLength(L): number of elements in list L.
listLength(L) ->
    docount(L).

docount(L) ->
    docount(L, 0).

%% docount(L, Acc): Acc plus the number of elements in L. The empty tuple
%% {} counts as an empty collection, as in the original implementation.
%% Using length/1 also removes the unused-variable warning of the old
%% hand-written recursion.
docount({}, Acc) ->
    Acc;
docount(L, Acc) when is_list(L) ->
    Acc + length(L).
%% keycount(Method, DevNo, N, Nexpected): list the keys N times and crash
%% (badmatch) as soon as one run does not return exactly Nexpected keys.
%% Returns ok after N matching runs. N must be a non-negative integer;
%% the guard stops a negative N from recursing forever.
keycount(_Method, _DevNo, 0, _Nexpected) ->
    ok;
keycount(Method, DevNo, N, Nexpected) when is_integer(N), N > 0 ->
    Nexpected = keycount(Method, DevNo),
    keycount(Method, DevNo, N - 1, Nexpected).

%% keycount(Method, DevNo): number of keys returned by keys/2.
keycount(Method, DevNo) ->
    docount(keys(Method, DevNo)).
%% keylist(Method, DevNo): append one line per key (userid immediately
%% followed by the timestamp) to "keys.out" and return the number of
%% lines written. The file is closed even if formatting a key fails.
keylist(Method, DevNo) ->
    Keys = keys(Method, DevNo),
    {ok, Log} = file:open("keys.out", [append]),
    try
        listkey(Keys, 0, Log, Method)
    after
        file:close(Log)
    end.

%% listkey(Keys, Acc, Log, Method): write each key to Log and return Acc
%% plus the number of keys written.
%%   mr: each key is a list whose 1st and 3rd elements are {_, Value}.
%%   lk: each key is a tuple whose 1st and 3rd elements are the values.
listkey({}, Acc, _Log, _Method) ->
    Acc;
listkey([], Acc, _Log, _Method) ->
    Acc;
listkey([H | R], Acc, Log, mr) ->
    {_, Userid} = lists:nth(1, H),
    {_, Tstamp} = lists:nth(3, H),
    write_key(Log, Userid, Tstamp),
    listkey(R, Acc + 1, Log, mr);
listkey([H | R], Acc, Log, lk) ->
    write_key(Log, element(1, H), element(3, H)),
    listkey(R, Acc + 1, Log, lk).

%% Format one key line; Tstamp is expected to be an integer.
write_key(Log, Userid, Tstamp) ->
    io:format(Log, "~s~s~n", [Userid, integer_to_list(Tstamp)]).
%% Create one native-encoding client and one non-native client, unlink
%% both from the caller, and return {NonNative, Native}. The argument
%% only controls which client is created first: pb creates the native
%% client first, ttb the non-native one.
clientTest(pb) ->
    Native = getClient(true),
    NonNative = getClient(false),
    detach_clients(NonNative, Native);
clientTest(ttb) ->
    NonNative = getClient(false),
    Native = getClient(true),
    detach_clients(NonNative, Native).

%% Unlink the native client, then the non-native one, and pair them up.
detach_clients(NonNative, Native) ->
    unlink(Native),
    unlink(NonNative),
    {NonNative, Native}.
%% Print the calling process's current_function info tuple.
pt() ->
    Info = process_info(self(), current_function),
    io:format("Process info = ~p~n", [Info]).
%% dq(): run the standard GeoCheckin query through query/1.
dq() ->
    query(geo_query()).

%% dq(_UseNative): run the same query on a new 127.0.0.1:10017
%% connection. The argument is currently ignored; the native-encoding
%% switch is intentionally not applied here.
dq(_UseNative) ->
    {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    query(C, geo_query()).

%% GeoCheckin query: time 0..999, family1 / seriesX.
geo_query() ->
    "SELECT * FROM GeoCheckin WHERE time >= 0 AND time < 1000 AND myfamily = 'family1' AND myseries = 'seriesX'".
%% dp(UseNative[, Time]): put one GeoCheckin row at Time (default 1)
%% over a new 127.0.0.1:10017 connection. UseNative is currently ignored.
dp(UseNative) ->
    dp(UseNative, 1).

dp(_UseNative, Time) ->
    {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    qfPutC(C, Time).

%% dp(UseNativeInit, UseNativeSend, Time): set the connection's native
%% encoding to UseNativeInit, then put with the per-call flag UseNativeSend.
dp(UseNativeInit, UseNativeSend, Time) ->
    {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    riakc_pb_socket:use_native_encoding(C, UseNativeInit),
    qfPutC(C, UseNativeSend, Time).

qfPutC(C, Time) ->
    riakc_ts:put(C, <<"GeoCheckin">>, geo_rows(Time)).

qfPutC(C, UseNativeSend, Time) ->
    riakc_ts:put(C, <<"GeoCheckin">>, geo_rows(Time), UseNativeSend).

%% The single-row GeoCheckin payload used by the put helpers above.
geo_rows(Time) ->
    [[<<"family1">>, <<"seriesX">>, Time, 1, <<"test1">>, 1.0, true]].
%% Put the same GeoCheckin row three times: through a native-encoding
%% connection, through a non-native connection, then through the native
%% one again. Returns the result of the last put.
queryfailtest() ->
    Table = <<"GeoCheckin">>,
    Rows = [[<<"family1">>, <<"seriesX">>, 100, 1, <<"test1">>, 1.0, true]],
    Connect = fun(Native) ->
                      {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 10017),
                      riakc_pb_socket:use_native_encoding(C, Native),
                      C
              end,
    Cttb = Connect(true),
    riakc_ts:put(Cttb, Table, Rows),
    Cpb = Connect(false),
    riakc_ts:put(Cpb, Table, Rows),
    riakc_ts:put(Cttb, Table, Rows).
%% Append Bytes (iodata) to Filename, creating the file if needed.
%% Returns ok on success, or {error, Reason} when opening or writing
%% fails. An open failure is also printed. The device is closed even if
%% the write raises.
append_file(Filename, Bytes) ->
    case file:open(Filename, [append]) of
        {ok, IoDevice} ->
            try
                file:write(IoDevice, Bytes)
            after
                file:close(IoDevice)
            end;
        {error, Reason} = Error ->
            %% ~p: a reason is not guaranteed to be printable with ~s.
            io:format("~ts open error reason:~p~n", [Filename, Reason]),
            Error
    end.
%% Map each {Index, _} entry of CoverageVNodes to
%% riak_core_coverage_plan:index_to_id(Index, 64). Entries that are not
%% 2-tuples are skipped by the generator pattern.
%% NOTE(review): 64 is presumably the ring size — confirm.
getids(CoverageVNodes) ->
    N = 64,
    [riak_core_coverage_plan:index_to_id(Index, N) || {Index, _} <- CoverageVNodes].
%% Store a GeoCheckin row as an ordinary KV object (not via the TS API)
%% under key <<"test">>. Opens its own dev1 connection and closes it
%% afterwards. It previously called getClient/0, which had no definition.
putNormal() ->
    C = pb_socket(1),
    Key = <<"test">>,
    Data = [[<<"family1">>, <<"seriesX">>, 100, 1, <<"test1">>, 1.0, true]],
    Obj = riakc_obj:new({<<"GeoCheckin">>, <<"GeoCheckin">>}, Key, Data),
    try
        riakc_pb_socket:put(C, Obj)
    after
        riakc_pb_socket:stop(C)
    end.
%% Put one GeoCheckin row through the TS API. Opens its own dev1
%% connection and closes it afterwards. It previously called
%% getClient/0, which had no definition.
putTs() ->
    C = pb_socket(1),
    Data = [[<<"family1">>, <<"seriesX">>, 100, 1, <<"test1">>, 1.0, true]],
    try
        riakc_ts:put(C, <<"GeoCheckin">>, Data)
    after
        riakc_pb_socket:stop(C)
    end.
%-----------------------------------------------------------------------
% Return the partition indices of the ring built by riak_core_ring:fresh/0.
%
% Only the index of each {Index, Owner} entry is returned. all_owners/1 is
% used instead of destructuring the ring record, so the function no
% longer depends on the record's tuple layout or on a ring size of
% exactly 8.
% NOTE(review): fresh/0 presumably builds a new ring rather than reading
% the one in use, and needs riak_core loaded (i.e. run on a Riak node).
%-----------------------------------------------------------------------
getringnodes() ->
    Ring = riak_core_ring:fresh(),
    [Index || {Index, _Owner} <- riak_core_ring:all_owners(Ring)].
%-----------------------------------------------------------------------
% Get the ID of the vnode to which this Bucket,Key pair will be hashed:
% the primary preflist of length 1 for {Bucket, Key} in the riak_kv
% service.
%-----------------------------------------------------------------------
getnodeid(Bucket, Key) ->
    DocIdx = riak_core_util:chash_key({Bucket, Key}),
    riak_core_apl:get_primary_apl(DocIdx, 1, riak_kv).
%-----------------------------------------------------------------------
% Find one key per partition.
%
% Integer keys <<"0">>, <<"1">>, ... are hashed together with a bucket
% (getnodeid/2). The first key that lands on each not-yet-covered
% partition index is kept. Keys are returned in the order found, and
% the search loops until every index has been hit.
%
% findkeys_in/1,2 take the bucket the keys will be written to, which is
% what makes the keys land on distinct partitions for that bucket.
% findkeys/0,1,4 have no bucket argument. They previously called an
% undefined getnodeid/1 and now hash against findkeys_default_bucket/0.
% NOTE(review): that default (geobucket(mr)) is a guess — confirm.
%-----------------------------------------------------------------------
findkeys() ->
    findkeys(getringnodes()).

findkeys(Nodes) ->
    findkeys(Nodes, Nodes, [], 0).

findkeys(_Nodes, RemainingNodes, Keys, Acc) ->
    find_partition_keys(findkeys_default_bucket(), RemainingNodes,
                        lists:reverse(Keys), Acc).

findkeys_in(Bucket) ->
    findkeys_in(Bucket, getringnodes()).

findkeys_in(Bucket, Nodes) ->
    find_partition_keys(Bucket, Nodes, [], 0).

findkeys_default_bucket() ->
    geobucket(mr).

%% Search loop. KeysRev holds the keys found so far, newest first; they
%% are reversed once at the end instead of being appended on every hit.
find_partition_keys(_Bucket, [], KeysRev, _Counter) ->
    lists:reverse(KeysRev);
find_partition_keys(Bucket, Remaining, KeysRev, Counter) ->
    Key = integer_to_binary(Counter),
    [{{Index, _}, primary}] = getnodeid(Bucket, Key),
    case lists:member(Index, Remaining) of
        true ->
            find_partition_keys(Bucket, lists:delete(Index, Remaining),
                                [Key | KeysRev], Counter + 1);
        false ->
            find_partition_keys(Bucket, Remaining, KeysRev, Counter + 1)
    end.

%-----------------------------------------------------------------------
% Write keys to the specified bucket (KV write)
%
% Each key is stored with itself as the value, and the function crashes
% on the first put that does not return ok. Returns one ok per key. It
% uses its own dev1 connection, closed afterwards (it previously called
% getClient/0, which had no definition).
%-----------------------------------------------------------------------
writeToKvPartitions(Bucket, Keys) ->
    C = pb_socket(1),
    try
        [ok = riakc_pb_socket:put(C, riakc_obj:new(Bucket, Key, Key))
         || Key <- Keys]
    after
        riakc_pb_socket:stop(C)
    end.

%-----------------------------------------------------------------------
% Write one key per partition to the specified bucket; the keys are
% searched for using that same bucket.
%-----------------------------------------------------------------------
writeOneKeyPerKvPartition(Bucket) ->
    writeToKvPartitions(Bucket, findkeys_in(Bucket)).
%%=======================================================================
%% Torben's additions below
%%=======================================================================

%% adapted from riak_test ts_cluster_coverage

%% Quantum length in milliseconds (15 minutes).
quantum_ms() ->
    Minutes = 15,
    Minutes * 60 * 1000.

%% Gap between consecutive generated timestamps.
step() ->
    3124.

%% Exclusive upper bound covering QuantaTally quanta.
upper_bound_excl(QuantaTally) ->
    quantum_ms() * QuantaTally.

%% Timestamps 1, 1 + step(), ... strictly below upper_bound_excl/1.
timestamps(QuantaTally) ->
    LastAllowed = upper_bound_excl(QuantaTally) - 1,
    lists:seq(1, LastAllowed, step()).

timestamps() ->
    timestamps(100).
%% key analysis

%% The timestamp is the third element of a 3-tuple key.
key_timestamp({_, _, Timestamp}) ->
    Timestamp.

%% Timestamps of all keys, in the order given.
timestamps_of_keys(Keys) ->
    [key_timestamp(Key) || Key <- Keys].
%% Compare received timestamps with timestamps/0 and return a proplist of
%% counts and lists. Lists are computed on the sorted input with `--`,
%% so `duplicates` also contains any timestamps that were never expected,
%% not only repeats.
timestamp_analysis(TSList) ->
    Received = lists:sort(TSList),
    Expected = timestamps(),
    Extra = Received -- Expected,
    Missing = Expected -- Received,
    DistinctExtra = lists:usort(Extra),
    [{received_count, length(Received)},
     {unique_count, length(lists:usort(Received))},
     {missing_count, length(Missing)},
     {duplicates_count, length(Extra)},
     {unique_duplicates_count, length(DistinctExtra)},
     {missing, Missing},
     {duplicates, Extra},
     {unique_duplicates, DistinctExtra}].
%% List keys on node DevNo with the lk method and analyse their timestamps.
get_keys_and_analyse(DevNo) ->
    Keys = mrts:keys(lk, DevNo),
    timestamp_analysis(mrts:timestamps_of_keys(Keys)).
%% Repeat run_once/0 against rotating dev nodes for roughly
%% DurationInSecs seconds and return the accumulated stats. The
%% mrts_pick_dev helper is always stopped, even if a run crashes;
%% otherwise its registered name would block the next analysis/1 call.
analysis(DurationInSecs) ->
    StartTime = current_time(),
    EndTime = end_time(StartTime, DurationInSecs),
    start_mrts_pick_dev(),
    try
        run_analysis(initial_stats(), EndTime)
    after
        stop_mrts_pick_dev()
    end.
%% One analysis round on the next picked dev node. Returns
%% {Hash, Analysis, DevNo, StartTime}, where Hash is phash2 of Analysis.
run_once() ->
    StartedAt = current_time(),
    Dev = pick_dev(),
    Analysis = get_keys_and_analyse(Dev),
    {erlang:phash2(Analysis), Analysis, Dev, StartedAt}.
%% Ask the registered mrts_pick_dev process for the next dev number.
%% The request is tagged with a fresh reference, so an unrelated message
%% in the caller's mailbox can never be mistaken for the reply.
%% Raises pick_dev_timeout if no answer arrives within 5 s.
pick_dev() ->
    Ref = make_ref(),
    mrts_pick_dev ! {pick, self(), Ref},
    receive
        {mrts_pick_dev, Ref, DevNo} ->
            DevNo
    after 5000 ->
        erlang:error(pick_dev_timeout)
    end.

devs() ->
    lists:seq(1, 3).

%% Spawn the picker and register it as mrts_pick_dev. It is deliberately
%% not linked: stop_mrts_pick_dev/0 kills it, and a link would propagate
%% that kill to the caller.
start_mrts_pick_dev() ->
    Pid = spawn(fun() -> pick_dev_loop(devs()) end),
    register(mrts_pick_dev, Pid).

%% Kill the picker if it is running and wait until it is gone, so the
%% registered name is free for an immediate restart.
stop_mrts_pick_dev() ->
    case whereis(mrts_pick_dev) of
        Pid when is_pid(Pid) ->
            MRef = erlang:monitor(process, Pid),
            exit(Pid, kill),
            receive
                {'DOWN', MRef, process, Pid, _} -> true
            end;
        _ ->
            ok
    end.

%% Hand out devs in order, refilling with a fresh shuffle when the list
%% runs out. Unknown messages are dropped so they cannot pile up.
pick_dev_loop([]) ->
    pick_dev_loop(shuffle(devs()));
pick_dev_loop([Dev | Devs] = All) ->
    receive
        {pick, From, Ref} ->
            From ! {mrts_pick_dev, Ref, Dev},
            pick_dev_loop(Devs);
        _Other ->
            pick_dev_loop(All)
    end.

%% Random permutation of L. rand seeds itself differently in each
%% process; the deprecated random module used a fixed default seed, so
%% every freshly spawned picker produced the same "shuffle".
shuffle(L) ->
    [X || {_, X} <- lists:sort([{rand:uniform(), N} || N <- L])].
%% Keep running rounds and folding them into Stats until a round starts
%% after EndTime; that last round is still included.
run_analysis(Stats, EndTime) ->
    Round = run_once(),
    {_, _, _, StartedAt} = Round,
    Updated = update_stats(Stats, Round),
    if
        StartedAt > EndTime -> Updated;
        true -> run_analysis(Updated, EndTime)
    end.
%% Stats tuple: {HashRes, DevCounts, DevTimes, Counts, Times}, all orddicts.
initial_stats() ->
    Empty = orddict:new(),
    {Empty, Empty, Empty, Empty, Empty}.

%% Fold one run_once/0 result into the stats: remember the analysis
%% behind each hash, and count and timestamp each hash both overall and
%% per {Hash, DevNo}.
update_stats({HashRes, DevCounts, DevTimes, Counts, Times},
             {Hash, Res, DevNo, Start}) ->
    PerDev = {Hash, DevNo},
    {orddict:store(Hash, Res, HashRes),
     inc_count(PerDev, DevCounts),
     add_time(PerDev, Start, DevTimes),
     inc_count(Hash, Counts),
     add_time(Hash, Start, Times)}.

inc_count(Key, Counts) ->
    orddict:update_counter(Key, 1, Counts).

add_time(Key, Start, Times) ->
    orddict:append(Key, Start, Times).
%% Local wall-clock time as {{Y, M, D}, {H, Mi, S}}.
current_time() ->
    erlang:localtime().

%% The datetime DurationInSecs seconds after StartTime.
end_time(StartTime, DurationInSecs) ->
    StartSecs = calendar:datetime_to_gregorian_seconds(StartTime),
    calendar:gregorian_seconds_to_datetime(StartSecs + DurationInSecs).
%% Write every {Hash, Analysis} entry of the HashRes orddict to
%% "<Hash>.txt". Crashes (badmatch) on the first file that cannot be
%% written instead of silently dropping the error. Returns ok.
write_results(HashRes) ->
    lists:foreach(fun(Entry) -> ok = write_res(Entry) end,
                  orddict:to_list(HashRes)).

%% Pretty-print Res into "<Hash>.txt"; returns file:write_file/2's result.
write_res({Hash, Res}) ->
    Filename = io_lib:format("~B.txt", [Hash]),
    ResStr = io_lib:format("~p~n", [Res]),
    file:write_file(Filename, ResStr).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment