Просмотр исходного кода

Merge pull request #10655 from savonarola/0510-idempotent-fin

Make FT fin command idempotent
Ilya Averyanov 2 года назад
Родитель
Коммит
42f5433aaf

+ 2 - 2
apps/emqx_ft/src/emqx_ft.erl

@@ -268,8 +268,8 @@ on_fin(PacketId, Msg, Transfer, FinalSize, Checksum) ->
     with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() ->
         case assemble(Transfer, FinalSize) of
             %% Assembling completed, ack through the responder right away
-            % ok ->
-            %     emqx_ft_responder:ack(FinPacketKey, ok);
+            ok ->
+                emqx_ft_responder:ack(FinPacketKey, ok);
             %% Assembling started, packet will be acked by the responder
             {async, Pid} ->
                 ok = emqx_ft_responder:kickoff(FinPacketKey, Pid),

+ 5 - 0
apps/emqx_ft/src/emqx_ft_assembler.erl

@@ -24,6 +24,8 @@
 -export([handle_event/4]).
 -export([terminate/3]).
 
+-export([where/1]).
+
 -type stdata() :: #{
     storage := emqx_ft_storage_fs:storage(),
     transfer := emqx_ft:transfer(),
@@ -39,6 +41,9 @@
 start_link(Storage, Transfer, Size) ->
     gen_statem:start_link(?REF(Transfer), ?MODULE, {Storage, Transfer, Size}, []).
 
+where(Transfer) ->
+    gproc:where(?NAME(Transfer)).
+
 %%
 
 -type state() ::

+ 3 - 1
apps/emqx_ft/src/emqx_ft_storage_exporter_fs.erl

