Skip to content

Instantly share code, notes, and snippets.

@wang-zhijun
Last active March 16, 2016 13:45
Show Gist options
  • Save wang-zhijun/17a65a1e6aade47f45f8 to your computer and use it in GitHub Desktop.
Save wang-zhijun/17a65a1e6aade47f45f8 to your computer and use it in GitHub Desktop.
RiakCS offline delete
#!/usr/bin/env escript
%% ---------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% ---------------------------------------------------------------------
%% @doc This is an offline deletion script. It opens the bitcask files
%% directly, reads a file listing the keys and partitions that should
%% be deleted, and then deletes them without going through Riak KV.
%%
%% Note: make sure you remove the AAE trees after this script has run,
%% and turn off AAE on the other nodes that are running in the cluster.
-module(offline_delete).
-compile(export_all).
-mode(compile).
%% Argument specification of a single option: a bare type, a type paired
%% with a default value, or undefined.
-type arg_spec() :: arg_type() | {arg_type(), arg_value()} | undefined.
%% Data type that an argument can be converted to.
-type arg_type() :: 'atom' | 'binary' | 'boolean' | 'float' | 'integer' | 'string'.
%% Value an argument can take once converted.
-type arg_value() :: atom() | binary() | boolean() | float() | integer() | string().

%% @doc Returns the command line option specifications that main/1 hands
%% to getopt:parse/2 and getopt:usage/3.
%%
%% The result is a list with one 5-tuple per option:
%% {Name, ShortFlag, LongFlag, ArgSpec, HelpText}.
-spec options() -> [{Name :: atom(),
                     Short :: char() | undefined,
                     Long :: string() | undefined,
                     ArgSpec :: arg_spec(),
                     Help :: string() | undefined}].
options() ->
    [{ring_size, $r, "ring-size", {integer, 64}, "Ring size"},
     {old_format, $O, "old-format", {boolean, false},
      "Use old (version 0) format for bkeys in bitcask"},
     {dry_run, undefined, "dry-run", {boolean, false},
      "if set, actual deletion does not happen"},
     {yes, undefined, "yes", {boolean, false}, "Automatic yes to prompt"}].
%% @doc escript entry point.
%%
%% Expects exactly two positional arguments after the options: the bitcask
%% data directory and the file listing the blocks to delete. Any other
%% argument shape (including a getopt parse error) prints the usage text
%% and a caution on stderr instead of deleting anything.
main(Args) ->
    %% The script needs the bitcask module; if it cannot be loaded the
    %% script was most likely not started through 'riak escript'.
    case code:ensure_loaded(bitcask) of
        {error, _} ->
            io:format(standard_error,
                      "\033[31m\033[1m[Error] Riak modules are not loaded. Make sure the script run with 'riak escript', not 'riak-cs escript'.\033[0m~n",
                      []),
            halt(1);
        {module, bitcask} ->
            ok
    end,
    OptSpecs = options(),
    case getopt:parse(OptSpecs, Args) of
        {ok, {Options, [BitcaskDir, BlocksListFile]}} ->
            %% BitcaskDir is something like /var/lib/riak/bitcask and
            %% BlocksListFile something like /tmp/tmp.txt.
            offline_delete(BitcaskDir, BlocksListFile, Options);
        _Other ->
            getopt:usage(OptSpecs, "offline_delete.erl",
                         "<bitcask_dir> <blocks_list_file>"),
            io:format(standard_error,
                      "\033[31m\033[1m[Caution] Make sure Riak is not running!!!\033[0m~n"
                      "It'd be better if all hinted handoff have been finished before stopping Riak.~n",
                      [])
    end.
%% @doc Opens every partition directory found directly under BitcaskDir.
%%
%% Only entries whose name consists solely of decimal digits (for example
%% "981946412581700398168100746981252653831329677312") are treated as
%% partition directories; every other entry is reported with io:format/2
%% and skipped. Each selected directory is opened with
%% bitcask:open(Path, [read_write]).
%%
%% Returns an orddict that maps the directory name, converted to an integer,
%% to the reference returned by bitcask:open/2.
%%
%% Crashes with a badmatch when BitcaskDir cannot be listed, and with
%% error({Name, Result}) when bitcask:open/2 returns anything other than a
%% reference.
-spec open_all_bitcask(file:filename()) ->
          orddict:orddict(non_neg_integer(), reference()).
