Last active
March 16, 2016 13:45
-
-
Save wang-zhijun/17a65a1e6aade47f45f8 to your computer and use it in GitHub Desktop.
RiakCS offline delete
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env escript | |
%% --------------------------------------------------------------------- | |
%% | |
%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved. | |
%% | |
%% This file is provided to you under the Apache License, | |
%% Version 2.0 (the "License"); you may not use this file | |
%% except in compliance with the License. You may obtain | |
%% a copy of the License at | |
%% | |
%% http://www.apache.org/licenses/LICENSE-2.0 | |
%% | |
%% Unless required by applicable law or agreed to in writing, | |
%% software distributed under the License is distributed on an | |
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
%% KIND, either express or implied. See the License for the | |
%% specific language governing permissions and limitations | |
%% under the License. | |
%% | |
%% --------------------------------------------------------------------- | |
%% @doc This is an offline deletion script that directly opens the
%% bitcask files and reads a file listing the (hex-encoded) buckets and
%% keys that should be deleted. It computes the partitions holding their
%% replicas and deletes the keys there, without going through Riak KV.
%%
%% Note: make sure to remove the AAE trees after this script has run,
%% and turn off AAE on the other nodes running in the cluster.
-module(offline_delete). | |
-compile(export_all). | |
-mode(compile). | |
%% Type (and optional default value) of a command-line argument, in the
%% option format passed to getopt:parse/2.
-type arg_spec() :: arg_type() | {arg_type(), arg_value()} | undefined.
%% Data type that an argument can be converted to.
-type arg_type() :: 'atom' | 'binary' | 'boolean' | 'float' | 'integer' | 'string'.
%% Value of an argument after conversion.
-type arg_value() :: atom() | binary() | boolean() | float() | integer() | string().

%% @doc Command-line option specifications for this script, as a list of
%% {Name, Short, Long, ArgSpec, Help} tuples consumed by getopt.
%%
%% ring_size  - number of partitions in the ring (default 64); must match
%%              the cluster, since it decides which partitions hold a key.
%% old_format - use the version-0 bitcask key format (default false).
%% dry_run    - only look keys up, do not delete them (default false).
%% yes        - answer the confirmation prompt automatically (default false).
-spec options() -> [{Name :: atom(),
                     Short :: char() | undefined,
                     Long :: string() | undefined,
                     ArgSpec :: arg_spec(),
                     Help :: string() | undefined}].
options() ->
    [{ring_size, $r, "ring-size", {integer, 64}, "Ring size"},
     {old_format, $O, "old-format", {boolean, false},
      "Use old (version 0) format for bkeys in bitcask"},
     {dry_run, undefined, "dry-run", {boolean, false},
      "if set, actual deletion does not happen"},
     {yes, undefined, "yes", {boolean, false}, "Automatic yes to prompt"}].
%% @doc escript entry point.
%%
%% Checks that Riak's bitcask module can be loaded, parses the command line
%% with getopt and runs offline_delete/3. On an invalid command line it
%% prints the usage text and a caution, then exits with status 1 so that
%% shells and calling scripts can detect the failure.
main(Args) ->
    %% code:ensure_loaded/1 returns {module, bitcask} when bitcask is loaded
    %% or loadable from the code path, and {error, What} otherwise.
    case code:ensure_loaded(bitcask) of
        {module, bitcask} ->
            ok;
        {error, _} ->
            io:format(standard_error,
                      "\033[31m\033[1m[Error] Riak modules are not loaded. Make sure the script run with 'riak escript', not 'riak-cs escript'.\033[0m~n",
                      []),
            halt(1)
    end,
    case getopt:parse(options(), Args) of
        {ok, {Options, [BitcaskDir, BlocksListFile]}} ->
            %% BitcaskDir is the bitcask data root (e.g. "/var/lib/riak/bitcask");
            %% BlocksListFile lists the keys to delete (e.g. "/tmp/tmp.txt").
            offline_delete(BitcaskDir, BlocksListFile, Options);
        _Other ->
            getopt:usage(options(), "offline_delete.erl",
                         "<bitcask_dir> <blocks_list_file>"),
            io:format(standard_error,
                      "\033[31m\033[1m[Caution] Make sure Riak is not running!!!\033[0m~n"
                      "It'd be better if all hinted handoff have been finished before stopping Riak.~n",
                      []),
            %% Invalid invocation: report failure to the caller.
            halt(1)
    end.
