Просмотр исходного кода

feat(dssubs): bring back implicit queue declaration

Also test both scenarios.
Andrew Mayorov 1 год назад
Родитель
Сommit
e288c8cc01

+ 14 - 5
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl

@@ -115,12 +115,21 @@ open(TopicSubscriptions, Opts) ->
 can_subscribe(_State, #share{group = Group, topic = Topic}, _SubOpts) ->
 can_subscribe(_State, #share{group = Group, topic = Topic}, _SubOpts) ->
     case ?dq_config(enable) of
     case ?dq_config(enable) of
         true ->
         true ->
-            case emqx_ds_shared_sub_queue:exists(Group, Topic) of
-                true ->
+            %% TODO: Weird to have side effects in function with this name.
+            TS = emqx_message:timestamp_now(),
+            case emqx_ds_shared_sub_queue:declare(Group, Topic, TS, _StartTime = TS) of
+                {ok, _} ->
                     ok;
                     ok;
-                false ->
-                    %% TODO: These refusals are logged as warnings.
-                    {error, ?RC_TOPIC_FILTER_INVALID}
+                exists ->
+                    ok;
+                {error, Class, Reason} ->
+                    ?tp(warning, "Shared queue declare failed", #{
+                        group => Group,
+                        topic => Topic,
+                        class => Class,
+                        reason => Reason
+                    }),
+                    {error, ?RC_UNSPECIFIED_ERROR}
             end;
             end;
         false ->
         false ->
             {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}
             {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}

+ 57 - 23
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -13,7 +13,24 @@
 -include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/asserts.hrl").
 
 
 all() ->
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    [{group, declare_explicit}, {group, declare_implicit}].
+
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    Groups = [declare_explicit, declare_implicit],
+    GroupTCs = [{Group, TC} || TC <- TCs, Group <- groups_per_testcase(TC, Groups)],
+    lists:foldl(
+        fun({Group, TC}, Acc) -> orddict:append(Group, TC, Acc) end,
+        [],
+        GroupTCs
+    ).
+
+groups_per_testcase(TC, Groups) ->
+    try
+        ?MODULE:TC(groups, Groups)
+    catch
+        error:_ -> Groups
+    end.
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
     Apps = emqx_cth_suite:start(
@@ -50,6 +67,12 @@ init_per_suite(Config) ->
     ),
     ),
     [{apps, Apps} | Config].
     [{apps, Apps} | Config].
 
 
+init_per_group(GroupName, Config) ->
+    [{queue_need_declare, GroupName =:= declare_explicit} | Config].
+
+end_per_group(_GroupName, _Config) ->
+    ok.
+
 end_per_suite(Config) ->
 end_per_suite(Config) ->
     ok = emqx_cth_suite:stop(?config(apps, Config)),
     ok = emqx_cth_suite:stop(?config(apps, Config)),
     ok.
     ok.
@@ -63,6 +86,12 @@ end_per_testcase(TC, Config) ->
     ok = emqx_ds_shared_sub_registry:purge(),
     ok = emqx_ds_shared_sub_registry:purge(),
     emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
     emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
 
 
+declare_queue_if_needed(Group, Topic, Config) ->
+    case proplists:get_value(queue_need_declare, Config) of
+        true -> declare_queue(Group, Topic, Config);
+        false -> Config
+    end.
+
 declare_queue(Group, Topic, Config) ->
 declare_queue(Group, Topic, Config) ->
     Now = emqx_message:timestamp_now(),
     Now = emqx_message:timestamp_now(),
     {ok, Queue} = emqx_ds_shared_sub_queue:declare(Group, Topic, Now, _StartTime = 0),
     {ok, Queue} = emqx_ds_shared_sub_queue:declare(Group, Topic, Now, _StartTime = 0),
@@ -76,10 +105,13 @@ declare_queue(Group, Topic, Config) ->
 destroy_queue(Config) ->
 destroy_queue(Config) ->
     Group = proplists:get_value(queue_group, Config),
     Group = proplists:get_value(queue_group, Config),
     Topic = proplists:get_value(queue_topic, Config),
     Topic = proplists:get_value(queue_topic, Config),
-    emqx_ds_shared_sub_queue:destroy(Group, Topic).
+    case Group of
+        undefined -> ok;
+        _ -> emqx_ds_shared_sub_queue:destroy(Group, Topic)
+    end.
 
 
 t_lease_initial('init', Config) ->
 t_lease_initial('init', Config) ->
