- Starting: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_wm_object.erl#L619
- We create a new
riak_object
and populate the various fields with the headers, metadata supplied by the client. - Big suprise, we eventually call
riak_client:put
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_client.erl#L143 - If/when the client returns any errors these are handled in
handle_common_errors
and it is nice to return human readable errors to client :)
- The client creates a request ID and the normal path does not increment the vclock(yet).
- We then
start_link
ariak_kv_put_fsm
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L131 - Eventually the FSM gets back to us at
wait_for_reqid
which is where we wait untiltimout
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_client.erl#L607
- We start the FSM through sidejob(assuming overload protections isn't disabled) and sidejob may say we're overloaded.
- We then call
gen_fsm:start_link
which creates a new fsm at theinit
ofriak_kv_put_fsm
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L174 - Start by initializing our state, notifying the monitor count of our start, and check if the
riak_object
is tombstone. We eventually end up inprepare
state: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L210 prepare
gets the bucket properties, hashes the bucket/key(riak_core_util:chash_key(BKey)
), and decides/validates the n_val. We then create the preflist for the hash: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L233- We then check if the local node is in the preflist or if we need to forward to another node for coordination. If as-is is set, skip coordination all together.
- Forwarding to another node starts a brand new FSM which will eventually report back to the
riak_client
waiting on RequestID: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L255 - Handling coordination locally sends us to
validate
state(starting our FSM timeout): https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L295
validate
checks all write specific request options(W,DW,PW,etc) and throws an error for those not met: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L324- We then iterate the hooks in the bucket props, update out state, start the
riak_core_put_core
and enterprecommit
state(assuming there are precommit hooks to run): https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L374 precommit
iterates over all the hook functions, returns the result, and update theriak_object
if necessary. We eventually end up atexecute
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L390execute
either immediately callsexecute_remote
which skips all coordination(in the case of as-is being set) or(common case)execute_local
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L402execute_local
callsriak_kv_vnode:coord_put
and then waits inwaiting_local_vnode
state: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L422riak_kv_vnode:coord_put
returns a failure or returns success with or without a newriak_object
depending on if an object existed. We then callexecute_remote
with/without an updated object: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L452execute_remote
callriak_kv_vnode:put
and then sits waiting for vnode responses and adding them toriak_kv_put_core:add_result
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_core.erl#L81
riak_kv_put_core:add_result
updates theriak_kv_put_core
state and eventuallyenough
will return true or false: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L483riak_kv_put_core:response
then returns the necessary response(success or failure with/without object). We then enterprocess_reply
of the put fsm: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L575
process_reply
callsclient_reply
which sends the results of the put back to the client: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L870- We thenhen iterates over all postcommit hooks. If the client reply is not an error, we call
postcommit
state: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L495 postcommit
state runs all postcommit hooks, even if the client request has timed out, and eventually callsfinish
state: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_put_fsm.erl#L515finish
state finalizes all stats and updates theriak_kv_stat
module. We then callstop
state which stops the fsm.
riak_kv_vnode:coord_put
:https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_vnode.erl#L229- This function starts a
riak_core_vnode_master:command
of type ?KV_PUT_REQ but only on IndexNode and with the specialcoord
option set: https://github.com/basho/riak_core/blob/1.4.2/src/riak_core_vnode_master.erl#L74 - Since this is a list of one node, we only send one
gen_fsm:send_event
to the local vnode we are using as the coordinating put: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_vnode.erl#L395 - We then immediately reply(because not always DW) with
riak_core_vnode:reply
and thendo_put
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_vnode.erl#L900 do_put
checks if this is a coordinating put by looking for thecoord
option, if true it makes sure to return the body of the put. We then callprepare_put
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_vnode.erl#L954prepare_put
checks LWW and if true increments the vecotr clock for the coordinating put. If not true, it then calls the nextprepare_put
function: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_vnode.erl#L981prepare_put
callsdo_get_object
which returns a not found or riak_object. If not found, we callprepare_new_put
which simpy increments the vecotr clock for the coordinating put. If existing, we callput_merge
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_vnode.erl#L1154put_merge
is the function to look at if interested in LWW and allow_mult. Basically, either a new_obj or an old_obj atom are returned back toprepare_put
with an incremented vclock either way in the case of a coordinating put.- Back in
prepare_put
we'll always be in the new_obj code because we are coordinating: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_vnode.erl#L1014 - Once we finish merging siblings and trimming vclocks we return a new state back to
do_put
. This then callsperform_put
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_vnode.erl#L1071 - For coordinating puts we will always be in the
true
function. We thenencode_and_put
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_vnode.erl#L1670 encode_and_put
callsMod:put
orMod:object_put
depending on riak_object format used. Mod =riak_kv_X_backend
.- We then update our AAE hashtrees and reply back to the put FSM.
- Exactly the same as coordinating PUT except it does not have the
coord
option set. This operation is run on the remaining vnodes in the preflist after the coordinating put has been succesful. - The main operations are
do_put
andprepare_put
which eventually callsMod:put
responding back to the FSM before and after sending the put to the backend.
- Entry point is
riak_kv_bitcask_backend
:put
orput_object
depending on the riak_object format used: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_bitcask_backend.erl#L166 - After a
binary_to_term
(b2t) call we hitbitcask:put
: https://github.com/basho/bitcask/blob/1.6/src/bitcask.erl#L232 - This function call
do_put
and then returns to the riak_kv backend module. - More work to do here, but it's all in the bitcask backend.
- Entry point is
riak_kv_eleveldb_backend
:put
orput_object
depending on the riak_object format used: https://github.com/basho/eleveldb/blob/develop/src/eleveldb.erl#L148 - We call the
write
function, which in turn calls theasync_write
function: https://github.com/basho/eleveldb/blob/develop/src/eleveldb.erl#L160 - The rest of the work here is in the
leveldb
NIF. Another day.
- Entry point is
riak_kv_memory_backend
:put
orput_object
depending on the riak_object format used: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_memory_backend.erl#L198 - We eventually call
do_put
which callsets:insert
to insert the object: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_memory_backend.erl#L550 - We then update the index table either with a
remove
oradd
atom determining if weets:delete
orets:insert
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_memory_backend.erl#L559
- Entry point is
riak_kv_multi_backend
:put
orput_object
depending on the riak_object format used:https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_multi_backend.erl#L228 - We call
get_backend
: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_multi_backend.erl#L435 - This queries bucket properties, identifies the backend, and validates it.
- We then call the appropriate
Module:put
where Module is our backend: https://github.com/basho/riak_kv/blob/1.4.2/src/riak_kv_multi_backend.erl#L230