open_all_bitcask(BitcaskDir) ->
    %% file:list_dir/1 yields bare entry names, not full paths, e.g.
    %%   file:list_dir("/home/") -> {ok, ["wang", "lfs"]}
    {ok, Entries} = file:list_dir(BitcaskDir),
    IsPartitionDir =
        fun(Name) ->
                %% "+" means one or more digits, so the whole name must be numeric.
                case re:run(Name, "^[0-9]+$") of
                    {match, _} ->
                        true;
                    _ ->
                        io:format("skipping ~p in the bitcask dir.~n", [Name]),
                        false
                end
        end,
    %% Filter first so that all "skipping" messages are printed before any
    %% bitcask directory is opened.
    DataDirs = lists:filter(IsPartitionDir, Entries),
    OpenPartition =
        fun(Name) ->
                %% Build the full path, e.g.
                %% /var/lib/riak/bitcask/981946412581700398168100746981252653831329677312
                Path = filename:join(BitcaskDir, Name),
                case bitcask:open(Path, [read_write]) of
                    Ref when is_reference(Ref) ->
                        {list_to_integer(Name), Ref};
                    Other ->
                        error({Name, Other})
                end
        end,
    %% The orddict lets callers look a handle up by partition index:
    %%   D = orddict:from_list([{1, "a"}, {2, "b"}]), orddict:fetch(1, D) -> "a"
    orddict:from_list(lists:map(OpenPartition, DataDirs)).
%% @doc Closes every bitcask handle stored in Bitcasks and returns ok.
%%
%% Bitcasks is the orddict (partition index -> bitcask reference) built by
%% open_all_bitcask/1.
-spec close_all_bitcask(orddict:orddict(non_neg_integer(), reference())) -> ok.
close_all_bitcask(Bitcasks) ->
    %% Fold rather than map: orddict:map/2 would return a new orddict,
    %% whereas the spec of this function promises ok.
    orddict:fold(fun(_Idx, Ref, Acc) ->
                         bitcask:close(Ref),
                         Acc
                 end, ok, Bitcasks).
%% New bitcask 1.7 format (Riak 2.0 or later)
-define(VERSION_1, 1).
-define(VERSION_BYTE, ?VERSION_1).
%% @doc Prints a warning about the bitcask directory on stderr and asks the
%% operator for confirmation before anything is touched.
%%
%% When AutomaticYes is true the prompt is only echoed (with the answer "y")
%% and the function returns ok. When it is false the answer is read with
%% io:get_line/1 and must be exactly "y" followed by a newline.
make_sure(Dir, AutomaticYes) ->
    AbsDir = filename:absname(Dir),
    io:format(standard_error,
              "\033[31m[Warning]\033\[0m~n"
              "Make sure any Riak process using '~s' is not running "
              "or your data may corrupt.~n", [AbsDir]),
    case AutomaticYes of
        false ->
            %% Any other answer fails this match, which aborts the script
            %% with a badmatch before any data is modified.
            "y\n" = io:get_line("Accept the terms of conditions? [y/N] ");
        true ->
            io:format(standard_error, "Accept the terms of conditions? [y/N] y~n", [])
    end.