-    declare_queue(<<"gr1">>, <<"topic1/#">>, Config);
+    declare_queue_if_needed(<<"gr1">>, <<"topic1/#">>, Config);
 t_lease_initial('end', Config) ->
 t_lease_initial('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -99,6 +131,8 @@ t_lease_initial(_Config) ->
     ok = emqtt:disconnect(ConnShared),
     ok = emqtt:disconnect(ConnShared),
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
+t_declare_triggers_persistence(groups, _Groups) ->
+    [declare_explicit];
 t_declare_triggers_persistence('init', Config) ->
 t_declare_triggers_persistence('init', Config) ->
     declare_queue(<<"dtp">>, <<"topic1/#">>, Config);
     declare_queue(<<"dtp">>, <<"topic1/#">>, Config);
 t_declare_triggers_persistence('end', Config) ->
 t_declare_triggers_persistence('end', Config) ->
@@ -128,7 +162,7 @@ t_declare_triggers_persistence(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_destroy_queue_live_clients('init', Config) ->
 t_destroy_queue_live_clients('init', Config) ->
-    declare_queue(<<"dqlc">>, <<"t1337/#">>, Config);
+    declare_queue_if_needed(<<"dqlc">>, <<"t1337/#">>, Config);
 t_destroy_queue_live_clients('end', Config) ->
 t_destroy_queue_live_clients('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -145,24 +179,24 @@ t_destroy_queue_live_clients(Config) ->
     {ok, _} = emqtt:publish(ConnPub, <<"t1337/3/4">>, <<"hello3">>, 1),
     {ok, _} = emqtt:publish(ConnPub, <<"t1337/3/4">>, <<"hello3">>, 1),
     {ok, _} = emqtt:publish(ConnPub, <<"t1337/5/6">>, <<"hello4">>, 1),
     {ok, _} = emqtt:publish(ConnPub, <<"t1337/5/6">>, <<"hello4">>, 1),
 
 
-    ?assertReceive({publish, #{payload := <<"hello1">>}}, 5_000),
-    ?assertReceive({publish, #{payload := <<"hello2">>}}, 1_000),
-    ?assertReceive({publish, #{payload := <<"hello3">>}}, 1_000),
-    ?assertReceive({publish, #{payload := <<"hello4">>}}, 1_000),
+    ?assertReceive({publish, #{payload := <<"hello1">>}}, 2_000),
+    ?assertReceive({publish, #{payload := <<"hello2">>}}, 2_000),
+    ?assertReceive({publish, #{payload := <<"hello3">>}}, 2_000),
+    ?assertReceive({publish, #{payload := <<"hello4">>}}, 2_000),
 
 
-    ok = destroy_queue(Config),
+    ok = emqx_ds_shared_sub_queue:destroy(<<"dqlc">>, <<"t1337/#">>),
 
 
-    %% No more published after the queue was destroyed.
+    %% No more publishes after the queue was destroyed.
     {ok, _} = emqtt:publish(ConnPub, <<"t1337/1">>, <<"hello5">>, 1),
     {ok, _} = emqtt:publish(ConnPub, <<"t1337/1">>, <<"hello5">>, 1),
     {ok, _} = emqtt:publish(ConnPub, <<"t1337/2">>, <<"hello6">>, 1),
     {ok, _} = emqtt:publish(ConnPub, <<"t1337/2">>, <<"hello6">>, 1),
-    ?assertNotReceive({publish, #{payload := _}}, 5_000),
+    ?assertNotReceive({publish, #{payload := _}}, 2_000),
 
 
     ok = emqtt:disconnect(ConnShared1),
     ok = emqtt:disconnect(ConnShared1),
     ok = emqtt:disconnect(ConnShared2),
     ok = emqtt:disconnect(ConnShared2),
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_two_clients('init', Config) ->
 t_two_clients('init', Config) ->
-    declare_queue(<<"gr4">>, <<"topic4/#">>, Config);
+    declare_queue_if_needed(<<"gr4">>, <<"topic4/#">>, Config);
 t_two_clients('end', Config) ->
 t_two_clients('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -191,7 +225,7 @@ t_two_clients(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_client_loss('init', Config) ->
 t_client_loss('init', Config) ->
-    declare_queue(<<"gr5">>, <<"topic5/#">>, Config);
+    declare_queue_if_needed(<<"gr5">>, <<"topic5/#">>, Config);
 t_client_loss('end', Config) ->
 t_client_loss('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -220,7 +254,7 @@ t_client_loss(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_stream_revoke('init', Config) ->
 t_stream_revoke('init', Config) ->
-    declare_queue(<<"gr6">>, <<"topic6/#">>, Config);
+    declare_queue_if_needed(<<"gr6">>, <<"topic6/#">>, Config);
 t_stream_revoke('end', Config) ->
 t_stream_revoke('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -259,7 +293,7 @@ t_stream_revoke(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_graceful_disconnect('init', Config) ->
 t_graceful_disconnect('init', Config) ->
-    declare_queue(<<"gr4">>, <<"topic7/#">>, Config);
+    declare_queue_if_needed(<<"gr4">>, <<"topic7/#">>, Config);
 t_graceful_disconnect('end', Config) ->
 t_graceful_disconnect('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -296,7 +330,7 @@ t_graceful_disconnect(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_leader_state_preserved('init', Config) ->
 t_leader_state_preserved('init', Config) ->
-    declare_queue(<<"lsp">>, <<"topic42/#">>, Config);
+    declare_queue_if_needed(<<"lsp">>, <<"topic42/#">>, Config);
 t_leader_state_preserved('end', Config) ->
 t_leader_state_preserved('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -339,7 +373,7 @@ t_leader_state_preserved(_Config) ->
     ).
     ).
 
 
 t_intensive_reassign('init', Config) ->
 t_intensive_reassign('init', Config) ->
-    declare_queue(<<"gr8">>, <<"topic8/#">>, Config);
+    declare_queue_if_needed(<<"gr8">>, <<"topic8/#">>, Config);
 t_intensive_reassign('end', Config) ->
 t_intensive_reassign('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -392,7 +426,7 @@ t_intensive_reassign(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_unsubscribe('init', Config) ->
 t_unsubscribe('init', Config) ->
-    declare_queue(<<"gr9">>, <<"topic9/#">>, Config);
+    declare_queue_if_needed(<<"gr9">>, <<"topic9/#">>, Config);
 t_unsubscribe('end', Config) ->
 t_unsubscribe('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -442,7 +476,7 @@ t_unsubscribe(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_quick_resubscribe('init', Config) ->
 t_quick_resubscribe('init', Config) ->
-    declare_queue(<<"gr10">>, <<"topic10/#">>, Config);
+    declare_queue_if_needed(<<"gr10">>, <<"topic10/#">>, Config);
 t_quick_resubscribe('end', Config) ->
 t_quick_resubscribe('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -499,7 +533,7 @@ t_quick_resubscribe(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_disconnect_no_double_replay1('init', Config) ->
 t_disconnect_no_double_replay1('init', Config) ->
-    declare_queue(<<"gr11">>, <<"topic11/#">>, Config);
+    declare_queue_if_needed(<<"gr11">>, <<"topic11/#">>, Config);
 t_disconnect_no_double_replay1('end', Config) ->
 t_disconnect_no_double_replay1('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -552,7 +586,7 @@ t_disconnect_no_double_replay1(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_disconnect_no_double_replay2('init', Config) ->
 t_disconnect_no_double_replay2('init', Config) ->
-    declare_queue(<<"gr12">>, <<"topic12/#">>, Config);
+    declare_queue_if_needed(<<"gr12">>, <<"topic12/#">>, Config);
 t_disconnect_no_double_replay2('end', Config) ->
 t_disconnect_no_double_replay2('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).
 
 
@@ -589,7 +623,7 @@ t_disconnect_no_double_replay2(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_lease_reconnect('init', Config) ->
 t_lease_reconnect('init', Config) ->
-    declare_queue(<<"gr2">>, <<"topic2/#">>, Config);
+    declare_queue_if_needed(<<"gr2">>, <<"topic2/#">>, Config);
 t_lease_reconnect('end', Config) ->
 t_lease_reconnect('end', Config) ->
     meck:unload(),
     meck:unload(),
     destroy_queue(Config).
     destroy_queue(Config).
@@ -627,7 +661,7 @@ t_lease_reconnect(_Config) ->
     ok = emqtt:disconnect(ConnPub).
     ok = emqtt:disconnect(ConnPub).
 
 
 t_renew_lease_timeout('init', Config) ->
 t_renew_lease_timeout('init', Config) ->
-    declare_queue(<<"gr3">>, <<"topic3/#">>, Config);
+    declare_queue_if_needed(<<"gr3">>, <<"topic3/#">>, Config);
 t_renew_lease_timeout('end', Config) ->
 t_renew_lease_timeout('end', Config) ->
     destroy_queue(Config).
     destroy_queue(Config).