But first, some questions
Do you know riak?
Do you know riak_core? At least by name?
git clone https://github.com/basho/rebar_riak_core.git
cd rebar_riak_core
make install
mkdir flaviodb
cd flaviodb
# download rebar and set executable permissions
wget http://cloud.github.com/downloads/basho/rebar/rebar
chmod u+x rebar
# create project from riak_core template with app id set to flavio
./rebar create template=riak_core appid=flavio
edit rebar.config
riak_core 2.0.0
lager 2.0.3
make rel
if you use Erlang >= 17
Start console
./rel/flavio/bin/flavio console
Play with it
(flavio@> flavio:ping().
ping() ->
DocIdx = riak_core_util:chash_key({<<"ping">>,
PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, flavio),
[{IndexNode, _Type}] = PrefList,
riak_core_vnode_master:sync_spawn_command(IndexNode, ping,
-export([start_vnode/1, init/1, terminate/2,
handle_command/3, is_empty/1, delete/1,
handle_handoff_command/3, handoff_starting/2,
handoff_cancelled/1, handoff_finished/2,
handle_handoff_data/2, encode_handoff_item/2,
handle_coverage/4, handle_exit/3]).
%% Vnode state. `partition' is set by init/1 from its argument and is
%% included in the reply to the `ping' command.
-record(state, {partition}).
%% @doc Delegates to riak_core_vnode_master:get_vnode_pid/2, passing the
%% given index and this module as the vnode implementation.
start_vnode(Index) ->
    VnodeModule = ?MODULE,
    riak_core_vnode_master:get_vnode_pid(Index, VnodeModule).
%% @doc Builds the initial vnode state from the single-element argument
%% list, storing the partition in the state record.
init([Partition]) ->
    InitialState = #state{partition = Partition},
    {ok, InitialState}.
%% @doc Command callback.
%% `ping' is answered with `{pong, Partition}'. Any other message is handed
%% to ?PRINT and gets no reply. The state is returned unchanged in both cases.
handle_command(ping, _Sender, State) ->
    Partition = State#state.partition,
    Reply = {pong, Partition},
    {reply, Reply, State};
handle_command(Message, _Sender, State) ->
    %% The ?PRINT argument is kept literal: the macro is defined elsewhere
    %% and may print the expression text as well as its value.
    ?PRINT({unhandled_command, Message}),
    {noreply, State}.
make devrel
mkdir -p dev
rel/gen_dev dev1 rel/vars/dev_vars.config.src rel/vars/dev1_vars.config
Generating dev1 - node='[email protected]' http=10018 handoff=10019
(cd rel && /home/mariano/src/rct/flaviodb/rebar generate
target_dir=../dev/dev1 overlay_vars=vars/dev1_vars.config)
mkdir -p dev
rel/gen_dev dev4 rel/vars/dev_vars.config.src rel/vars/dev4_vars.config
Generating dev4 - node='[email protected]' http=10048 handoff=10049
(cd rel && /home/mariano/src/rct/flaviodb/rebar generate
target_dir=../dev/dev4 overlay_vars=vars/dev4_vars.config)
for d in dev/dev*; do $d/bin/flavio start; done
for d in dev/dev*; do $d/bin/flavio ping; done
$ dev/dev1/bin/flavio-admin member-status
================================= Membership ==================================
Status Ring Pending Node
valid 100.0% -- '[email protected]'
Valid:1 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
$ dev/dev4/bin/flavio-admin member-status
================================= Membership ==================================
Status Ring Pending Node
valid 100.0% -- '[email protected]'
Valid:1 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
$ for d in dev/dev{2,3,4}; do
$d/bin/flavio-admin cluster join [email protected];
Success: staged join request for '[email protected]' to '[email protected]'
Success: staged join request for '[email protected]' to '[email protected]'
Success: staged join request for '[email protected]' to '[email protected]'
$ dev/dev1/bin/flavio-admin member-status
================================= Membership ==================================
Status Ring Pending Node
joining 0.0% -- '[email protected]'
joining 0.0% -- '[email protected]'
joining 0.0% -- '[email protected]'
valid 100.0% -- '[email protected]'
Valid:1 / Leaving:0 / Exiting:0 / Joining:3 / Down:0
$ dev/dev1/bin/flavio-admin cluster plan
=============================== Staged Changes ================================
Action Details(s)
join '[email protected]'
join '[email protected]'
join '[email protected]'
NOTE: Applying these changes will result in 1 cluster transition
After cluster transition 1/1
================================= Membership ==================================
Status Ring Pending Node
valid 100.0% 25.0% '[email protected]'
valid 0.0% 25.0% '[email protected]'
valid 0.0% 25.0% '[email protected]'
valid 0.0% 25.0% '[email protected]'
Valid:4 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
Transfers resulting from cluster changes: 48
16 transfers from '[email protected]' to '[email protected]'
16 transfers from '[email protected]' to '[email protected]'
16 transfers from '[email protected]' to '[email protected]'
$ dev/dev1/bin/flavio-admin cluster commit
Cluster changes committed
$ dev/dev1/bin/flavio-admin member-status
================================= Membership ==================================
Status Ring Pending Node
valid 25.0% -- '[email protected]'
valid 25.0% -- '[email protected]'
valid 25.0% -- '[email protected]'
valid 25.0% -- '[email protected]'
Valid:4 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
From node 1:
$ dev/dev1/bin/flavio attach
Attaching to /tmp/.../flaviodb/dev/dev1/erlang.pipe.1 (^D to exit)
(flavio1@> flavio:ping().
(flavio1@> [Quit]
From node 3:
$ dev/dev3/bin/flavio attach
Attaching to /tmp/.../flaviodb/dev/dev3/erlang.pipe.1 (^D to exit)
(flavio3@> flavio:ping().
(flavio3@> [Quit]
-export([ping/0, add/2]).
add(A, B) ->
DocIdx = riak_core_util:chash_key({<<"add">>,
term_to_binary({A, B})}),
PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, flavio),
[{IndexNode, _Type}] = PrefList,
riak_core_vnode_master:sync_spawn_command(IndexNode, {add, A, B},
handle_command({add, A, B}, _Sender, State) ->
{reply, {A + B, State#state.partition}, State};
rm -rf rel/flavio
make rel
./rel/flavio/bin/flavio console
(flavio@> flavio:add(2, 5).
(flavio@> flavio:add(2, 5).
(flavio@> flavio:add(3, 5).
(flavio@> flavio:add(3, 5).
(flavio@> flavio:add(2, 9).
(flavio@> flavio:add(2, 9).
%% Vnode state. `ops_count' starts at 0 and is incremented by one each time
%% the `add' command is handled.
-record(state, {partition, ops_count=0}).
handle_command({add, A, B}, _Sender,
State=#state{ops_count=CurrentCount}) ->
NewCount = CurrentCount + 1,
NewState = State#state{ops_count=NewCount},
{reply, {A + B, State#state.partition}, NewState};
%% @doc Starts a `stats' coverage request through flavio_coverage_fsm with a
%% timeout of 5000 (presumably milliseconds -- confirm against the FSM).
stats() ->
    flavio_coverage_fsm:start(stats, 5000).
init(_Args) ->
VMaster = { flavio_vnode_master,
{riak_core_vnode_master, start_link, [flavio_vnode]},
permanent, 5000, worker, [riak_core_vnode_master]},
CoverageFSMs = {flavio_coverage_fsm_sup,
{flavio_coverage_fsm_sup, start_link, []},
permanent, infinity, supervisor,
{ok, { {one_for_one, 5, 10}, [VMaster, CoverageFSMs]}}.
%% @doc Coverage callback.
%% For `stats', replies with this vnode's operation counter, tagged with the
%% RefId taken from the sender tuple. Any other request is logged as a
%% warning and left unanswered. The state is never modified.
handle_coverage(stats, _KeySpaces, {_, RefId, _},
                State=#state{ops_count=OpsCount}) ->
    {reply, {RefId, [{ops_count, OpsCount}]}, State};
handle_coverage(Req, _KeySpaces, _Sender, State) ->
    lager:warning("unknown coverage received ~p", [Req]),
    %% The tag must be spelled `noreply': it is one of the return shapes
    %% riak_core_vnode accepts from this callback, `norepl' is not.
    {noreply, State}.
(flavio@> flavio:stats().
{ok,[ lot of output here]}
% use the api a little
(flavio@> flavio:add(2, 5).
(flavio@> flavio:stats().
{ok,[ lot of output here, maybe you can see some with ops_count > 0]}
% filter the output to see interesting info
10> lists:filter(fun ({_, _, [{ops_count, OpsCount}]}) ->
OpsCount > 0
end, Stats).
'[email protected]', [{ops_count,3}]},
'[email protected]', [{ops_count,1}]},
'[email protected]', [{ops_count,2}]}]
+------+ +---------+ +---------+ +---------+ +------+
| | | | | | | |remaining = 0 | |
| Init +--->| Prepare +--->| Execute +--->| Waiting +------------->| Stop |
| | | | | | | | | |
+------+ +---------+ +---------+ +-------+-+ +------+
^ | |
| | | +---------+
+---+ +------->| |
| Timeout |
remaining > 0 timeout | |
handle_command({RefId, {add, {A, B}}}, _Sender,
State=#state{ops_count=CurrentCount}) ->
NewCount = CurrentCount + 1,
NewState = State#state{ops_count=NewCount},
{reply, {RefId, {A + B, State#state.partition}}, NewState};
%% @doc Submits `{add, {A, B}}' through flavio_op_fsm:op/3 with N = 3 and
%% W = 3, then waits for the result of that request id with a timeout of 5000.
add(A, B) ->
    {N, W} = {3, 3},
    Operation = {add, {A, B}},
    {ok, ReqID} = flavio_op_fsm:op(N, W, Operation),
    wait_for_reqid(ReqID, 5000).
(flavio@> flavio:add(2, 4).
(flavio@> flavio:add(12, 4).
flavio:post_msg(Username, Stream, Msg)
{fixstt, ".*", {git, "git://github.com/marianoguerra/fixstt",
{branch, "master"}}}
%% @doc Submits a `post_msg' operation through flavio_op_fsm:op/4 with N = 3
%% and W = 3, passing `{Username, Stream}' as the fourth argument, then waits
%% for the result of that request id with a timeout of 5000.
post_msg(Username, Stream, Msg) ->
    {N, W} = {3, 3},
    Operation = {post_msg, {Username, Stream, Msg}},
    StreamKey = {Username, Stream},
    {ok, ReqID} = flavio_op_fsm:op(N, W, Operation, StreamKey),
    wait_for_reqid(ReqID, 5000).
handle_command({RefId, {post_msg, {Username, Stream, Msg}}}, _Sender,
State=#state{partition=Partition}) ->
PartitionStr = integer_to_list(Partition),
StreamPath = filename:join([PartitionStr, Username, Stream, "msgs"]),
ok = filelib:ensure_dir(StreamPath),
{ok, StreamIo} = fixsttio:open(StreamPath),
Entry = fixstt:new(Msg),
{ok, _NewStream, EntryId} = fixsttio:append(StreamIo, Entry),
EntryWithId = fixstt:set(Entry, id, EntryId),
{reply, {RefId, {EntryWithId, State#state.partition}}, State};
(flavio@> flavio:post_msg(<<"mariano">>, <<"english">>,
<<"hello world!">>).
{ok,[{{fixstt,1,9001,9001,12,1416928004032,0,0, <<"hello world!">>},
{{fixstt,1,9001,9001,12,1416928004032,0,0, <<"hello world!">>},
{{fixstt,1,9001,9001,12,1416928004032,0,0, <<"hello world!">>},
(flavio@> flavio:post_msg(<<"mariano">>, <<"spanish">>,
<<"hola mundo!">>).
{ok,[{{fixstt,1,9001,9001,11,1416928004035,0,0, <<"hola mundo!">>},
{{fixstt,1,9001,9001,11,1416928004035,0,0,<<"hola mundo!">>},
{{fixstt,1,9001,9001,11,1416928004035,0,0,<<"hola mundo!">>},
$ cd rel/flavio
$ find -name msgs
handle_command({RefId, {get_msgs, {Username, Stream, Id, Count}}}, _Sender,
State) ->
{ok, StreamIo} = get_stream(State, Username, Stream),
Result = case fixsttio:read(StreamIo, Id, Count) of
{ok, StreamIo1, Entries} ->
{ok, _StreamIo2} = fixstt:close(StreamIo1),
{ok, Entries};
Other -> Other
{reply, {RefId, {Result, State#state.partition}}, State};
(flavio@> % query from mariano/spanish from id 1, get 1 post
(flavio@> flavio:get_msgs(<<"mariano">>, <<"spanish">>, 1, 1).
{ok,[{{ok,[{fixstt,1,9001.0,9001.0,11,1416930275765,0,0, <<"hola mundo!">>}]},
{{ok,[{fixstt,1,9001.0,9001.0,11,1416930275765,0,0, <<"hola mundo!">>}]},
{{ok,[{fixstt,1,9001.0,9001.0,11,1416930275765,0,0, <<"hola mundo!">>}]},
%% @doc Starts a `{list_streams, Username}' coverage request through
%% flavio_coverage_fsm with a timeout of 5000.
list_streams(Username) ->
    Request = {list_streams, Username},
    flavio_coverage_fsm:start(Request, 5000).
and the implementation:
handle_coverage({list_streams, Username}, _KeySpaces, {_, RefId, _}, State) ->
Streams = lists:sort(list_streams(State, Username)),
{reply, {RefId, {ok, Streams}}, State};
Listing users is implemented similarly.
+-----------+      +----------+        +----------+
|           | true |          | false  |          |
| Starting  +------> is_empty +--------> fold_req |
|           |      |          |        |          |
+-----+-----+      +----+-----+        +----+-----+
      |                 |                   |
      | false           | true              | ok
      |                 |                   |
+-----v-----+           |              +----v-----+     +--------+
|           |           |              |          |     |        |
| Cancelled |           +--------------> finished +-----> delete |
|           |                          |          |     |        |
+-----------+                          +----------+     +--------+
handle_handoff_command(?FOLD_REQ{foldfun=Fun, acc0=Acc0}, _Sender, State) ->
% pseudocode
for Stream in AllUserStreams:
for Key, Entry in get_entries(Stream):
% pardon the mutability, it is just to make the code smaller
Acc = Fun(Key, Entry, Acc)
return reply, Acc, State
%% @doc Serialises one handoff item: the key and value are paired in a
%% tuple and converted to the external term format.
encode_handoff_item(Key, Value) ->
    Item = {Key, Value},
    erlang:term_to_binary(Item).
handle_handoff_data(BinData, State) ->
TermData = binary_to_term(BinData),
{Key, Value} = TermData,
% do something with it here
handle_handoff_command(?FOLD_REQ{foldfun=Fun, acc0=Acc0}, _Sender,
State=#state{partition=Partition}) ->
AllPairs = list_all(State),
HandlePair = fun (Key={Username, StreamName}, AccIn) ->
HandleEntry = fun (Entry, AccIn0) ->
AccIn1 = Fun(Key, Entry, AccIn0),
{continue, AccIn1}
StreamPath = stream_path(State, Username, StreamName),
{ok, FixSttIo} = fixsttio:open(StreamPath),
{ok, AccIn1} = fixsttio:iterate(FixSttIo, HandleEntry, AccIn),
{ok, _ClosedFixSttIo} = fixsttio:close(FixSttIo),
AccFinal = lists:foldl(HandlePair, Acc0, AllPairs),
{reply, AccFinal, State};
build devrel
start 1 node
write to it
start another node
join the first node
watch handoff
confirm data moved
multiply it by 32
([email protected])12>
10:53:57.316 [info] '[email protected]' joined cluster with status 'joining'
10:54:26.600 [info] handoff starting 456...
10:54:26.602 [info] handoff is empty? false 228...
10:54:26.603 [info] handoff cancelled 114...
10:54:26.619 [info] Starting ownership_transfer transfer of flavio_vnode
from '[email protected]' 228... to '[email protected]' 228...
10:54:26.620 [info] fold req 456...
10:54:26.620 [info] handling handoff for patrick/spanish
10:54:26.667 [info] ownership_transfer transfer of flavio_vnode
from '[email protected]' 456...
to '[email protected]' 456...
completed: sent 1.08 KB bytes in 10 of 10 objects in 0.05 seconds
(23.45 KB/second)
10:54:26.668 [info] handoff finished 228...
10:54:26.681 [info] handoff delete flavio_data/456...
10:54:26.683 [info] terminate 456...: normal
multiply it by 32
([email protected])1>
10:54:21.864 [info] '[email protected]' changed from 'joining' to 'valid'
10:54:26.620 [info] Receiving handoff data for partition flavio_vnode:456...
from {"",34478}
10:54:26.669 [info] Handoff receiver for partition 228... exited after
processing 10 objects from {"",32835}
10:54:36.614 [info] Receiving handoff data for partition flavio_vnode:137...
from {"",53206}
10:55:23.619 [info] handoff starting 685...
10:55:23.639 [info] handoff is empty? true 137...
10:55:23.639 [info] handoff delete flavio_data/137...
10:55:23.640 [info] terminate 890...: normal
$ tree dev/dev1/flavio_data
├── 0
│  └── patrick
│  └── spanish
│  └── msgs
├── 1073290264914881830555831049026020342559825461248
│  └── gary
│  └── english
│  └── msgs
├── 1164634117248063262943561351070788031288321245184
│  ├── bob
│  │  └── riak_core
│  │  └── msgs
│  └── gary
│  └── spanish
│  └── msgs
├── 707914855582156101004909840846949587645842325504
│  └── sandy
│  └── erlang
│  └── msgs
└── 91343852333181432387730302044767688728495783936
└── sandy
└── english
└── msgs
63 directories, 22 files
$ tree dev/dev2/flavio_data
├── 1118962191081472546749696200048404186924073353216
│  ├── bob
│  │  └── riak_core
│  │  └── msgs
│  └── gary
│  └── english
│  └── msgs
├── 662242929415565384811044689824565743281594433536
│  ├── patrick
│  │  └── english
│  │  └── msgs
│  └── sandy
│  └── erlang
│  └── msgs
└── 685078892498860742907977265335757665463718379520
├── patrick
│  └── english
│  └── msgs
└── sandy
└── erlang
└── msgs
67 directories, 26 files
{cowboy, "1.0.0", {git, "https://github.com/ninenines/cowboy", {tag, "1.0.0"}}},
{jsxn, ".*", {git, "https://github.com/talentdeficit/jsxn", {tag, "v2.1.1"}}}
start(_StartType, _StartArgs) ->
Dispatch = cowboy_router:compile([
{'_', [{"/msgs/:user/:topic", handler_flavio_msgs, []}]}
ApiPort = 8080,
ApiAcceptors = 100,
{ok, _} = cowboy:start_http(http, ApiAcceptors, [{port, ApiPort}], [
{env, [{dispatch, Dispatch}]}
%% Handler state. Both fields are filled in by rest_init/2 from the request's
%% path bindings.
-record(state, {username, topic}).
%% @doc For plain HTTP over TCP, asks cowboy to upgrade the handler to the
%% cowboy_rest protocol. The request and options are ignored.
init({tcp, http}, _Request, _Options) ->
    {upgrade, protocol, cowboy_rest}.
%% @doc Initialises the REST handler state from the path bindings.
%% The route is declared as "/msgs/:user/:topic", so the user segment is
%% bound under the name `user'; reading any other name would not find it.
%% Returns `{ok, Req, State}' with `username' and `topic' filled in.
rest_init(Req, []) ->
    {Username, Req1} = cowboy_req:binding(user, Req),
    {Topic, Req2} = cowboy_req:binding(topic, Req1),
    {ok, Req2, #state{username=Username, topic=Topic}}.
%% @doc Only POST is allowed on this resource; request and state pass
%% through unchanged.
allowed_methods(Req, State) ->
    Methods = [<<"POST">>],
    {Methods, Req, State}.
%% @doc Accepts application/json bodies with any parameters and routes them
%% to from_json/2.
content_types_accepted(Req, State) ->
    JsonType = {<<"application">>, <<"json">>, '*'},
    Accepted = [{JsonType, from_json}],
    {Accepted, Req, State}.
from_json(Req, State=#state{username=Username, topic=Topic}) ->
{ok, Body, Req1} = cowboy_req:body(Req),
case jsx:is_json(Body) of
true ->
Data = jsx:decode(Body),
Msg = proplists:get_value(<<"msg">>, Data, nil),
if is_binary(Msg) ->
{ok, [FirstResponse|_]} = flavio:post_msg(Username,
Topic, Msg),
{{ok, Entity}, _Partition} = FirstResponse,
EntityPList = fixstt:to_proplist(Entity),
EntityJson = jsx:encode(EntityPList),
response(Req, State, EntityJson);
true ->
bad_request(Req1, State, <<"{\"type\": \"no-msg\"}">>)
false ->
bad_request(Req1, State, <<"{\"type\": \"invalid-body\"}">>)
$ curl -X POST http://localhost:8080/msgs/mariano/english \
-H "Content-Type: application/json" -d '{"msg": "hello world"}'
"msg":"hello world"}
$ curl -X POST http://localhost:8080/msgs/mariano/english \
-H "Content-Type: application/json" -d '{"msg": "hello world again"}'
"msg":"hello world again"}
$ curl -X POST http://localhost:8080/msgs/mariano/spanish \
-H "Content-Type: application/json" -d '{"msg": "hola mundo"}'
"msg":"hola mundo"}
$ curl -X POST http://localhost:8080/msgs/mariano/spanish \
-H "Content-Type: application/json" -d '{"msg": "hola mundo nuevamente"}'
"msg":"hola mundo nuevamente"}
%% Handler state. `username' and `topic' come from the path bindings; `from'
%% and `limit' are parsed by rest_init/2 from the "from" and "limit" query
%% string values.
-record(state, {username, topic, from, limit}).
%% @doc Initialises the REST handler state from the path bindings and the
%% query string.
%% The route is declared as "/msgs/:user/:topic", so the user segment is
%% bound under the name `user'; reading any other name would not find it.
%% "from" defaults to the empty binary and is converted with to_int_or/2
%% using `nil' as fallback; "limit" defaults to <<"1">> and falls back to 1.
rest_init(Req, []) ->
    {Username, Req1} = cowboy_req:binding(user, Req),
    {Topic, Req2} = cowboy_req:binding(topic, Req1),
    {FromStr, Req3} = cowboy_req:qs_val(<<"from">>, Req2, <<"">>),
    {LimitStr, Req4} = cowboy_req:qs_val(<<"limit">>, Req3, <<"1">>),
    From = to_int_or(FromStr, nil),
    Limit = to_int_or(LimitStr, 1),
    {ok, Req4,
     #state{username=Username, topic=Topic, from=From, limit=Limit}}.
%% @doc POST and GET are allowed on this resource; request and state pass
%% through unchanged.
allowed_methods(Req, State) ->
    Methods = [<<"POST">>, <<"GET">>],
    {Methods, Req, State}.
%% @doc Offers application/json with any parameters, rendered by to_json/2.
content_types_provided(Req, State) ->
    JsonType = {<<"application">>, <<"json">>, '*'},
    Provided = [{JsonType, to_json}],
    {Provided, Req, State}.
to_json(Req, State=#state{username=Username, topic=Topic, from=From,
limit=Limit}) ->
{ok, [FirstResponse|_]} = flavio:get_msgs(Username, Topic, From,
{{ok, Entities}, _Partition} = FirstResponse,
EntitiesPList = lists:map(fun fixstt:to_proplist/1, Entities),
EntitiesJson = jsx:encode(EntitiesPList),
{EntitiesJson, Req, State}.
$ curl http://localhost:8080/msgs/mariano/spanish\?from\=1\&limit\=1
"msg":"hola mundo"}]
$ curl http://localhost:8080/msgs/mariano/spanish\?from\=1\&limit\=2
"msg":"hola mundo"},
"msg":"hola mundo nuevamente"}]
$ curl http://localhost:8080/msgs/mariano/spanish\?from\=1\&limit\=20
"msg":"hola mundo"},
"msg":"hola mundo nuevamente"}]
$ curl http://localhost:8080/msgs/mariano/spanish\?limit\=20
"msg":"hola mundo"},
"msg":"hola mundo nuevamente"}]
$ curl http://localhost:8080/msgs/mariano/euskera\?limit\=20
cache stream handles
instead of open/close for each request
pub/sub for users/topics with bullet
riak_core_security for auth/permissions
a web ui