-spec open_all_bitcask(filename:filename()) ->
                              orddict:orddict(non_neg_integer(), reference()).
%% @doc Opens (read_write) every partition directory directly under
%% BitcaskDir and returns an orddict mapping partition index to its bitcask
%% handle.
%%
%% Only entries whose whole name is one or more decimal digits are treated
%% as partition directories, e.g.
%% "981946412581700398168100746981252653831329677312". Every other entry is
%% reported on stdout and skipped. Raises error({Entry, Reason}) if
%% bitcask:open/2 does not return a handle, and badmatch if BitcaskDir
%% cannot be listed.
open_all_bitcask(BitcaskDir) ->
    %% Relative entry names, hidden files included.
    {ok, Entries} = file:list_dir(BitcaskDir),
    IsPartitionDir =
        fun(Entry) ->
                case re:run(Entry, "^[0-9]+$") of
                    {match, _} ->
                        true;
                    _NoMatch ->
                        io:format("skipping ~p in the bitcask dir.~n", [Entry]),
                        false
                end
        end,
    OpenPartition =
        fun(Entry) ->
                %% e.g. "/var/lib/riak/bitcask/9819464125817003981681..."
                Path = filename:join(BitcaskDir, Entry),
                case bitcask:open(Path, [read_write]) of
                    Handle when is_reference(Handle) ->
                        {list_to_integer(Entry), Handle};
                    Failure ->
                        error({Entry, Failure})
                end
        end,
    PartitionDirs = [Entry || Entry <- Entries, IsPartitionDir(Entry)],
    orddict:from_list(lists:map(OpenPartition, PartitionDirs)).
-spec close_all_bitcask(orddict:orddict(non_neg_integer(), reference())) -> ok.
%% @doc Closes every bitcask handle in Bitcasks (partition index -> handle,
%% as returned by open_all_bitcask/1).
%%
%% Returns ok, as the spec promises. The results of the individual
%% bitcask:close/1 calls are ignored.
close_all_bitcask(Bitcasks) ->
    orddict:fold(fun(_Partition, Ref, ok) ->
                         _ = bitcask:close(Ref),
                         ok
                 end, ok, Bitcasks).
%% Key-format version used by bitcask 1.7 (Riak 2.0 or later).
-define(VERSION_1, 1).
%% Version tag stored in the top 7 bits of the first byte of version-1 keys
%% (see make_bk/3).
-define(VERSION_BYTE, ?VERSION_1).

%% @doc Warns the operator that no Riak process may be using Dir, and asks
%% for confirmation before anything is modified.
%%
%% When AutomaticYes is true, the prompt is printed with an automatic "y"
%% answer. Otherwise the answer is read from standard input:
%% - "y" or "yes" (any case, surrounding whitespace ignored) continues.
%% - Any other answer, or end of input, prints "Aborted." and exits with
%%   status 1.
make_sure(Dir, AutomaticYes) ->
    io:format(standard_error,
              "\033[31m[Warning]\033[0m~n"
              "Make sure any Riak process using '~s' is not running "
              "or your data may corrupt.~n", [filename:absname(Dir)]),
    case AutomaticYes of
        true ->
            io:format(standard_error, "Accept the terms of conditions? [y/N] y~n", []);
        false ->
            case is_yes(io:get_line("Accept the terms of conditions? [y/N] ")) of
                true ->
                    ok;
                false ->
                    io:format(standard_error, "Aborted.~n", []),
                    halt(1)
            end
    end.

%% Returns true when Answer (a line from io:get_line/1, or eof / {error, _})
%% is an affirmative "y"/"yes" reply.
is_yes(Answer) when is_list(Answer) ->
    case string:tokens(Answer, " \t\r\n") of
        [Word] -> lists:member(string:to_lower(Word), ["y", "yes"]);
        _ -> false
    end;
is_yes(_EofOrError) ->
    false.