@@ -327,7 +327,9 @@ list(_Options, Query = #{transfer := _Transfer}) ->
         #{items := Exports = [_ | _]} ->
             {ok, #{items => Exports}};
         #{items := [], errors := NodeErrors} ->
-            {error, NodeErrors}
+            {error, NodeErrors};
+        #{items := []} ->
+            {ok, #{items => []}}
     end;
 list(_Options, Query) ->
     Result = list(Query),

+ 47 - 4
apps/emqx_ft/src/emqx_ft_storage_fs.erl

@@ -35,6 +35,7 @@
 -export([read_filemeta/2]).
 -export([list/3]).
 -export([pread/5]).
+-export([lookup_local_assembler/1]).
 -export([assemble/3]).
 
 -export([transfers/1]).
@@ -211,11 +212,15 @@ pread(_Storage, _Transfer, Frag, Offset, Size) ->
     end.
 
 -spec assemble(storage(), transfer(), emqx_ft:bytes()) ->
-    {async, _Assembler :: pid()} | {error, _TODO}.
+    {async, _Assembler :: pid()} | ok | {error, _TODO}.
 assemble(Storage, Transfer, Size) ->
-    % TODO: ask cluster if the transfer is already assembled
-    {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size),
-    {async, Pid}.
+    LookupSources = [
+        fun() -> lookup_local_assembler(Transfer) end,
+        fun() -> lookup_remote_assembler(Transfer) end,
+        fun() -> check_if_already_exported(Storage, Transfer) end,
+        fun() -> ensure_local_assembler(Storage, Transfer, Size) end
+    ],
+    lookup_assembler(LookupSources).
 
 %%
 
@@ -252,6 +257,44 @@ stop(Storage) ->
 
 %%
 
+lookup_assembler([LastSource]) ->
+    LastSource();
+lookup_assembler([Source | Sources]) ->
+    case Source() of
+        {error, not_found} -> lookup_assembler(Sources);
+        Result -> Result
+    end.
+
+check_if_already_exported(Storage, Transfer) ->
+    case files(Storage, #{transfer => Transfer}) of
+        {ok, #{items := [_ | _]}} -> ok;
+        _ -> {error, not_found}
+    end.
+
+lookup_local_assembler(Transfer) ->
+    case emqx_ft_assembler:where(Transfer) of
+        Pid when is_pid(Pid) -> {async, Pid};
+        _ -> {error, not_found}
+    end.
+
+lookup_remote_assembler(Transfer) ->
+    Nodes = emqx:running_nodes() -- [node()],
+    Assemblers = lists:flatmap(
+        fun
+            ({ok, {async, Pid}}) -> [Pid];
+            (_) -> []
+        end,
+        emqx_ft_storage_fs_proto_v1:list_assemblers(Nodes, Transfer)
+    ),
+    case Assemblers of
+        [Pid | _] -> {async, Pid};
+        _ -> {error, not_found}
+    end.
+
+ensure_local_assembler(Storage, Transfer, Size) ->
+    {ok, Pid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size),
+    {async, Pid}.
+
 -spec transfers(storage()) ->
     {ok, #{transfer() => transferinfo()}}.
 transfers(Storage) ->

+ 5 - 1
apps/emqx_ft/src/emqx_ft_storage_fs_proxy.erl

@@ -22,7 +22,8 @@
 
 -export([
     list_local/2,
-    pread_local/4
+    pread_local/4,
+    lookup_local_assembler/1
 ]).
 
 list_local(Transfer, What) ->
@@ -30,3 +31,6 @@ list_local(Transfer, What) ->
 
 pread_local(Transfer, Frag, Offset, Size) ->
     emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]).
+
+lookup_local_assembler(Transfer) ->
+    emqx_ft_storage:with_storage_type(local, lookup_local_assembler, [Transfer]).

+ 6 - 0
apps/emqx_ft/src/proto/emqx_ft_storage_fs_proto_v1.erl

@@ -22,6 +22,7 @@
 
 -export([multilist/3]).
 -export([pread/5]).
+-export([list_assemblers/2]).
 
 -type offset() :: emqx_ft:offset().
 -type transfer() :: emqx_ft:transfer().
@@ -41,3 +42,8 @@ multilist(Nodes, Transfer, What) ->
     {ok, [filefrag()]} | {error, term()} | no_return().
 pread(Node, Transfer, Frag, Offset, Size) ->
     erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]).
+
+-spec list_assemblers([node()], transfer()) ->
+    emqx_rpc:erpc_multicall([pid()]).
+list_assemblers(Nodes, Transfer) ->
+    erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, lookup_local_assembler, [Transfer]).

+ 95 - 15
apps/emqx_ft/test/emqx_ft_SUITE.erl

@@ -44,7 +44,8 @@ groups() ->
 group_cluster() ->
     [
         t_switch_node,
-        t_unreliable_migrating_client
+        t_unreliable_migrating_client,
+        t_concurrent_fins
     ].
 
 init_per_suite(Config) ->
@@ -549,21 +550,11 @@ t_unreliable_migrating_client(Config) ->
 
     Exports = list_files(?config(clientid, Config)),
 
-    % NOTE
-    % The cluster had 2 assemblers running on two different nodes, because client sent `fin`
-    % twice. This is currently expected, files must be identical anyway.
     Node1Str = atom_to_list(Node1),
-    NodeSelfStr = atom_to_list(NodeSelf),
     % TODO: this testcase is specific to local fs storage backend
     ?assertMatch(
-        [#{"node" := Node1Str}, #{"node" := NodeSelfStr}],
-        lists:map(
-            fun(#{uri := URIString}) ->
-                #{query := QS} = uri_string:parse(URIString),
-                maps:from_list(uri_string:dissect_query(QS))
-            end,
-            lists:sort(Exports)
-        )
+        [#{"node" := Node1Str}],
+        fs_exported_file_attributes(Exports)
     ),
 
     [
@@ -571,6 +562,84 @@ t_unreliable_migrating_client(Config) ->
      || Export <- Exports
     ].
 
+t_concurrent_fins(Config) ->
+    NodeSelf = node(),
+    [Node1, Node2] = ?config(cluster_nodes, Config),
+
+    ClientId = ?config(clientid, Config),
+    FileId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    Filename = "migratory-birds-in-southern-hemisphere-2013.pdf",
+    Filesize = 100,
+    Gen = emqx_ft_content_gen:new({{ClientId, FileId}, Filesize}, 16),
+    Payload = iolist_to_binary(emqx_ft_content_gen:consume(Gen, fun({Chunk, _, _}) -> Chunk end)),
+    Meta = meta(Filename, Payload),
+
+    %% Send filemeta and segments to Node1
+    Context0 = #{
+        clientid => ClientId,
+        fileid => FileId,
+        filesize => Filesize,
+        payload => Payload
+    },
+
+    Context1 = run_commands(
+        [
+            {fun connect_mqtt_client/2, [Node1]},
+            {fun send_filemeta/2, [Meta]},
+            {fun send_segment/3, [0, 100]},
+            {fun stop_mqtt_client/1, []}
+        ],
+        Context0
+    ),
+
+    %% Now send fins concurrently to the 3 nodes
+    Self = self(),
+    Nodes = [Node1, Node2, NodeSelf],
+    FinSenders = lists:map(
+        fun(Node) ->
+            %% takeovers and disconnects will happen due to concurrency
+            _ = erlang:process_flag(trap_exit, true),
+            _Context = run_commands(
+                [
+                    {fun connect_mqtt_client/2, [Node]},
+                    {fun send_finish/1, []}
+                ],
+                Context1
+            ),
+            Self ! {done, Node}
+        end,
+        Nodes
+    ),
+    ok = lists:foreach(
+        fun(F) ->
+            _Pid = spawn_link(F)
+        end,
+        FinSenders
+    ),
+    ok = lists:foreach(
+        fun(Node) ->
+            receive
+                {done, Node} -> ok
+            after 1000 ->
+                ct:fail("Node ~p did not send finish successfully", [Node])
+            end
+        end,
+        Nodes
+    ),
+
+    %% Only one node should have the file
+    Exports = list_files(?config(clientid, Config)),
+    ?assertMatch(
+        [#{"node" := _Node}],
+        fs_exported_file_attributes(Exports)
+    ).
+
+%%------------------------------------------------------------------------------
+%% Command helpers
+%%------------------------------------------------------------------------------
+
+%% Command runners
+
 run_commands(Commands, Context) ->
     lists:foldl(fun run_command/2, Context, Commands).
 
@@ -578,6 +647,8 @@ run_command({Command, Args}, Context) ->
     ct:pal("COMMAND ~p ~p", [erlang:fun_info(Command, name), Args]),
     erlang:apply(Command, Args ++ [Context]).
 
+%% Commands
+
 connect_mqtt_client(Node, ContextIn) ->
     Context = #{clientid := ClientId} = disown_mqtt_client(ContextIn),
     NodePort = emqx_ft_test_helpers:tcp_port(Node),
@@ -623,9 +694,18 @@ send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize
     ),
     Context.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Helpers
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
+
+fs_exported_file_attributes(FSExports) ->
+    lists:map(
+        fun(#{uri := URIString}) ->
+            #{query := QS} = uri_string:parse(URIString),
+            maps:from_list(uri_string:dissect_query(QS))
+        end,
+        lists:sort(FSExports)
+    ).
 
 mk_init_topic(FileId) ->
     <<"$file/", FileId/binary, "/init">>.