Przeglądaj źródła

test(retainer): assert that retained messages are not lost when changing storage type

Thales Macedo Garitezi 3 lat temu
rodzic
commit
51ad27cb4b

+ 3 - 1
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -146,7 +146,9 @@ store_retained(_, Msg = #message{topic = Topic}) ->
                 reason => table_is_full
             });
         false ->
-            do_store_retained(Msg, Tokens, ExpiryTime)
+            do_store_retained(Msg, Tokens, ExpiryTime),
+            ?tp(message_retained, #{topic => Topic}),
+            ok
     end.
 
 clear_expired(_) ->

+ 48 - 11
apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl

@@ -31,7 +31,6 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:clear_screen(),
     application:load(emqx_conf),
     ok = ekka:start(),
     ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
@@ -104,11 +103,12 @@ t_messages(_) ->
     end,
 
     ?check_trace(
-        ?wait_async_action(
-            lists:foreach(Each, lists:seq(1, 5)),
-            #{?snk_kind := message_retained, topic := <<"retained/A">>},
-            500
-        ),
+        {ok, {ok, _}} =
+            ?wait_async_action(
+                lists:foreach(Each, lists:seq(1, 5)),
+                #{?snk_kind := message_retained, topic := <<"retained/A">>},
+                500
+            ),
         []
     ),
 
@@ -150,11 +150,12 @@ t_messages_page(_) ->
     end,
 
     ?check_trace(
-        ?wait_async_action(
-            lists:foreach(Each, lists:seq(1, 5)),
-            #{?snk_kind := message_retained, topic := <<"retained/A">>},
-            500
-        ),
+        {ok, {ok, _}} =
+            ?wait_async_action(
+                lists:foreach(Each, lists:seq(1, 5)),
+                #{?snk_kind := message_retained, topic := <<"retained/A">>},
+                500
+            ),
         []
     ),
     Page = 4,
@@ -238,6 +239,23 @@ t_change_storage_type(_Config) ->
     ?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)),
     ?assertEqual(ram_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)),
     ?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX, storage_type)),
+    %% insert some retained messages
+    {ok, C0} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C0),
+    ok = snabbkaffe:start_trace(),
+    Topic = <<"retained">>,
+    Payload = <<"retained">>,
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            emqtt:publish(C0, Topic, Payload, [{qos, 0}, {retain, true}]),
+            #{?snk_kind := message_retained, topic := Topic},
+            500
+        ),
+    emqtt:stop(C0),
+    ok = snabbkaffe:stop(),
+    {ok, MsgsJson0} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
+    #{data := Msgs0, meta := _} = decode_json(MsgsJson0),
+    ?assertEqual(1, length(Msgs0)),
 
     ChangedConf = emqx_map_lib:deep_merge(
         RawConf,
@@ -267,6 +285,25 @@ t_change_storage_type(_Config) ->
     ?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)),
     ?assertEqual(disc_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)),
     ?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX, storage_type)),
+    %% keep retained messages
+    {ok, MsgsJson1} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
+    #{data := Msgs1, meta := _} = decode_json(MsgsJson1),
+    ?assertEqual(1, length(Msgs1)),
+    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C1),
+    {ok, _, _} = emqtt:subscribe(C1, Topic),
+
+    receive
+        {publish, #{topic := T, payload := P, retain := R}} ->
+            ?assertEqual(Payload, P),
+            ?assertEqual(Topic, T),
+            ?assert(R),
+            ok
+    after 500 ->
+        emqtt:stop(C1),
+        ct:fail("should have preserved retained messages")
+    end,
+    emqtt:stop(C1),
 
     ok.