Prechádzať zdrojové kódy

refactor(ft-asm): turn state data record into a map

Which should be more future-proof.
Andrew Mayorov 2 rokov pred
rodič
commit
23cd78b8d6

+ 72 - 42
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -24,12 +24,12 @@
 -export([handle_event/4]).
 -export([terminate/3]).
 
--record(st, {
-    storage :: _Storage,
-    transfer :: emqx_ft:transfer(),
-    assembly :: emqx_ft_assembly:t(),
-    export :: _Export | undefined
-}).
+-type stdata() :: #{
+    storage := emqx_ft_storage_fs:storage(),
+    transfer := emqx_ft:transfer(),
+    assembly := emqx_ft_assembly:t(),
+    export => emqx_ft_storage_exporter:export()
+}.
 
 -define(NAME(Transfer), {n, l, {?MODULE, Transfer}}).
 -define(REF(Transfer), {via, gproc, ?NAME(Transfer)}).
@@ -41,31 +41,48 @@ start_link(Storage, Transfer, Size) ->
 
 %%
 
+-type state() ::
+    idle
+    | list_local_fragments
+    | {list_remote_fragments, [node()]}
+    | start_assembling
+    | {assemble, [{node(), emqx_ft_storage_fs:filefrag()}]}
+    | complete.
+
 -define(internal(C), {next_event, internal, C}).
 
 callback_mode() ->
     handle_event_function.
 
+-spec init(_Args) -> {ok, state(), stdata()}.
 init({Storage, Transfer, Size}) ->
     _ = erlang:process_flag(trap_exit, true),
-    St = #st{
-        storage = Storage,
-        transfer = Transfer,
-        assembly = emqx_ft_assembly:new(Size)
+    St = #{
+        storage => Storage,
+        transfer => Transfer,
+        assembly => emqx_ft_assembly:new(Size)
     },
     {ok, idle, St}.
 
+-spec handle_event(info | internal, _, state(), stdata()) ->
+    {next_state, state(), stdata(), {next_event, internal, _}}
+    | {stop, {shutdown, ok | {error, _}}, stdata()}.
 handle_event(info, kickoff, idle, St) ->
     % NOTE
     % Someone's told us to start the work, which usually means that it has set up a monitor.
     % We could wait for this message and handle it at the end of the assembling rather than at
     % the beginning, however it would make error handling much more messier.
     {next_state, list_local_fragments, St, ?internal([])};
