Просмотр исходного кода

feat(ft): introduce simple filesystem storage backend + assembler

Andrew Mayorov 3 лет назад
Родитель
Сommit
81e04ce93a

+ 2 - 1
apps/emqx_ft/src/emqx_ft.app.src

@@ -5,7 +5,8 @@
     {mod, {emqx_ft_app, []}},
     {applications, [
         kernel,
-        stdlib
+        stdlib,
+        gproc
     ]},
     {env, []},
     {modules, []}

+ 14 - 0
apps/emqx_ft/src/emqx_ft.erl

@@ -30,6 +30,20 @@
     on_channel_takeovered/3
 ]).
 
+-export_type([clientid/0]).
+-export_type([transfer/0]).
+-export_type([offset/0]).
+
+%% Number of bytes
+-type bytes() :: non_neg_integer().
+
+%% MQTT Client ID
+-type clientid() :: emqx_types:clientid().
+
+-type fileid() :: binary().
+-type transfer() :: {clientid(), fileid()}.
+-type offset() :: bytes().
+
 -type ft_data() :: #{
     nodes := list(node())
 }.

+ 160 - 0
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -0,0 +1,160 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_assembler).
+
+-export([start_link/2]).
+
+-behaviour(gen_statem).
+-export([callback_mode/0]).
+-export([init/1]).
+% -export([list_local_fragments/3]).
+% -export([list_remote_fragments/3]).
+% -export([start_assembling/3]).
+-export([handle_event/4]).
+
+% -export([handle_continue/2]).
+% -export([handle_call/3]).
+% -export([handle_cast/2]).
+
+-record(st, {
+    storage :: _Storage,
+    transfer :: emqx_ft:transfer(),
+    assembly :: _TODO,
+    file :: io:device(),
+    hash
+}).
+
+-define(RPC_LIST_TIMEOUT, 1000).
+-define(RPC_READSEG_TIMEOUT, 5000).
+
+%%
+
+start_link(Storage, Transfer) ->
+    gen_server:start_link(?MODULE, {Storage, Transfer}, []).
+
+%%
+
+-define(internal(C), {next_event, internal, C}).
+
+callback_mode() ->
+    handle_event_function.
+
+init({Storage, Transfer}) ->
+    St = #st{
+        storage = Storage,
+        transfer = Transfer,
+        assembly = emqx_ft_assembly:new(),
+        hash = crypto:hash_init(sha256)
+    },
+    {ok, list_local_fragments, St, ?internal([])}.
+
+handle_event(list_local_fragments, internal, _, St = #st{assembly = Asm}) ->
+    % TODO: what we do with non-transients errors here (e.g. `eacces`)?
+    {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer),
+    NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
+    NSt = St#st{assembly = NAsm},
+    case emqx_ft_assembly:status(NAsm) of
+        complete ->
+            {next_state, start_assembling, NSt, ?internal([])};
+        {incomplete, _} ->
+            Nodes = ekka:nodelist() -- [node()],
+            {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])}
+        % TODO: recovery?
+        % {error, _} = Reason ->
+        %     {stop, Reason}
+    end;
+handle_event({list_remote_fragments, Nodes}, internal, _, St) ->
+    % TODO: portable "storage" ref
+    Args = [St#st.storage, St#st.transfer],
+    % TODO
+    % Async would better because we would not need to wait for some lagging nodes if
+    % the coverage is already complete.
+    % TODO: BP API?
+    Results = erpc:multicall(Nodes, emqx_ft_storage_fs, list, Args, ?RPC_LIST_TIMEOUT),
+    NodeResults = lists:zip(Nodes, Results),
+    NAsm = emqx_ft_assembly:update(
+        lists:foldl(
+            fun
+                ({Node, {ok, {ok, Fragments}}}, Asm) ->
+                    emqx_ft_assembly:append(Asm, Node, Fragments);
+                ({Node, Result}, Asm) ->
+                    % TODO: log?
+                    Asm
+            end,
+            St#st.assembly,
+            NodeResults
+        )
+    ),
+    NSt = St#st{assembly = NAsm},
+    case emqx_ft_assembly:status(NAsm) of
+        complete ->
+            {next_state, start_assembling, NSt, ?internal([])};
+        % TODO: retries / recovery?
+        {incomplete, _} = Status ->
+            {stop, {error, Status}}
+    end;
+handle_event(start_assembling, internal, _, St = #st{assembly = Asm}) ->
+    Filemeta = emqx_ft_assembly:filemeta(Asm),
+    Coverage = emqx_ft_assembly:coverage(Asm),
+    % TODO: errors
+    {ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta),
+    {next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])};
+handle_event({assemble, [{Node, Segment} | Rest]}, internal, _, St = #st{}) ->
+    % TODO
+    % Currently, race is possible between getting segment info from the remote node and
+    % this node garbage collecting the segment itself.
+    Args = [St#st.storage, St#st.transfer, Segment, 0, segsize(Segment)],
+    % TODO: pipelining
+    case erpc:call(Node, emqx_ft_storage_fs, read_segment, Args, ?RPC_READSEG_TIMEOUT) of
+        {ok, Content} ->
+            {ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
+            {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}
+        % {error, _} ->
+        %     ...
+    end;
+handle_event({assemble, []}, internal, _, St = #st{}) ->
+    {next_state, complete, St, ?internal([])};
+handle_event(complete, internal, _, St = #st{assembly = Asm, file = Handle}) ->
+    Filemeta = emqx_ft_assembly:filemeta(Asm),
+    ok = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
+    {stop, shutdown}.
+
+% handle_continue(list_local, St = #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->
+%     % TODO: what we do with non-transients errors here (e.g. `eacces`)?
+%     {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer),
+%     NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
+%     NSt = St#st{assembly = NAsm},
+%     case emqx_ft_assembly:status(NAsm) of
+%         complete ->
+%             {noreply, NSt, {continue}};
+%         {more, _} ->
+%             error(noimpl);
+%         {error, _} ->
+%             error(noimpl)
+%     end,
+%     {noreply, St}.
+
+% handle_call(_Call, _From, St) ->
+%     {reply, {error, badcall}, St}.
+
+% handle_cast(_Cast, St) ->
+%     {noreply, St}.
+
+%%
+
+segsize(#{fragment := {segmentinfo, Info}}) ->
+    maps:get(size, Info).

+ 44 - 0
apps/emqx_ft/src/emqx_ft_assembler_sup.erl

@@ -0,0 +1,44 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_assembler_sup).
+
+-export([start_link/1]).
+-export([start_child/3]).
+
+-behaviour(supervisor).
+-export([init/1]).
+
+-define(REF(ID), {via, gproc, {n, l, {?MODULE, ID}}}).
+
+start_link(ID) ->
+    supervisor:start_link(?REF(ID), ?MODULE, []).
+
+start_child(ID, Storage, Transfer) ->
+    Childspec = #{
+        id => {Storage, Transfer},
+        start => {emqx_ft_assembler, start_link, [Storage, Transfer]},
+        restart => transient
+    },
+    supervisor:start_child(?REF(ID), Childspec).
+
+init(_) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 100,
+        period => 1000
+    },
+    {ok, SupFlags, []}.

+ 364 - 0
apps/emqx_ft/src/emqx_ft_assembly.erl

@@ -0,0 +1,364 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_assembly).
+
+-export([new/0]).
+-export([append/3]).
+-export([update/1]).
+
+-export([status/1]).
+-export([filemeta/1]).
+-export([coverage/1]).
+-export([properties/1]).
+
+-record(asm, {
+    status :: _TODO,
+    coverage :: _TODO,
+    properties :: _TODO,
+    meta :: _TODO,
+    % orddict:orddict(K, V)
+    segs :: _TODO,
+    size
+}).
+
+new() ->
+    #asm{
+        status = {incomplete, {missing, filemeta}},
+        meta = orddict:new(),
+        segs = orddict:new(),
+        size = 0
+    }.
+
+append(Asm, Node, Fragments) when is_list(Fragments) ->
+    lists:foldl(fun(F, AsmIn) -> append(AsmIn, Node, F) end, Asm, Fragments);
+append(Asm, Node, Fragment = #{fragment := {filemeta, _}}) ->
+    append_filemeta(Asm, Node, Fragment);
+append(Asm, Node, Segment = #{fragment := {segmentinfo, _}}) ->
+    append_segmentinfo(Asm, Node, Segment).
+
+update(Asm) ->
+    case status(meta, Asm) of
+        {complete, _Meta} ->
+            case status(coverage, Asm) of
+                {complete, Coverage, Props} ->
+                    Asm#asm{
+                        status = complete,
+                        coverage = Coverage,
+                        properties = Props
+                    };
+                Status ->
+                    Asm#asm{status = Status}
+            end;
+        Status ->
+            Asm#asm{status = Status}
+    end.
+
+status(#asm{status = Status}) ->
+    Status.
+
+filemeta(Asm) ->
+    case status(meta, Asm) of
+        {complete, Meta} -> Meta;
+        _Other -> undefined
+    end.
+
+coverage(#asm{coverage = Coverage}) ->
+    Coverage.
+
+properties(#asm{properties = Properties}) ->
+    Properties.
+
+status(meta, #asm{meta = Meta}) ->
+    status(meta, orddict:to_list(Meta));
+status(meta, [{Meta, {_Node, _Frag}}]) ->
+    {complete, Meta};
+status(meta, []) ->
+    {incomplete, {missing, filemeta}};
+status(meta, [_M1, _M2 | _] = Metas) ->
+    {error, {inconsistent, [Frag#{node => Node} || {_, {Node, Frag}} <- Metas]}};
+status(coverage, #asm{segs = Segments, size = Size}) ->
+    case coverage(orddict:to_list(Segments), 0, Size) of
+        Coverage when is_list(Coverage) ->
+            {complete, Coverage, #{
+                dominant => dominant(Coverage)
+            }};
+        Missing = {missing, _} ->
+            {incomplete, Missing}
+    end.
+
+append_filemeta(Asm, Node, Fragment = #{fragment := {filemeta, Meta}}) ->
+    Asm#asm{
+        meta = orddict:store(Meta, {Node, Fragment}, Asm#asm.meta)
+    }.
+
+append_segmentinfo(Asm, Node, Fragment = #{fragment := {segmentinfo, Info}}) ->
+    Offset = maps:get(offset, Info),
+    Size = maps:get(size, Info),
+    End = Offset + Size,
+    Asm#asm{
+        % TODO
+        % In theory it's possible to have two segments with same offset + size on
+        % different nodes but with differing content. We'd need a checksum to
+        % be able to disambiguate them though.
+        segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs),
+        size = max(End, Asm#asm.size)
+    }.
+
+coverage([{{Offset, _, _, _}, _Segment} | Rest], Cursor, Sz) when Offset < Cursor ->
+    coverage(Rest, Cursor, Sz);
+coverage([{{Cursor, _Locality, MEnd, Node}, Segment} | Rest], Cursor, Sz) ->
+    % NOTE
+    % We consider only whole fragments here, so for example from the point of view of
+    % this algo `[{Offset1 = 0, Size1 = 15}, {Offset2 = 10, Size2 = 10}]` has no
+    % coverage.
+    case coverage(Rest, -MEnd, Sz) of
+        Coverage when is_list(Coverage) ->
+            [{Node, Segment} | Coverage];
+        Missing = {missing, _} ->
+            case coverage(Rest, Cursor, Sz) of
+                CoverageAlt when is_list(CoverageAlt) ->
+                    CoverageAlt;
+                {missing, _} ->
+                    Missing
+            end
+    end;
+coverage([{{Offset, _MEnd, _, _}, _Segment} | _], Cursor, _Sz) when Offset > Cursor ->
+    {missing, {segment, Cursor, Offset}};
+coverage([], Cursor, Sz) when Cursor < Sz ->
+    {missing, {segment, Cursor, Sz}};
+coverage([], Cursor, Cursor) ->
+    [].
+
+dominant(Coverage) ->
+    % TODO: needs improvement, better defined _dominance_, maybe some score
+    Freqs = frequencies(fun({Node, Segment}) -> {Node, segsize(Segment)} end, Coverage),
+    maxfreq(Freqs, node()).
+
+frequencies(Fun, List) ->
+    lists:foldl(
+        fun(E, Acc) ->
+            {K, N} = Fun(E),
+            maps:update_with(K, fun(M) -> M + N end, N, Acc)
+        end,
+        #{},
+        List
+    ).
+
+maxfreq(Freqs, Init) ->
+    {_, Max} = maps:fold(
+        fun
+            (F, N, {M, _MF}) when N > M -> {N, F};
+            (_F, _N, {M, MF}) -> {M, MF}
+        end,
+        {0, Init},
+        Freqs
+    ),
+    Max.
+
+locality(Node) when Node =:= node() ->
+    % NOTE
+    % This should prioritize locally available segments over those on remote nodes.
+    0;
+locality(_RemoteNode) ->
+    1.
+
+segsize(#{fragment := {segmentinfo, Info}}) ->
+    maps:get(size, Info).
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+incomplete_new_test() ->
+    ?assertEqual(
+        {incomplete, {missing, filemeta}},
+        status(update(new()))
+    ).
+
+incomplete_test() ->
+    ?assertEqual(
+        {incomplete, {missing, filemeta}},
+        status(
+            update(
+                append(new(), node(), [
+                    segmentinfo(p1, 0, 42),
+                    segmentinfo(p1, 42, 100)
+                ])
+            )
+        )
+    ).
+
+consistent_test() ->
+    Asm1 = append(new(), n1, [filemeta(m1, "blarg")]),
+    Asm2 = append(Asm1, n2, [segmentinfo(s2, 0, 42)]),
+    Asm3 = append(Asm2, n3, [filemeta(m3, "blarg")]),
+    ?assertMatch({complete, _}, status(meta, Asm3)).
+
+inconsistent_test() ->
+    Asm1 = append(new(), node(), [segmentinfo(s1, 0, 42)]),
+    Asm2 = append(Asm1, n1, [filemeta(m1, "blarg")]),
+    Asm3 = append(Asm2, n2, [segmentinfo(s2, 0, 42), filemeta(m1, "blorg")]),
+    Asm4 = append(Asm3, n3, [filemeta(m3, "blarg")]),
+    ?assertMatch(
+        {error,
+            {inconsistent, [
+                % blarg < blorg
+                #{node := n3, path := m3, fragment := {filemeta, #{name := "blarg"}}},
+                #{node := n2, path := m1, fragment := {filemeta, #{name := "blorg"}}}
+            ]}},
+        status(meta, Asm4)
+    ).
+
+simple_coverage_test() ->
+    Node = node(),
+    Segs = [
+        {node42, segmentinfo(n1, 20, 30)},
+        {Node, segmentinfo(n2, 0, 10)},
+        {Node, segmentinfo(n3, 50, 50)},
+        {Node, segmentinfo(n4, 10, 10)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertMatch(
+        {complete,
+            [
+                {Node, #{path := n2}},
+                {Node, #{path := n4}},
+                {node42, #{path := n1}},
+                {Node, #{path := n3}}
+            ],
+            #{dominant := Node}},
+        status(coverage, Asm)
+    ).
+
+redundant_coverage_test() ->
+    Node = node(),
+    Segs = [
+        {Node, segmentinfo(n1, 0, 20)},
+        {node1, segmentinfo(n2, 0, 10)},
+        {Node, segmentinfo(n3, 20, 40)},
+        {node2, segmentinfo(n4, 10, 10)},
+        {node2, segmentinfo(n5, 50, 20)},
+        {node3, segmentinfo(n6, 20, 20)},
+        {Node, segmentinfo(n7, 50, 10)},
+        {node1, segmentinfo(n8, 40, 10)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertMatch(
+        {complete,
+            [
+                {Node, #{path := n1}},
+                {node3, #{path := n6}},
+                {node1, #{path := n8}},
+                {node2, #{path := n5}}
+            ],
+            #{dominant := _}},
+        status(coverage, Asm)
+    ).
+
+redundant_coverage_prefer_local_test() ->
+    Node = node(),
+    Segs = [
+        {node1, segmentinfo(n1, 0, 20)},
+        {Node, segmentinfo(n2, 0, 10)},
+        {Node, segmentinfo(n3, 10, 10)},
+        {node2, segmentinfo(n4, 20, 20)},
+        {Node, segmentinfo(n5, 30, 10)},
+        {Node, segmentinfo(n6, 20, 10)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertMatch(
+        {complete,
+            [
+                {Node, #{path := n2}},
+                {Node, #{path := n3}},
+                {Node, #{path := n6}},
+                {Node, #{path := n5}}
+            ],
+            #{dominant := Node}},
+        status(coverage, Asm)
+    ).
+
+missing_coverage_test() ->
+    Node = node(),
+    Segs = [
+        {Node, segmentinfo(n1, 0, 10)},
+        {node1, segmentinfo(n3, 10, 20)},
+        {Node, segmentinfo(n2, 0, 20)},
+        {node2, segmentinfo(n4, 50, 50)},
+        {Node, segmentinfo(n5, 40, 60)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertEqual(
+        % {incomplete, {missing, {segment, 30, 40}}}, ???
+        {incomplete, {missing, {segment, 20, 40}}},
+        status(coverage, Asm)
+    ).
+
+missing_end_coverage_test() ->
+    Node = node(),
+    Segs = [
+        {Node, segmentinfo(n1, 0, 15)},
+        {node1, segmentinfo(n3, 10, 10)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertEqual(
+        {incomplete, {missing, {segment, 15, 20}}},
+        status(coverage, Asm)
+    ).
+
+missing_coverage_with_redudancy_test() ->
+    Segs = [
+        {node(), segmentinfo(n1, 0, 10)},
+        {node(), segmentinfo(n2, 0, 20)},
+        {node42, segmentinfo(n3, 10, 20)},
+        {node43, segmentinfo(n4, 10, 50)},
+        {node(), segmentinfo(n5, 40, 60)}
+    ],
+    Asm = append_many(new(), Segs),
+    ?assertEqual(
+        % {incomplete, {missing, {segment, 50, 60}}}, ???
+        {incomplete, {missing, {segment, 20, 40}}},
+        status(coverage, Asm)
+    ).
+
+append_many(Asm, List) ->
+    lists:foldl(
+        fun({Node, Frag}, Acc) -> append(Acc, Node, Frag) end,
+        Asm,
+        List
+    ).
+
+filemeta(Path, Name) ->
+    #{
+        path => Path,
+        fragment =>
+            {filemeta, #{
+                name => Name
+            }}
+    }.
+
+segmentinfo(Path, Offset, Size) ->
+    #{
+        path => Path,
+        fragment =>
+            {segmentinfo, #{
+                offset => Offset,
+                size => Size
+            }}
+    }.
+
+-endif.

+ 425 - 0
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -0,0 +1,425 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_storage_fs).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+% -compile(export_all).
+
+-export([store_filemeta/3]).
+-export([store_segment/3]).
+-export([list/2]).
+-export([assemble/2]).
+
+-export([open_file/3]).
+-export([complete/4]).
+-export([write/2]).
+-export([discard/1]).
+
+% -behaviour(gen_server).
+% -export([init/1]).
+% -export([handle_call/3]).
+% -export([handle_cast/2]).
+
+-type json_value() ::
+    null
+    | boolean()
+    | binary()
+    | number()
+    | [json_value()]
+    | #{binary() => json_value()}.
+
+-reflect_type([json_value/0]).
+
+-type transfer() :: emqx_ft:transfer().
+-type offset() :: emqx_ft:offset().
+
+%% TODO: move to `emqx_ft` interface module
+% -type sha256_hex() :: <<_:512>>.
+
+-type filemeta() :: #{
+    %% Display name
+    name := string(),
+    %% Size in bytes, as advertised by the client.
+    %% Client is free to specify here whatever it wants, which means we can end
+    %% up with a file of different size after assembly. It's not clear from
+    %% specification what that means (e.g. what are clients' expectations), we
+    %% currently do not condider that an error (or, specifically, a signal that
+    %% the resulting file is corrupted during transmission).
+    size => _Bytes :: non_neg_integer(),
+    checksum => {sha256, <<_:256>>},
+    expire_at := emqx_datetime:epoch_second(),
+    %% TTL of individual segments
+    %% Somewhat confusing that we won't know it on the nodes where the filemeta
+    %% is missing.
+    segments_ttl => _Seconds :: pos_integer(),
+    user_data => json_value()
+}.
+
+-type segment() :: {offset(), _Content :: binary()}.
+
+-type segmentinfo() :: #{
+    offset := offset(),
+    size := _Bytes :: non_neg_integer()
+}.
+
+-type filefrag(T) :: #{
+    path := file:name(),
+    timestamp := emqx_datetime:epoch_second(),
+    fragment := T
+}.
+
+-type filefrag() :: filefrag({filemeta, filemeta()} | {segment, segmentinfo()}).
+
+-define(MANIFEST, "MANIFEST.json").
+-define(SEGMENT, "SEG").
+-define(TEMP, "TMP").
+
+-type root() :: file:name().
+
+% -record(st, {
+%     root :: file:name()
+% }).
+
+%% TODO
+-type storage() :: root().
+
+%%
+
+-define(PROCREF(Root), {via, gproc, {n, l, {?MODULE, Root}}}).
+
+-spec start_link(root()) ->
+    {ok, pid()} | {error, already_started}.
+start_link(Root) ->
+    gen_server:start_link(?PROCREF(Root), ?MODULE, [], []).
+
+%% Store manifest in the backing filesystem.
+%% Atomic operation.
+-spec store_filemeta(storage(), transfer(), filemeta()) ->
+    % Quota? Some lower level errors?
+    ok | {error, conflict} | {error, _TODO}.
+store_filemeta(Storage, Transfer, Meta) ->
+    Filepath = mk_filepath(Storage, Transfer, ?MANIFEST),
+    case read_file(Filepath, fun decode_filemeta/1) of
+        {ok, Meta} ->
+            _ = touch_file(Filepath),
+            ok;
+        {ok, Conflict} ->
+            % TODO
+            % We won't see conflicts in case of concurrent `store_filemeta`
+            % requests. It's rather odd scenario so it's fine not to worry
+            % about it too much now.
+            {error, conflict};
+        {error, Reason} when Reason =:= notfound; Reason =:= corrupted ->
+            write_file_atomic(Filepath, encode_filemeta(Meta))
+    end.
+
+%% Store a segment in the backing filesystem.
+%% Atomic operation.
+-spec store_segment(storage(), transfer(), segment()) ->
+    % Where is the checksum gets verified? Upper level probably.
+    % Quota? Some lower level errors?
+    ok | {error, _TODO}.
+store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
+    Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)),
+    write_file_atomic(Filepath, Content).
+
+-spec list(storage(), transfer()) ->
+    % Some lower level errors? {error, notfound}?
+    % Result will contain zero or only one filemeta.
+    {ok, list(filefrag())} | {error, _TODO}.
+list(Storage, Transfer) ->
+    Dirname = mk_filedir(Storage, Transfer),
+    case file:list_dir(Dirname) of
+        {ok, Filenames} ->
+            {ok, filtermap_files(fun mk_filefrag/2, Dirname, Filenames)};
+        {error, enoent} ->
+            {ok, []};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec assemble(storage(), transfer()) ->
+    % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
+    {ok, _Assembler :: pid()} | {error, _TODO}.
+assemble(Storage, Transfer) ->
+    emqx_ft_assembler_sup:start_child(Storage, Storage, Transfer).
+
+%%
+
+-opaque handle() :: {file:name(), io:device(), crypto:hash_state()}.
+
+-spec open_file(storage(), transfer(), filemeta()) ->
+    {ok, handle()} | {error, _TODO}.
+open_file(Storage, Transfer, Filemeta) ->
+    Filename = maps:get(name, Filemeta),
+    Filepath = mk_filepath(Storage, Transfer, Filename),
+    TempFilepath = mk_temp_filepath(Filepath),
+    case file:open(TempFilepath, [write, raw]) of
+        {ok, Handle} ->
+            _ = file:truncate(Handle),
+            {ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec write(handle(), iodata()) ->
+    ok | {error, _TODO}.
+write({Filepath, IoDevice, Ctx}, IoData) ->
+    case file:write(IoDevice, IoData) of
+        ok ->
+            {ok, {Filepath, IoDevice, update_checksum(Ctx, IoData)}};
+        {error, _} = Error ->
+            Error
+    end.
+
+-spec complete(storage(), transfer(), filemeta(), handle()) ->
+    ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}.
+complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) ->
+    TargetFilepath = mk_filepath(Storage, Transfer, maps:get(name, Filemeta)),
+    case verify_checksum(Ctx, Filemeta) of
+        ok ->
+            ok = file:close(IoDevice),
+            file:rename(Filepath, TargetFilepath);
+        {error, _} = Error ->
+            _ = discard(Handle),
+            Error
+    end.
+
+-spec discard(handle()) ->
+    ok.
+discard({Filepath, IoDevice, _Ctx}) ->
+    ok = file:close(IoDevice),
+    file:delete(Filepath).
+
+init_checksum(#{checksum := {Algo, _}}) ->
+    crypto:hash_init(Algo);
+init_checksum(#{}) ->
+    undefined.
+
+update_checksum(Ctx, IoData) when Ctx /= undefined ->
+    crypto:hash_update(Ctx, IoData);
+update_checksum(undefined, _IoData) ->
+    undefined.
+
+verify_checksum(Ctx, #{checksum := {Algo, Digest}}) when Ctx /= undefined ->
+    case crypto:hash_final(Ctx) of
+        Digest ->
+            ok;
+        Mismatch ->
+            {error, {checksum, Algo, binary:encode_hex(Mismatch)}}
+    end;
+verify_checksum(undefined, _) ->
+    ok.
+
+%%
+
+-spec init(root()) -> {ok, storage()}.
+init(Root) ->
+    % TODO: garbage_collect(...)
+    {ok, Root}.
+
+%%
+
+-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
+
+schema() ->
+    #{
+        roots => [
+            {name, hoconsc:mk(string(), #{required => true})},
+            {size, hoconsc:mk(non_neg_integer())},
+            {expire_at, hoconsc:mk(non_neg_integer(), #{required => true})},
+            {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
+            {segments_ttl, hoconsc:mk(pos_integer())},
+            {user_data, hoconsc:mk(json_value())}
+        ]
+    }.
+
+% encode_filemeta(Meta) ->
+%     emqx_json:encode(
+%         ?PRELUDE(
+%             _Vsn = 1,
+%             maps:map(
+%                 fun
+%                     (name, Name) ->
+%                         {<<"name">>, Name};
+%                     (size, Size) ->
+%                         {<<"size">>, Size};
+%                     (checksum, {sha256, Hash}) ->
+%                         {<<"checksum">>, <<"sha256:", (binary:encode_hex(Hash))/binary>>};
+%                     (expire_at, ExpiresAt) ->
+%                         {<<"expire_at">>, ExpiresAt};
+%                     (segments_ttl, TTL) ->
+%                         {<<"segments_ttl">>, TTL};
+%                     (user_data, UserData) ->
+%                         {<<"user_data">>, UserData}
+%                 end,
+%                 Meta
+%             )
+%         )
+%     ).
+
+encode_filemeta(Meta) ->
+    % TODO: Looks like this should be hocon's responsibility.
+    Term = hocon_tconf:make_serializable(schema(), emqx_map_lib:binary_key_map(Meta), #{}),
+    emqx_json:encode(?PRELUDE(_Vsn = 1, Term)).
+
+decode_filemeta(Binary) ->
+    ?PRELUDE(_Vsn = 1, Term) = emqx_json:decode(Binary, [return_maps]),
+    hocon_tconf:check_plain(schema(), Term, #{atom_key => true, required => false}).
+
+converter(checksum) ->
+    fun
+        (undefined, #{}) ->
+            undefined;
+        ({sha256, Bin}, #{make_serializable := true}) ->
+            _ = is_binary(Bin) orelse throw({expected_type, string}),
+            _ = byte_size(Bin) =:= 32 orelse throw({expected_length, 32}),
+            binary:encode_hex(Bin);
+        (Hex, #{}) ->
+            _ = is_binary(Hex) orelse throw({expected_type, string}),
+            _ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}),
+            {sha256, binary:decode_hex(Hex)}
+    end.
+
+% map_into(Fun, Into, Ks, Map) ->
+%     map_foldr(map_into_fn(Fun, Into), Into, Ks, Map).
+
+% map_into_fn(Fun, L) when is_list(L) ->
+%     fun(K, V, Acc) -> [{K, Fun(K, V)} || Acc] end.
+
+% map_foldr(_Fun, Acc, [], _) ->
+%     Acc;
+% map_foldr(Fun, Acc, [K | Ks], Map) when is_map_key(K, Map) ->
+%     Fun(K, maps:get(K, Map), map_foldr(Fun, Acc, Ks, Map));
+% map_foldr(Fun, Acc, [_ | Ks], Map) ->
+%     map_foldr(Fun, Acc, Ks, Map).
+
+%%
+
+mk_segment_filename({Offset, Content}) ->
+    lists:concat([?SEGMENT, ".", Offset, ".", byte_size(Content)]).
+
+break_segment_filename(Filename) ->
+    Regex = "^" ?SEGMENT "[.]([0-9]+)[.]([0-9]+)$",
+    Result = re:run(Filename, Regex, [{capture, all_but_first, list}]),
+    case Result of
+        {match, [Offset, Size]} ->
+            {ok, #{offset => list_to_integer(Offset), size => list_to_integer(Size)}};
+        nomatch ->
+            {error, invalid}
+    end.
+
+mk_filedir(Storage, {ClientId, FileId}) ->
+    filename:join([get_storage_root(Storage), ClientId, FileId]).
+
+mk_filepath(Storage, Transfer, Filename) ->
+    filename:join(mk_filedir(Storage, Transfer), Filename).
+
+get_storage_root(Storage) ->
+    Storage.
+
+%%
+
+-include_lib("kernel/include/file.hrl").
+
+read_file(Filepath) ->
+    file:read_file(Filepath).
+
+read_file(Filepath, DecodeFun) ->
+    case read_file(Filepath) of
+        {ok, Content} ->
+            safe_decode(Content, DecodeFun);
+        {error, _} = Error ->
+            Error
+    end.
+
+safe_decode(Content, DecodeFun) ->
+    try
+        {ok, DecodeFun(Content)}
+    catch
+        C:R:Stacktrace ->
+            % TODO: Log?
+            {error, corrupted}
+    end.
+
+write_file_atomic(Filepath, Content) when is_binary(Content) ->
+    Result = emqx_misc:pipeline(
+        [
+            fun filelib:ensure_dir/1,
+            fun mk_temp_filepath/1,
+            fun write_contents/2,
+            fun mv_temp_file/1
+        ],
+        Filepath,
+        Content
+    ),
+    case Result of
+        {ok, {Filepath, TempFilepath}, _} ->
+            _ = file:delete(TempFilepath),
+            ok;
+        {error, Reason, _} ->
+            {error, Reason}
+    end.
+
+mk_temp_filepath(Filepath) ->
+    Dirname = filename:dirname(Filepath),
+    Filename = filename:basename(Filepath),
+    Unique = erlang:unique_integer([positive]),
+    TempFilepath = filename:join(Dirname, ?TEMP ++ integer_to_list(Unique) ++ "." ++ Filename),
+    {Filepath, TempFilepath}.
+
+write_contents({_Filepath, TempFilepath}, Content) ->
+    file:write_file(TempFilepath, Content).
+
+mv_temp_file({Filepath, TempFilepath}) ->
+    file:rename(TempFilepath, Filepath).
+
+touch_file(Filepath) ->
+    Now = erlang:localtime(),
+    file:change_time(Filepath, _Mtime = Now, _Atime = Now).
+
+filtermap_files(Fun, Dirname, Filenames) ->
+    lists:filtermap(fun(Filename) -> Fun(Dirname, Filename) end, Filenames).
+
+mk_filefrag(Dirname, Filename = ?MANIFEST) ->
+    mk_filefrag(Dirname, Filename, fun read_filemeta/2);
+mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
+    mk_filefrag(Dirname, Filename, fun read_segmentinfo/2);
+mk_filefrag(_Dirname, _) ->
+    false.
+
+mk_filefrag(Dirname, Filename, Fun) ->
+    Filepath = filename:join(Dirname, Filename),
+    Fileinfo = file:read_file_info(Filepath),
+    case Fun(Filename, Filepath) of
+        {ok, Frag} ->
+            {true, #{
+                path => Filepath,
+                timestamp => Fileinfo#file_info.mtime,
+                fragment => Frag
+            }};
+        {error, Reason} ->
+            false
+    end.
+
+read_filemeta(_Filename, Filepath) ->
+    read_file(Filepath, fun decode_filemeta/1).
+
+read_segmentinfo(Filename, _Filepath) ->
+    break_segment_filename(Filename).