%% @doc Deletes (or, in dry-run mode, only looks up) every block listed in
%% BlocksListFile from the bitcask directories under BitcaskDir.
%%
%% BitcaskDir     - bitcask data directory, e.g. /var/lib/riak/bitcask/
%% BlocksListFile - text file naming the blocks, e.g. /tmp/tmp.txt
%% Options        - parsed getopt options; the keys yes, old_format,
%%                  ring_size and dry_run are read here.
%%
%% Returns ok. The list file and all opened bitcask handles are released
%% even when processing crashes half way (for example on a malformed line),
%% so a failed run does not leave them open.
offline_delete(BitcaskDir, BlocksListFile, Options) ->
    make_sure(BitcaskDir, proplists:get_value(yes, Options)),
    {ok, Fd} = file:open(BlocksListFile, [read]),
    try
        %% BC is an orddict: partition index -> bitcask handle.
        BC = open_all_bitcask(BitcaskDir),
        try
            delete_listed_blocks(Fd, BC, BitcaskDir, Options)
        after
            close_all_bitcask(BC)
        end
    after
        ok = file:close(Fd)
    end.

%% Runs the deletion over an already opened list file and set of bitcask
%% handles, reporting progress and the final tally on stderr. Returns ok.
delete_listed_blocks(Fd, BC, BitcaskDir, Options) ->
    io:format(standard_error, "~p bitcask directories at ~s opened.~n",
              [orddict:size(BC), BitcaskDir]),
    BKVersion = case proplists:get_value(old_format, Options) of
                    false -> ?VERSION_1;
                    true -> 0
                end,
    %% Report the key version in use; it is 1 unless --old-format was given.
    io:format(standard_error, "Using bitcask key version: ~p.~n",
              [BKVersion]),
    %% The ring size decides which partitions a key maps to, so the default
    %% in options/0 (or --ring-size) has to match the actual cluster.
    RingSize = proplists:get_value(ring_size, Options),
    %% In a dry run keys are only looked up with bitcask:get/2;
    %% bitcask:delete/2 is never called.
    DryRun = proplists:get_value(dry_run, Options),
    {ok, Deleted} = for_each_line(Fd, BC, RingSize, DryRun, 0, BKVersion),
    Verb = case DryRun of
               true -> "scanned";
               false -> "deleted"
           end,
    io:format(standard_error,
              "~p blocks at ~s was ~s (dry run: ~p).~n",
              [Deleted, BitcaskDir, Verb, DryRun]).
%% @doc Reads the block list from Fd line by line and deletes (or, in a dry
%% run, looks up) each listed block in the opened bitcask handles.
%%
%% Fd        - io device of the block list file (e.g. /tmp/tmp.txt)
%% BC        - orddict: partition index -> bitcask handle
%% RingSize  - ring size used to map a key to its partitions
%% DryRun    - true to only look keys up, false to delete them
%% Count     - number of successful operations so far (accumulator)
%% BKVersion - bitcask key format version (0 or 1)
%%
%% Each line holds a bucket and a key as its first two fields, separated by
%% tabs or spaces; further fields are ignored. Blank lines are skipped. A
%% line with only one field is treated as a malformed list file and crashes
%% with function_clause.
%%
%% Returns {ok, Count} at end of file, or {error, Reason} when reading the
%% file fails (the reason is also printed on stderr).
%%
%% NOTE: the progress message is printed whenever Count rem 1000 is exactly
%% 500. Count grows by 0 to 3 per line, so a checkpoint can be skipped, or
%% repeated while Count stays unchanged.
for_each_line(Fd, BC, RingSize, DryRun, Count, BKVersion) ->
    case Count rem 1000 of
        500 ->
            io:format(standard_error,
                      "~p blocks has been deleted.~n",
                      [Count]);
        _ ->
            noop
    end,
    case file:read_line(Fd) of
        {ok, Line} ->
            %% Tab, space and newline all act as field separators.
            Tokens = string:tokens(Line, "\t \n"),
            Done = delete_replicas(Tokens, BC, RingSize, DryRun, BKVersion),
            for_each_line(Fd, BC, RingSize, DryRun, Count + Done, BKVersion);
        eof ->
            {ok, Count};
        {error, Reason} = Error ->
            %% The format arguments must be a list; passing the bare reason
            %% would make io:format/3 itself fail with badarg.
            io:format(standard_error, "Error: ~p~n", [Reason]),
            Error
    end.