%% @doc Deletes every key listed in BlocksListFile from the bitcask
%% partitions found under BitcaskDir. In dry-run mode the keys are only
%% looked up.
%%
%% BitcaskDir     - bitcask data root, e.g. "/var/lib/riak/bitcask/".
%% BlocksListFile - text file whose lines start with a hex-encoded bucket
%%                  and a hex-encoded key, e.g. "/tmp/tmp.txt".
%% Options        - options parsed by getopt from options/0.
%%
%% The list file and every opened bitcask are closed again even if the run
%% crashes part way through. Returns ok. Raises an error if reading the
%% list file fails.
offline_delete(BitcaskDir, BlocksListFile, Options) ->
    make_sure(BitcaskDir, proplists:get_value(yes, Options)),
    {ok, Fd} = file:open(BlocksListFile, [read]),
    try
        %% orddict of partition index -> bitcask handle.
        BC = open_all_bitcask(BitcaskDir),
        io:format(standard_error, "~p bitcask directories at ~s opened.~n",
                  [length(BC), BitcaskDir]),
        try
            delete_listed_keys(Fd, BC, BitcaskDir, Options)
        after
            close_all_bitcask(BC)
        end
    after
        file:close(Fd)
    end.

%% Runs the per-line loop over Fd with the settings taken from Options, and
%% prints the final summary. Returns ok.
delete_listed_keys(Fd, BC, BitcaskDir, Options) ->
    BKVersion = case proplists:get_value(old_format, Options) of
                    false -> ?VERSION_1;
                    true -> 0
                end,
    io:format(standard_error, "Using bitcask key version: ~p.~n",
              [BKVersion]),
    %% Must be the ring size of the cluster the data belongs to, because it
    %% determines which partitions vnode_ids/3 maps each key to.
    RingSize = proplists:get_value(ring_size, Options),
    %% In a dry run keys are only looked up (bitcask:get/2), never deleted.
    DryRun = proplists:get_value(dry_run, Options),
    case for_each_line(Fd, BC, RingSize, DryRun, 0, BKVersion) of
        {ok, Deleted} ->
            Verb = case DryRun of
                       true -> "scanned";
                       false -> "deleted"
                   end,
            io:format(standard_error,
                      "~p blocks at ~s was ~s (dry run: ~p).~n",
                      [Deleted, BitcaskDir, Verb, DryRun]),
            ok;
        Failure ->
            error({blocks_list_read_failed, Failure})
    end.
%% @doc Processes the blocks list file Fd line by line until end of file.
%%
%% Each non-blank line must start with two whitespace-separated, hex-encoded
%% fields: the bucket and the key. Any further fields are ignored. For each
%% such line, the key is looked up (DryRun =:= true) or deleted in each of
%% its replica partitions that has a bitcask in BC.
%% Blank lines are skipped. Lines with a single field are reported on
%% standard_error and skipped.
%%
%% Count is the running number of successful lookups/deletes. A progress
%% line is printed each time it crosses a multiple of 1000.
%% Returns {ok, Count} at end of file, or {error, Reason} if reading Fd fails.
for_each_line(Fd, BC, RingSize, DryRun, Count, BKVersion) ->
    case file:read_line(Fd) of
        {ok, Line} ->
            Done = process_line(Line, BC, RingSize, DryRun, BKVersion),
            NewCount = Count + Done,
            report_progress(Count, NewCount),
            for_each_line(Fd, BC, RingSize, DryRun, NewCount, BKVersion);
        eof ->
            {ok, Count};
        {error, Reason} = Error ->
            %% io:format/3 expects its arguments as a list.
            io:format(standard_error, "Error: ~p~n", [Reason]),
            Error
    end.

%% Handles one line of the blocks list. Returns how many replica
%% lookups/deletes succeeded for it.
process_line(Line, BC, RingSize, DryRun, BKVersion) ->
    %% "\r" is a separator too, so files with CRLF line endings decode cleanly.
    case string:tokens(Line, "\t \r\n") of
        [] ->
            0;
        [B, K | _Rest] ->
            Bucket = mochihex:to_bin(B),
            Key = mochihex:to_bin(K),
            %% NOTE(review): assumes n_val = 3 for every listed bucket.
            VNodes = vnode_ids({Bucket, Key}, RingSize, 3),
            lists:foldl(fun(VNode, Acc) ->
                                Acc + maybe_delete(BC, VNode, Bucket, Key,
                                                   DryRun, BKVersion)
                        end, 0, VNodes);
        [_OnlyOneField] ->
            io:format(standard_error, "skipping malformed line: ~p~n", [Line]),
            0
    end.

