Pārlūkot izejas kodu

Merge pull request #9676 from thalesmg/fix-change-retainer-storage-type-v50

fix(retainer): change mnesia table storage types during update
Thales Macedo Garitezi 3 gadi atpakaļ
vecāks
revīzija
fa4f90d43e

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer.app.src

@@ -2,7 +2,7 @@
 {application, emqx_retainer, [
     {description, "EMQX Retainer"},
     % strict semver, bump manually!
-    {vsn, "5.0.8"},
+    {vsn, "5.0.9"},
     {modules, []},
     {registered, [emqx_retainer_sup]},
     {applications, [kernel, stdlib, emqx]},

+ 15 - 8
apps/emqx_retainer/src/emqx_retainer.erl

@@ -321,16 +321,23 @@ update_config(
     OldConf
 ) ->
     #{
-        backend := BackendCfg,
+        backend := #{
+            type := BackendType,
+            storage_type := StorageType
+        },
         msg_clear_interval := ClearInterval
     } = NewConf,
 
-    #{backend := OldBackendCfg} = OldConf,
-
-    StorageType = maps:get(type, BackendCfg),
-    OldStrorageType = maps:get(type, OldBackendCfg),
-    case OldStrorageType of
-        StorageType ->
+    #{
+        backend := #{
+            type := OldBackendType,
+            storage_type := OldStorageType
+        }
+    } = OldConf,
+    SameBackendType = BackendType =:= OldBackendType,
+    SameStorageType = StorageType =:= OldStorageType,
+    case SameBackendType andalso SameStorageType of
+        true ->
             State#{
                 clear_timer := check_timer(
                     ClearTimer,
@@ -338,7 +345,7 @@ update_config(
                     clear_expired
                 )
             };
-        _ ->
+        false ->
             State2 = disable_retainer(State),
             enable_retainer(State2, NewConf)
     end.

+ 3 - 1
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -146,7 +146,9 @@ store_retained(_, Msg = #message{topic = Topic}) ->
                 reason => table_is_full
             });
         false ->
-            do_store_retained(Msg, Tokens, ExpiryTime)
+            do_store_retained(Msg, Tokens, ExpiryTime),
+            ?tp(message_retained, #{topic => Topic}),
+            ok
     end.
 
 clear_expired(_) ->

+ 98 - 10
apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl

@@ -103,11 +103,12 @@ t_messages(_) ->
     end,
 
     ?check_trace(
-        ?wait_async_action(
-            lists:foreach(Each, lists:seq(1, 5)),
-            #{?snk_kind := message_retained, topic := <<"retained/A">>},
-            500
-        ),
+        {ok, {ok, _}} =
+            ?wait_async_action(
+                lists:foreach(Each, lists:seq(1, 5)),
+                #{?snk_kind := message_retained, topic := <<"retained/A">>},
+                500
+            ),
         []
     ),
 
@@ -149,11 +150,12 @@ t_messages_page(_) ->
     end,
 
     ?check_trace(
-        ?wait_async_action(
-            lists:foreach(Each, lists:seq(1, 5)),
-            #{?snk_kind := message_retained, topic := <<"retained/A">>},
-            500
-        ),
+        {ok, {ok, _}} =
+            ?wait_async_action(
+                lists:foreach(Each, lists:seq(1, 5)),
+                #{?snk_kind := message_retained, topic := <<"retained/A">>},
+                500
+            ),
         []
     ),
     Page = 4,
@@ -219,6 +221,92 @@ t_lookup_and_delete(_) ->
 
     ok = emqtt:disconnect(C1).
 
+t_change_storage_type(_Config) ->
+    Path = api_path(["mqtt", "retainer"]),
+    {ok, ConfJson} = request_api(get, Path),
+    RawConf = emqx_json:decode(ConfJson, [return_maps]),
+    %% pre-conditions
+    ?assertMatch(
+        #{
+            <<"backend">> := #{
+                <<"type">> := <<"built_in_database">>,
+                <<"storage_type">> := <<"ram">>
+            },
+            <<"enable">> := true
+        },
+        RawConf
+    ),
+    ?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)),
+    ?assertEqual(ram_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)),
+    ?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX, storage_type)),
+    %% insert some retained messages
+    {ok, C0} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C0),
+    ok = snabbkaffe:start_trace(),
+    Topic = <<"retained">>,
+    Payload = <<"retained">>,
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            emqtt:publish(C0, Topic, Payload, [{qos, 0}, {retain, true}]),
+            #{?snk_kind := message_retained, topic := Topic},
+            500
+        ),
+    emqtt:stop(C0),
+    ok = snabbkaffe:stop(),
+    {ok, MsgsJson0} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
+    #{data := Msgs0, meta := _} = decode_json(MsgsJson0),
+    ?assertEqual(1, length(Msgs0)),
+
+    ChangedConf = emqx_map_lib:deep_merge(
+        RawConf,
+        #{
+            <<"backend">> =>
+                #{<<"storage_type">> => <<"disc">>}
+        }
+    ),
+    {ok, UpdateResJson} = request_api(
+        put,
+        Path,
+        [],
+        auth_header_(),
+        ChangedConf
+    ),
+    UpdatedRawConf = emqx_json:decode(UpdateResJson, [return_maps]),
+    ?assertMatch(
+        #{
+            <<"backend">> := #{
+                <<"type">> := <<"built_in_database">>,
+                <<"storage_type">> := <<"disc">>
+            },
+            <<"enable">> := true
+        },
+        UpdatedRawConf
+    ),
+    ?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)),
+    ?assertEqual(disc_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)),
+    ?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX, storage_type)),
+    %% keep retained messages
+    {ok, MsgsJson1} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
+    #{data := Msgs1, meta := _} = decode_json(MsgsJson1),
+    ?assertEqual(1, length(Msgs1)),
+    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C1),
+    {ok, _, _} = emqtt:subscribe(C1, Topic),
+
+    receive
+        {publish, #{topic := T, payload := P, retain := R}} ->
+            ?assertEqual(Payload, P),
+            ?assertEqual(Topic, T),
+            ?assert(R),
+            ok
+    after 500 ->
+        emqtt:stop(C1),
+        ct:fail("should have preserved retained messages")
+    end,
+    emqtt:stop(C1),
+
+    ok.
+
 %%--------------------------------------------------------------------
 %% HTTP Request
 %%--------------------------------------------------------------------

+ 2 - 0
changes/v5.0.14-en.md

@@ -16,3 +16,5 @@
 - Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time. [#9641](https://github.com/emqx/emqx/pull/9641)
 
 - Fix the problem of data loss and bad match when the MySQL driver is disconnected [#9638](https://github.com/emqx/emqx/pull/9638).
+
+- Fixed an issue where changing the storage type of the built-in database retainer would not take effect without restarting the node [#9676](https://github.com/emqx/emqx/pull/9676).

+ 2 - 0
changes/v5.0.14-zh.md

@@ -16,3 +16,5 @@
 - 修复了测试GCP PubSub可能泄露内存的问题,以及其JWT令牌第二次刷新失败的问题。 [#9640](https://github.com/emqx/emqx/pull/9640)
 
 - 修复 MySQL 驱动断开连接时出现的数据丢失和匹配错误的问题 [#9638](https://github.com/emqx/emqx/pull/9638)。
+
+- 修复了如果不重新启动节点,改变保留消息的存储类型将不会生效的问题 [#9676](https://github.com/emqx/emqx/pull/9676)。