%% Handles the fields of one line of the block list: returns how many of the
%% replicas of the named block were deleted (or found, in a dry run).
delete_replicas([], _BC, _RingSize, _DryRun, _BKVersion) ->
    %% Blank line: nothing to do.
    0;
delete_replicas([B, K | _Rest], BC, RingSize, DryRun, BKVersion) ->
    %% The first two fields are converted to binaries with mochihex:to_bin/1
    %% (presumably hex-decoding them - confirm against the list producer).
    Bucket = mochihex:to_bin(B),
    Key = mochihex:to_bin(K),
    %% Try all three partitions the key maps to (an n_val of 3 is assumed);
    %% partitions not opened on this node simply contribute 0.
    lists:sum([maybe_delete(BC, Idx, Bucket, Key, DryRun, BKVersion)
               || Idx <- vnode_ids({Bucket, Key}, RingSize, 3)]).
%% @doc Deletes (or, when DryRun is true, only looks up) one key in the
%% bitcask handle registered for partition Idx.
%%
%% Returns 1 when bitcask:delete/2 answers ok or bitcask:get/2 answers
%% {ok, Value}. Returns 0 when no handle for Idx is present in BC, or when
%% bitcask answers anything else; in the latter case the answer is printed
%% on stderr.
maybe_delete(BC, Idx, Bucket, Key, DryRun, BKVersion) ->
    case orddict:find(Idx, BC) of
        error ->
            %% Partition Idx is not among the opened bitcask directories.
            0;
        {ok, Handle} ->
            %% The key layout depends on the bitcask key version (0 or 1).
            BK = make_bk(BKVersion, Bucket, Key),
            Outcome = case DryRun of
                          true -> bitcask:get(Handle, BK);
                          false -> bitcask:delete(Handle, BK)
                      end,
            case Outcome of
                ok ->
                    1;
                {ok, _Value} ->
                    1;
                Other ->
                    io:format(standard_error, "error: ~p ~n", [Other]),
                    0
            end
    end.
%% @doc Builds the binary key under which an object is stored in bitcask.
%%
%% Version 0 - old bitcask format (Riak 1.4 or before): the external term
%%             format of {Bucket, Key}.
%% Version 1 - new bitcask 1.7 format (Riak 2.0 or later): a 7-bit version,
%%             a 1-bit flag (1 when the bucket carries a type, 0 otherwise),
%%             then each of type (if any) and bucket prefixed by its 16-bit
%%             length, followed by the key.
make_bk(0, Bucket, Key) ->
    term_to_binary({Bucket, Key});
make_bk(1, {Type, Bucket}, Key) ->
    %% Typed bucket: flag bit set, type precedes the bucket name.
    TypeLen = byte_size(Type),
    BucketLen = byte_size(Bucket),
    <<?VERSION_BYTE:7, 1:1,
      TypeLen:16/integer, Type/binary,
      BucketLen:16/integer, Bucket/binary,
      Key/binary>>;
make_bk(1, Bucket, Key) ->
    %% Untyped bucket: flag bit clear.
    BucketLen = byte_size(Bucket),
    <<?VERSION_BYTE:7, 0:1,
      BucketLen:16/integer, Bucket/binary,
      Key/binary>>.
%% @doc Returns the NVal partition indexes that BKey ({Bucket, Key}) maps to
%% on a ring of RingSize partitions.
%%
%% The key is hashed with chash:key_of/1 (chash is part of riak_core, see
%% https://github.com/basho/riak_core/blob/develop/src/chash.erl). The first
%% partition is the one following the slot the hash falls into, and the
%% remaining ones are the next consecutive partitions, wrapping around the
%% ring. Each partition number is scaled by the ring increment to give the
%% index used as the bitcask directory name.
vnode_ids(BKey, RingSize, NVal) ->
    <<Hash:160/integer>> = chash:key_of(BKey),
    Step = chash:ring_increment(RingSize),
    First = ((Hash div Step) + 1) rem RingSize,
    lists:map(fun(Offset) ->
                      ((First + Offset) rem RingSize) * Step
              end, lists:seq(0, NVal - 1)).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment