Sfoglia il codice sorgente

chore(ft): add tests for async reply registry

Ilya Averyanov 2 anni fa
parent
commit
b8cacd2833

+ 1 - 1
apps/emqx/test/emqx_connection_SUITE.erl

@@ -49,7 +49,7 @@ init_per_suite(Config) ->
     %% Meck Hooks
     ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]),
     ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
-    ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> {ok, Acc} end),
+    ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
 
     ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end),
 

+ 1 - 1
apps/emqx_ft/src/emqx_ft_app.erl

@@ -22,7 +22,7 @@
 
 start(_StartType, _StartArgs) ->
     {ok, Sup} = emqx_ft_sup:start_link(),
-    ok = emqx_ft_async_reply:create_table(),
+    ok = emqx_ft_async_reply:create_tables(),
     ok = emqx_ft_conf:load(),
     {ok, Sup}.
 

+ 8 - 7
apps/emqx_ft/src/emqx_ft_async_reply.erl

@@ -21,7 +21,8 @@
 -include_lib("stdlib/include/ms_transform.hrl").
 
 -export([
-    create_tables/0
+    create_tables/0,
+    info/0
 ]).
 
 -export([
@@ -38,9 +39,9 @@
 
 %% packets waiting for async workers
 
--define(WORKER_TAB, emqx_ft_async_mons).
--define(WORKER_KEY(MRef), ?WORKER_KEY(self(), MRef)).
--define(WORKER_KEY(ChannelPid, MRef), {ChannelPid, MRef}).
+-define(MON_TAB, emqx_ft_async_mons).
+-define(MON_KEY(MRef), ?MON_KEY(self(), MRef)).
+-define(MON_KEY(ChannelPid, MRef), {ChannelPid, MRef}).
 
 %% async worker monitors by packet ids
 
@@ -54,14 +55,14 @@
 
 -spec create_tables() -> ok.
 create_tables() ->
-    _ = ets:new(?WORKER_TAB, [named_table, public, ordered_set]),
+    _ = ets:new(?MON_TAB, [named_table, public, ordered_set]),
     _ = ets:new(?PACKET_TAB, [named_table, public, ordered_set]),
     ok.
 
 -spec register(packet_id(), mon_ref(), timer_ref()) -> ok.
 register(PacketId, MRef, TRef) ->
     _ = ets:insert(?PACKET_TAB, {?PACKET_KEY(PacketId), MRef}),
-    _ = ets:insert(?WORKER_TAB, {?WORKER_KEY(MRef), PacketId, TRef}),
+    _ = ets:insert(?MON_TAB, {?MON_KEY(MRef), PacketId, TRef}),
     ok.
 
 -spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any().
@@ -73,7 +74,7 @@ with_new_packet(PacketId, Fun, Default) ->
 
 -spec take_by_mref(mon_ref()) -> {ok, packet_id(), timer_ref()} | not_found.
 take_by_mref(MRef) ->
-    case ets:take(?WORKER_TAB, ?WORKER_KEY(MRef)) of
+    case ets:take(?MON_TAB, ?MON_KEY(MRef)) of
         [{_, PacketId, TRef}] ->
             _ = ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)),
             {ok, PacketId, TRef};

+ 247 - 0
apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl

@@ -0,0 +1,247 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_async_reply_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}},
+            {emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s }"}
+        ],
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    [{suite_apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)),
+    ok.
+
+init_per_testcase(_Case, Config) ->
+    ok = snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_Case, _Config) ->
+    ok = snabbkaffe:stop(),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Tests
+%%--------------------------------------------------------------------
+
+t_register(_Config) ->
+    PacketId = 1,
+    MRef = make_ref(),
+    TRef = make_ref(),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef),
+
+    ?assertEqual(
+        undefined,
+        emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+    ),
+
+    ?assertEqual(
+        ok,
+        emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined)
+    ),
+
+    ?assertEqual(
+        {ok, PacketId, TRef},
+        emqx_ft_async_reply:take_by_mref(MRef)
+    ).
+
+t_process_independence(_Config) ->
+    PacketId = 1,
+    MRef = make_ref(),
+    TRef = make_ref(),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef),
+
+    Self = self(),
+
+    spawn_link(fun() ->
+        Self ! emqx_ft_async_reply:take_by_mref(MRef)
+    end),
+
+    Res1 =
+        receive
+            Msg1 -> Msg1
+        end,
+
+    ?assertEqual(
+        not_found,
+        Res1
+    ),
+
+    spawn_link(fun() ->
+        Self ! emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+    end),
+
+    Res2 =
+        receive
+            Msg2 -> Msg2
+        end,
+
+    ?assertEqual(
+        ok,
+        Res2
+    ).
+
+t_take(_Config) ->
+    PacketId = 1,
+    MRef = make_ref(),
+    TRef = make_ref(),
+    ok = emqx_ft_async_reply:register(PacketId, MRef, TRef),
+
+    ?assertEqual(
+        {ok, PacketId, TRef},
+        emqx_ft_async_reply:take_by_mref(MRef)
+    ),
+
+    ?assertEqual(
+        not_found,
+        emqx_ft_async_reply:take_by_mref(MRef)
+    ),
+
+    ?assertEqual(
+        ok,
+        emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined)
+    ).
+
+t_cleanup(_Config) ->
+    PacketId = 1,
+    MRef0 = make_ref(),
+    TRef0 = make_ref(),
+    MRef1 = make_ref(),
+    TRef1 = make_ref(),
+    ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0),
+
+    Self = self(),
+
+    Pid = spawn_link(fun() ->
+        ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1),
+        receive
+            kickoff ->
+                ?assertEqual(
+                    undefined,
+                    emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+                ),
+
+                ?assertEqual(
+                    {ok, PacketId, TRef1},
+                    emqx_ft_async_reply:take_by_mref(MRef1)
+                ),
+
+                Self ! done
+        end
+    end),
+
+    ?assertEqual(
+        undefined,
+        emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+    ),
+
+    ok = emqx_ft_async_reply:deregister_all(Self),
+
+    ?assertEqual(
+        ok,
+        emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined)
+    ),
+
+    Pid ! kickoff,
+
+    receive
+        done -> ok
+    end.
+
+t_reply_by_tiemout(_Config) ->
+    process_flag(trap_exit, true),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    C = emqx_ft_test_helpers:start_client(ClientId, node()),
+
+    SleepForever = fun() ->
+        Ref = make_ref(),
+        receive
+            Ref -> ok
+        end
+    end,
+
+    ok = meck:new(emqx_ft_storage, [passthrough]),
+    meck:expect(emqx_ft_storage, assemble, fun(_, _, _) -> {async, spawn_link(SleepForever)} end),
+
+    FinTopic = <<"$file/fakeid/fin/999999">>,
+
+    ?assertMatch(
+        {ok, #{reason_code_name := unspecified_error}},
+        emqtt:publish(C, FinTopic, <<>>, 1)
+    ),
+
+    meck:unload(emqx_ft_storage),
+    emqtt:stop(C).
+
+t_cleanup_by_cm(_Config) ->
+    process_flag(trap_exit, true),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    C = emqx_ft_test_helpers:start_client(ClientId, node()),
+
+    ok = meck:new(emqx_ft_storage, [passthrough]),
+    meck:expect(emqx_ft_storage, kickoff, fun(_) -> meck:exception(error, oops) end),
+
+    FinTopic = <<"$file/fakeid/fin/999999">>,
+
+    [ClientPid] = emqx_cm:lookup_channels(ClientId),
+
+    ?assertWaitEvent(
+        begin
+            emqtt:publish(C, FinTopic, <<>>, 1),
+            exit(ClientPid, kill)
+        end,
+        #{?snk_kind := emqx_cm_clean_down, client_id := ClientId},
+        1000
+    ),
+
+    ?assertEqual(
+        {0, 0},
+        emqx_ft_async_reply:info()
+    ),
+
+    meck:unload(emqx_ft_storage).
+
+t_unrelated_events(_Config) ->
+    process_flag(trap_exit, true),
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    C = emqx_ft_test_helpers:start_client(ClientId, node()),
+    [ClientPid] = emqx_cm:lookup_channels(ClientId),
+
+    erlang:monitor(process, ClientPid),
+
+    ClientPid ! {'DOWN', make_ref(), process, self(), normal},
+    ClientPid ! {timeout, make_ref(), unknown_timer_event},
+
+    ?assertNotReceive(
+        {'DOWN', _Ref, process, ClientPid, _Reason},
+        500
+    ),
+
+    emqtt:stop(C).