-handle_event(internal, _, list_local_fragments, St = #st{}) ->
+handle_event(
+    internal,
+    _,
+    list_local_fragments,
+    St = #{storage := Storage, transfer := Transfer, assembly := Asm}
+) ->
     % TODO: what we do with non-transients errors here (e.g. `eacces`)?
-    {ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment),
-    NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(St#st.assembly, node(), Fragments)),
-    NSt = St#st{assembly = NAsm},
+    {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
+    NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
+    NSt = St#{assembly := NAsm},
     case emqx_ft_assembly:status(NAsm) of
         complete ->
             {next_state, start_assembling, NSt, ?internal([])};
@@ -76,27 +93,32 @@ handle_event(internal, _, list_local_fragments, St = #st{}) ->
         {error, _} = Error ->
             {stop, {shutdown, Error}}
     end;
-handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
+handle_event(
+    internal,
+    _,
+    {list_remote_fragments, Nodes},
+    St = #{transfer := Transfer, assembly := Asm}
+) ->
     % TODO
     % Async would better because we would not need to wait for some lagging nodes if
     % the coverage is already complete.
     % TODO: portable "storage" ref
-    Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, St#st.transfer, fragment),
+    Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, Transfer, fragment),
     NodeResults = lists:zip(Nodes, Results),
     NAsm = emqx_ft_assembly:update(
         lists:foldl(
             fun
-                ({Node, {ok, {ok, Fragments}}}, Asm) ->
-                    emqx_ft_assembly:append(Asm, Node, Fragments);
-                ({_Node, _Result}, Asm) ->
+                ({Node, {ok, {ok, Fragments}}}, Acc) ->
+                    emqx_ft_assembly:append(Acc, Node, Fragments);
+                ({_Node, _Result}, Acc) ->
                     % TODO: log?
-                    Asm
+                    Acc
             end,
-            St#st.assembly,
+            Asm,
             NodeResults
         )
     ),
-    NSt = St#st{assembly = NAsm},
+    NSt = St#{assembly := NAsm},
     case emqx_ft_assembly:status(NAsm) of
         complete ->
             {next_state, start_assembling, NSt, ?internal([])};
@@ -106,43 +128,51 @@ handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
         {error, _} = Error ->
             {stop, {shutdown, Error}}
     end;
-handle_event(internal, _, start_assembling, St = #st{}) ->
-    Filemeta = emqx_ft_assembly:filemeta(St#st.assembly),
-    Coverage = emqx_ft_assembly:coverage(St#st.assembly),
+handle_event(
+    internal,
+    _,
+    start_assembling,
+    St = #{storage := Storage, transfer := Transfer, assembly := Asm}
+) ->
+    Filemeta = emqx_ft_assembly:filemeta(Asm),
+    Coverage = emqx_ft_assembly:coverage(Asm),
     % TODO: better error handling
     {ok, Export} = emqx_ft_storage_exporter:start_export(
-        St#st.storage,
-        St#st.transfer,
+        Storage,
+        Transfer,
         Filemeta
     ),
-    {next_state, {assemble, Coverage}, St#st{export = Export}, ?internal([])};
-handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
+    {next_state, {assemble, Coverage}, St#{export => Export}, ?internal([])};
+handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := Export}) ->
     % TODO
     % Currently, race is possible between getting segment info from the remote node and
     % this node garbage collecting the segment itself.
     % TODO: pipelining
     % TODO: better error handling
     {ok, Content} = pread(Node, Segment, St),
-    {ok, NExport} = emqx_ft_storage_exporter:write(St#st.export, Content),
-    {next_state, {assemble, Rest}, St#st{export = NExport}, ?internal([])};
-handle_event(internal, _, {assemble, []}, St = #st{}) ->
+    {ok, NExport} = emqx_ft_storage_exporter:write(Export, Content),
+    {next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])};
+handle_event(internal, _, {assemble, []}, St = #{}) ->
     {next_state, complete, St, ?internal([])};
-handle_event(internal, _, complete, St = #st{}) ->
-    Result = emqx_ft_storage_exporter:complete(St#st.export),
+handle_event(internal, _, complete, St = #{export := Export}) ->
+    Result = emqx_ft_storage_exporter:complete(Export),
     ok = maybe_garbage_collect(Result, St),
-    {stop, {shutdown, Result}, St#st{export = undefined}}.
+    {stop, {shutdown, Result}, maps:remove(export, St)}.
 
-terminate(_Reason, _StateName, #st{export = Export}) ->
-    Export /= undefined andalso emqx_ft_storage_exporter:discard(Export).
+-spec terminate(_Reason, state(), stdata()) -> _.
+terminate(_Reason, _StateName, #{export := Export}) ->
+    emqx_ft_storage_exporter:discard(Export);
+terminate(_Reason, _StateName, #{}) ->
+    ok.
 
-pread(Node, Segment, St) when Node =:= node() ->
-    emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment));
-pread(Node, Segment, St) ->
-    emqx_ft_storage_fs_proto_v1:pread(Node, St#st.transfer, Segment, 0, segsize(Segment)).
+pread(Node, Segment, #{storage := Storage, transfer := Transfer}) when Node =:= node() ->
+    emqx_ft_storage_fs:pread(Storage, Transfer, Segment, 0, segsize(Segment));
+pread(Node, Segment, #{transfer := Transfer}) ->
+    emqx_ft_storage_fs_proto_v1:pread(Node, Transfer, Segment, 0, segsize(Segment)).
 
 %%
 
-maybe_garbage_collect(ok, #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->
+maybe_garbage_collect(ok, #{storage := Storage, transfer := Transfer, assembly := Asm}) ->
     Nodes = emqx_ft_assembly:nodes(Asm),
     emqx_ft_storage_fs_gc:collect(Storage, Transfer, Nodes);
 maybe_garbage_collect({error, _}, _St) ->

+ 3 - 0
apps/emqx_ft/src/emqx_ft_storage_exporter.erl

@@ -34,6 +34,9 @@
 
 -export([exporter/1]).
 
+-export_type([options/0]).
+-export_type([export/0]).
+
 -type storage() :: emxt_ft_storage_fs:storage().
 -type transfer() :: emqx_ft:transfer().
 -type filemeta() :: emqx_ft:filemeta().