Riak の削除周り[Lv1](Riak Source Code Reading @東京 #7)
Riak Source Code Reading @東京 #7 - connpassいってきた。
私の発表のはずだったんですが、ちょっとバタバタしてて事前の準備ができなかったので
会場の皆様にご協力いただいてソースコードを読んできました。
特に@ksauzz さん、 @shkumagai さん、ありがとうございました。
とりあえず、覚えてる内容を書き留めておきます。
RiakのdeleteおよびTombstone
今日は分散システムでの鬼門である削除についてお話しします。
とりあえず鬼門なのは以下みたいな感じ。
- レプリケーション
- VectorClock
- 格納してるノードが死んでたらどうすんの?
- sibling
とかとかいろいろあります。
削除の基本的な流れ
- 削除するキーを空値(これをRiakではTombstone[お墓]と呼ぶ)で更新する。
- この先どうなるかは不明.(宿題扱い)
- 多分スケジューラーで実際のキーに基づいてる値が削除される。
- 物理削除する?しない?答えはbitcask/eleveldbのソースにあるはず
関連するソース
riak_core | dcf8e52fbc867d9d32a897edd9cfbf381d412680 |
riak_kv | 38a1d6dd5cf77262b18fca90a7581e70d84ec266 |
bitcask | f6736fe5d1ff96108f6148198180aaa41f235f7b |
eleveldb | 5f213105b2e2b8a0bb132ade591118862de5f0fa |
関連するドキュメント
RESTful APIでの削除
api
### 処理フロー
|<--- riak_kv --->| webmachine --- riak_kv_wm_object --+ (HTTP) | +--- riak_client --- riak_kv_delete | riak_api --- riak_kv_pb_object --+ (Protocol Buffer)
読んだソースファイル
https://github.com/basho/riak_kv/blob/master/src/riak_kv_wm_object.erl
delete_resource(RD, Ctx=#ctx{bucket=B, key=K, client=C, rw=RW, r=R, w=W, pr=PR, pw=PW, dw=DW, timeout=Timeout}) -> .... %% wrqって何? Result = case wrq:get_req_header(?HEAD_VCLOCK, RD) of undefined -> C:delete(B,K,Options); _ -> C:delete_vclock(B,K,decode_vclock_header(RD),Options) end,
https://github.com/basho/riak_kv/blob/master/src/riak_client.erl
%% delete/3 %% Web Interface から呼ばれる delete(Bucket,Key,Options) when is_list(Options) -> delete(Bucket,Key,Options,?DEFAULT_TIMEOUT); delete(Bucket,Key,RW) -> delete(Bucket,Key,[{rw, RW}],?DEFAULT_TIMEOUT). %% delete/4 %% delete/3 から呼ばれる delete(Bucket,Key,Options,Timeout) when is_list(Options) -> Me = self(), ReqId = mk_reqid(), %% 子プロセスにdelete処理を委譲 riak_kv_delete_sup:start_delete(Node, [ReqId, Bucket, Key, Options, Timeout, Me, ClientId]), wait_for_reqid(ReqId, Timeout);
https://github.com/basho/riak_kv/blob/master/src/riak_kv_delete.erl
%% deleteは二種類ある %% ↓ riak_client:delete/3 から呼ばれるのはこちら %% 実際にここの処理は子プロセス内で実行される delete(ReqId,Bucket,Key,Options,Timeout,Client,ClientId,undefined) %% ↓ 上のdeleteからこちらが呼ばれる。実際に削除の処理はこちら delete(ReqId,Bucket,Key,Options,Timeout,Client,ClientId,VClock)
delete(ReqId,Bucket,Key,Options,Timeout,Client,ClientId,undefined)
%% GETするときにはriak:local_client 関数をClientIdなしで呼ぶ. %% POSTのときにはClientIdをつけて呼ぶ. %% ここで言ってるClientっていうのは各PUT/GETのリクエストごとに動くerlangのprocess %% process数はpoolとかをつかってるのかなぁ(宿題) {ok, C} = riak:local_client(), case C:get(Bucket,Key,[{r,R},{pr,PR},{timeout,Timeout}]) of {ok, OrigObj} -> RemainingTime = Timeout - (riak_core_util:moment() - RealStartTime), %% 実際の削除はここから呼ぶ delete(ReqId,Bucket,Key,Options,RemainingTime,Client,ClientId,riak_object:vclock(OrigObj)); case ....
delete(ReqId,Bucket,Key,Options,Timeout,Client,ClientId,VClock)
%% Bucket/Key の値を上書きする空値を作る Obj0 = riak_object:new(Bucket, Key, <<>>, dict:store(?MD_DELETED, "true", dict:new())), %% Tobmstoneきました!最新のVClockをつける Tombstone = riak_object:set_vclock(Obj0, VClock), {ok,C} = riak:local_client(ClientId), %% 実際に書き込み Reply = C:put(Tombstone, [{w,W},{pw,PW},{dw, DW},{timeout,Timeout}]), Client ! {ReqId, Reply}, case Reply of ok -> %% 書き込み成功 {ok, C2} = riak:local_client(), AsyncTimeout = 60*1000, % Avoid client-specified value %% Bucket/Keyの値を取得 Res = C2:get(Bucket, Key, all, AsyncTimeout), ?DTRACE(?C_DELETE_REAPER_GET_DONE, [1], [<<"reap">>]), Res; _ -> ...
まとめ
まだまだ、わからないところも多いので、もう少し継続して読む予定です。
特にTombStoneを立てた後で、backendでの実際の削除が走るところが知りたいなーと思ってます。
どこでやってるんでしょうねぇ。。。。
わかる方、内容でここ間違えてるよってところがあればぜひ教えてください。
その他
IntelliJのErlangプラグインを使ってソースコードリーディングを
行いました。
ソースコードからの関数ジャンプ,grep, "?から始まる変数" の展開などなどできます。
IntelliJいいすよ。無償のCommunityエディションもあるし、今では侍ズムさんが取り扱ってくれるようになったので
日本の商習慣に合わせた購入もできるようになりました。
興味のある方、軽く使ってみるといいかも。