%% Prints a progress line whenever the count crosses a multiple of 1000.
report_progress(Before, After) when After div 1000 > Before div 1000 ->
    io:format(standard_error, "~p blocks has been deleted.~n", [After]);
report_progress(_Before, _After) ->
    ok.
%% @doc If this node has a bitcask for partition Idx in BC, looks up
%% (DryRun =:= true) or deletes (DryRun =:= false) the entry for
%% {Bucket, Key} there.
%%
%% Returns 1 on success ({ok, _} from bitcask:get/2, or ok from
%% bitcask:delete/2). Returns 0 when:
%% - the partition is not in BC,
%% - the key is not found during a dry run, or
%% - bitcask returns any other result (which is printed to standard_error).
%% NOTE(review): in non-dry-run mode the count reflects successful delete
%% calls; whether bitcask:delete/2 reports missing keys is not visible here.
maybe_delete(BC, Idx, Bucket, Key, DryRun, BKVersion) ->
    case orddict:find(Idx, BC) of
        {ok, Bitcask} ->
            %% The on-disk key layout depends on the bitcask key version (0 or 1).
            BitcaskKey = make_bk(BKVersion, Bucket, Key),
            count_result(lookup_or_delete(DryRun, Bitcask, BitcaskKey));
        error ->
            %% This node holds no bitcask for partition Idx; nothing to do.
            0
    end.

%% Performs the dry-run lookup or the real delete.
lookup_or_delete(true, Bitcask, BitcaskKey) ->
    bitcask:get(Bitcask, BitcaskKey);
lookup_or_delete(false, Bitcask, BitcaskKey) ->
    bitcask:delete(Bitcask, BitcaskKey).

%% Maps a bitcask result to the number of replicas handled (0 or 1).
%% A not_found result from a dry-run lookup means the key is absent from
%% this partition, which is expected and not reported as an error.
count_result({ok, _Value}) -> 1;
count_result(ok) -> 1;
count_result(not_found) -> 0;
count_result(Error) ->
    io:format(standard_error, "error: ~p ~n", [Error]),
    0.
%% @doc Builds the on-disk bitcask key for Bucket and Key.
%%
%% Version 0 (old format, Riak 1.4 or before): term_to_binary({Bucket, Key}).
%% Version 1 (bitcask 1.7 format, Riak 2.0 or later):
%% - first byte: ?VERSION_BYTE in its top 7 bits, followed by a 1-bit flag
%%   that is set when the bucket is a {Type, Bucket} pair;
%% - then the 16-bit length-prefixed bucket type (typed buckets only);
%% - then the 16-bit length-prefixed bucket name;
%% - then the raw key.
%% NOTE(review): the 16-bit length fields assume names shorter than 65536
%% bytes.
make_bk(0, Bucket, Key) ->
    term_to_binary({Bucket, Key});
make_bk(1, {Type, Bucket}, Key) ->
    TypeSz = byte_size(Type),
    BucketSz = byte_size(Bucket),
    <<?VERSION_BYTE:7, 1:1, TypeSz:16/integer, Type/binary,
      BucketSz:16/integer, Bucket/binary, Key/binary>>;
make_bk(1, Bucket, Key) ->
    BucketSz = byte_size(Bucket),
    <<?VERSION_BYTE:7, 0:1, BucketSz:16/integer,
      Bucket/binary, Key/binary>>.
%% @doc Returns the indexes of the NVal consecutive partitions that hold the
%% replicas of BKey in a ring of RingSize partitions.
%%
%% Partition indexes are multiples of chash:ring_increment(RingSize). The
%% first replica is in the partition after the one that the key's 160-bit
%% hash falls into; the remaining replicas follow in order, wrapping around
%% the ring. Ring ownership is not consulted; only indexes are computed.
%% See riak_core's chash:
%% https://github.com/basho/riak_core/blob/develop/src/chash.erl
vnode_ids(BKey, RingSize, NVal) ->
    <<Hash:160/integer>> = chash:key_of(BKey),
    Increment = chash:ring_increment(RingSize),
    FirstPartition = (Hash div Increment + 1) rem RingSize,
    ToIndex = fun(Offset) ->
                      ((FirstPartition + Offset) rem RingSize) * Increment
              end,
    lists:map(ToIndex, lists:seq(0, NVal - 1)).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment