Просмотр исходного кода

refactor: address review comments

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
59b94788bf

+ 1 - 1
apps/emqx/include/emqx.hrl

@@ -23,7 +23,7 @@
 -define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
 -define(CM_SHARD, emqx_cm_shard).
 -define(ROUTE_SHARD, route_shard).
--define(PS_ROUTER_SHARD, ps_router_shard).
+-define(PS_ROUTER_SHARD, persistent_session_router_shard).
 
 %% Banner
 %%--------------------------------------------------------------------

+ 20 - 20
apps/emqx/integration_test/emqx_ds_SUITE.erl

@@ -144,13 +144,13 @@ t_non_persistent_session_subscription(_Config) ->
     SubTopicFilter = <<"t/#">>,
     ?check_trace(
         begin
-            ct:pal("starting"),
+            ?tp(notice, "starting", #{}),
             Client = start_client(#{
                 clientid => ClientId,
                 properties => #{'Session-Expiry-Interval' => 0}
             }),
             {ok, _} = emqtt:connect(Client),
-            ct:pal("subscribing"),
+            ?tp(notice, "subscribing", #{}),
             {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2),
             IteratorRefs = get_all_iterator_refs(node()),
             IteratorIds = get_all_iterator_ids(node()),
@@ -198,7 +198,7 @@ t_session_subscription_idempotency(Config) ->
 
             spawn_link(fun() ->
                 ?tp(will_restart_node, #{}),
-                ct:pal("restarting node ~p", [Node1]),
+                ?tp(notice, "restarting node", #{node => Node1}),
                 true = monitor_node(Node1, true),
                 ok = erpc:call(Node1, init, restart, []),
                 receive
@@ -207,10 +207,10 @@ t_session_subscription_idempotency(Config) ->
                 after 10_000 ->
                     ct:fail("node ~p didn't stop", [Node1])
                 end,
-                ct:pal("waiting for nodeup ~p", [Node1]),
+                ?tp(notice, "waiting for nodeup", #{node => Node1}),
                 wait_nodeup(Node1),
                 wait_gen_rpc_down(Node1Spec),
-                ct:pal("restarting apps on ~p", [Node1]),
+                ?tp(notice, "restarting apps", #{node => Node1}),
                 Apps = maps:get(apps, Node1Spec),
                 ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]),
                 _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]),
@@ -218,15 +218,15 @@ t_session_subscription_idempotency(Config) ->
                 %% end....
                 ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec),
                 ok = snabbkaffe:forward_trace(Node1),
-                ct:pal("node ~p restarted", [Node1]),
+                ?tp(notice, "node restarted", #{node => Node1}),
                 ?tp(restarted_node, #{}),
                 ok
             end),
 
-            ct:pal("starting 1"),
+            ?tp(notice, "starting 1", #{}),
             Client0 = start_client(#{port => Port, clientid => ClientId}),
             {ok, _} = emqtt:connect(Client0),
-            ct:pal("subscribing 1"),
+            ?tp(notice, "subscribing 1", #{}),
             process_flag(trap_exit, true),
             catch emqtt:subscribe(Client0, SubTopicFilter, qos2),
             receive
@@ -237,10 +237,10 @@ t_session_subscription_idempotency(Config) ->
             process_flag(trap_exit, false),
 
             {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
-            ct:pal("starting 2"),
+            ?tp(notice, "starting 2", #{}),
             Client1 = start_client(#{port => Port, clientid => ClientId}),
             {ok, _} = emqtt:connect(Client1),
-            ct:pal("subscribing 2"),
+            ?tp(notice, "subscribing 2", #{}),
             {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
 
             ok = emqtt:stop(Client1),
@@ -286,7 +286,7 @@ t_session_unsubscription_idempotency(Config) ->
 
             spawn_link(fun() ->
                 ?tp(will_restart_node, #{}),
-                ct:pal("restarting node ~p", [Node1]),
+                ?tp(notice, "restarting node", #{node => Node1}),
                 true = monitor_node(Node1, true),
                 ok = erpc:call(Node1, init, restart, []),
                 receive
@@ -295,10 +295,10 @@ t_session_unsubscription_idempotency(Config) ->
                 after 10_000 ->
                     ct:fail("node ~p didn't stop", [Node1])
                 end,
-                ct:pal("waiting for nodeup ~p", [Node1]),
+                ?tp(notice, "waiting for nodeup", #{node => Node1}),
                 wait_nodeup(Node1),
                 wait_gen_rpc_down(Node1Spec),
-                ct:pal("restarting apps on ~p", [Node1]),
+                ?tp(notice, "restarting apps", #{node => Node1}),
                 Apps = maps:get(apps, Node1Spec),
                 ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]),
                 _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]),
@@ -306,17 +306,17 @@ t_session_unsubscription_idempotency(Config) ->
                 %% end....
                 ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec),
                 ok = snabbkaffe:forward_trace(Node1),
-                ct:pal("node ~p restarted", [Node1]),
+                ?tp(notice, "node restarted", #{node => Node1}),
                 ?tp(restarted_node, #{}),
                 ok
             end),
 
-            ct:pal("starting 1"),
+            ?tp(notice, "starting 1", #{}),
             Client0 = start_client(#{port => Port, clientid => ClientId}),
             {ok, _} = emqtt:connect(Client0),
-            ct:pal("subscribing 1"),
+            ?tp(notice, "subscribing 1", #{}),
             {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2),
-            ct:pal("unsubscribing 1"),
+            ?tp(notice, "unsubscribing 1", #{}),
             process_flag(trap_exit, true),
             catch emqtt:unsubscribe(Client0, SubTopicFilter),
             receive
@@ -327,12 +327,12 @@ t_session_unsubscription_idempotency(Config) ->
             process_flag(trap_exit, false),
 
             {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000),
-            ct:pal("starting 2"),
+            ?tp(notice, "starting 2", #{}),
             Client1 = start_client(#{port => Port, clientid => ClientId}),
             {ok, _} = emqtt:connect(Client1),
-            ct:pal("subscribing 2"),
+            ?tp(notice, "subscribing 2", #{}),
             {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2),
-            ct:pal("unsubscribing 2"),
+            ?tp(notice, "unsubscribing 2", #{}),
             {{ok, _, [?RC_SUCCESS]}, {ok, _}} =
                 ?wait_async_action(
                     emqtt:unsubscribe(Client1, SubTopicFilter),

+ 2 - 4
apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -19,14 +19,12 @@
 -define(PS_ROUTER_TAB, emqx_ds_ps_router).
 -define(PS_FILTERS_TAB, emqx_ds_ps_filters).
 
--type dest() :: emqx_ds:session_id().
-
 -record(ps_route, {
     topic :: binary(),
     dest :: emqx_ds:session_id()
 }).
 -record(ps_routeidx, {
-    entry :: emqx_topic_index:key(dest()),
+    entry :: emqx_topic_index:key(emqx_persistent_session_ds_router:dest()),
     unused = [] :: nil()
 }).
 

+ 4 - 0
apps/emqx/src/emqx_persistent_session_ds_router.erl

@@ -39,6 +39,10 @@
 -export([has_route/2]).
 -endif.
 
+-type dest() :: emqx_ds:session_id().
+
+-export_type([dest/0]).
+
 %%--------------------------------------------------------------------
 %% Table Initialization
 %%--------------------------------------------------------------------

+ 18 - 17
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -24,6 +24,8 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 -define(DS_SHARD, <<"local">>).
 
 all() ->
@@ -34,33 +36,31 @@ init_per_suite(Config) ->
     %% TODO: remove after other suites start to use `emx_cth_suite'
     application:stop(emqx),
     application:stop(emqx_durable_storage),
-    TCApps = emqx_cth_suite:start(
-        app_specs(),
-        #{work_dir => emqx_cth_suite:work_dir(Config)}
-    ),
-    [{tc_apps, TCApps} | Config].
+    Config.
 
-end_per_suite(Config) ->
-    TCApps = ?config(tc_apps, Config),
-    emqx_cth_suite:stop(TCApps),
+end_per_suite(_Config) ->
     ok.
 
-init_per_testcase(t_session_subscription_iterators, Config) ->
+init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
     Cluster = cluster(),
-    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}),
+    Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
     [{nodes, Nodes} | Config];
-init_per_testcase(_TestCase, Config) ->
-    Config.
+init_per_testcase(TestCase, Config) ->
+    Apps = emqx_cth_suite:start(
+        app_specs(),
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
+    [{apps, Apps} | Config].
 
 end_per_testcase(t_session_subscription_iterators, Config) ->
     Nodes = ?config(nodes, Config),
+    emqx_common_test_helpers:call_janitor(60_000),
     ok = emqx_cth_cluster:stop(Nodes),
-    ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, _Prefix = <<>>),
-    delete_all_messages(),
     ok;
-end_per_testcase(_TestCase, _Config) ->
-    ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, _Prefix = <<>>),
-    delete_all_messages(),
+end_per_testcase(_TestCase, Config) ->
+    Apps = ?config(apps, Config),
+    emqx_common_test_helpers:call_janitor(60_000),
+    emqx_cth_suite:stop(Apps),
     ok.
 
 t_messages_persisted(_Config) ->
@@ -256,6 +256,7 @@ connect(Opts0 = #{}) ->
     Defaults = #{proto_ver => v5},
     Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
     {ok, Client} = emqtt:start_link(Opts),
+    on_exit(fun() -> catch emqtt:stop(Client) end),
     {ok, _} = emqtt:connect(Client),
     Client.
 

+ 1 - 1
apps/emqx/test/emqx_persistent_session_ds_router_SUITE.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.