Przeglądaj źródła

Merge pull request #11281 from thalesmg/restore-queue-shared-sub-r51-20230717

fix(shared_sub): restore support for `$queue/` shared subscription
Thales Macedo Garitezi 2 lat temu
rodzic
commit
4ec4041f99

+ 4 - 1
apps/emqx/include/emqx.hrl

@@ -33,12 +33,15 @@
 -define(ERTS_MINIMUM_REQUIRED, "10.0").
 
 %%--------------------------------------------------------------------
-%% Topics' prefix: $SYS | $share
+%% Topics' prefix: $SYS | $queue | $share
 %%--------------------------------------------------------------------
 
 %% System topic
 -define(SYSTOP, <<"$SYS/">>).
 
+%% Queue topic
+-define(QUEUE, <<"$queue/">>).
+
 %%--------------------------------------------------------------------
 %% alarms
 %%--------------------------------------------------------------------

+ 4 - 0
apps/emqx/src/emqx_topic.erl

@@ -244,8 +244,12 @@ parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
     parse(TopicFilter, Options).
 
 -spec parse(topic(), map()) -> {topic(), map()}.
+parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
+    error({invalid_topic_filter, TopicFilter});
 parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
     error({invalid_topic_filter, TopicFilter});
+parse(<<"$queue/", TopicFilter/binary>>, Options) ->
+    parse(TopicFilter, Options#{share => <<"$queue">>});
 parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
     case binary:split(Rest, <<"/">>) of
         [_Any] ->

+ 1 - 1
apps/emqx/test/emqx_proper_types.erl

@@ -444,7 +444,7 @@ systopic_mon() ->
 sharetopic() ->
     ?LET(
         {Type, Grp, T},
-        {oneof([<<"$share">>]), list(latin_char()), normal_topic()},
+        {oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()},
         <<Type/binary, "/", (iolist_to_binary(Grp))/binary, "/", T/binary>>
     ).
 

+ 108 - 0
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -20,8 +20,10 @@
 -compile(nowarn_export_all).
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(SUITE, ?MODULE).
 
@@ -986,6 +988,112 @@ t_session_kicked(Config) when is_list(Config) ->
     ?assertEqual([], collect_msgs(0)),
     ok.
 
+%% FIXME: currently doesn't work
+%% t_different_groups_same_topic({init, Config}) ->
+%%     TestName = atom_to_binary(?FUNCTION_NAME),
+%%     ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
+%%     {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
+%%     {ok, _} = emqtt:connect(C),
+%%     [{client, C}, {clientid, ClientId} | Config];
+%% t_different_groups_same_topic({'end', Config}) ->
+%%     C = ?config(client, Config),
+%%     emqtt:stop(C),
+%%     ok;
+%% t_different_groups_same_topic(Config) when is_list(Config) ->
+%%     C = ?config(client, Config),
+%%     ClientId = ?config(clientid, Config),
+%%     %% Subscribe and unsubscribe to both $queue and $shared topics
+%%     Topic = <<"t/1">>,
+%%     SharedTopic0 = <<"$share/aa/", Topic/binary>>,
+%%     SharedTopic1 = <<"$share/bb/", Topic/binary>>,
+%%     {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}),
+%%     {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}),
+
+%%     Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
+%%     emqx:publish(Message0),
+%%     ?assertMatch([ {publish, #{payload := <<"hi">>}}
+%%                  , {publish, #{payload := <<"hi">>}}
+%%                  ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}),
+
+%%     {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0),
+%%     {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1),
+
+%%     ok.
+
+t_queue_subscription({init, Config}) ->
+    TestName = atom_to_binary(?FUNCTION_NAME),
+    ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
+
+    {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C),
+
+    [{client, C}, {clientid, ClientId} | Config];
+t_queue_subscription({'end', Config}) ->
+    C = ?config(client, Config),
+    emqtt:stop(C),
+    ok;
+t_queue_subscription(Config) when is_list(Config) ->
+    C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
+    %% Subscribe and unsubscribe to both $queue and $shared topics
+    Topic = <<"t/1">>,
+    QueueTopic = <<"$queue/", Topic/binary>>,
+    SharedTopic = <<"$share/aa/", Topic/binary>>,
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}),
+
+    %% FIXME: we should actually see 2 routes, one for each group
+    %% ($queue and aa), but currently the latest subscription
+    %% overwrites the existing one.
+    ?retry(
+        _Sleep0 = 100,
+        _Attempts0 = 50,
+        begin
+            ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
+            %% FIXME: should ensure we have 2 subscriptions
+            true = emqx_router:has_routes(Topic)
+        end
+    ),
+
+    %% now publish to the underlying topic
+    Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
+    emqx:publish(Message0),
+    ?assertMatch(
+        [
+            {publish, #{payload := <<"hi">>}}
+            %% FIXME: should receive one message from each group
+            %% , {publish, #{payload := <<"hi">>}}
+        ],
+        collect_msgs(5_000)
+    ),
+
+    {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic),
+    %% FIXME: return code should be success instead of 17 ("no_subscription_existed")
+    {ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic),
+
+    %% FIXME: this should eventually be true, but currently we leak
+    %% the previous group subscription...
+    %% ?retry(
+    %%     _Sleep0 = 100,
+    %%     _Attempts0 = 50,
+    %%    begin
+    %%     ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
+    %%     false = emqx_router:has_routes(Topic)
+    %%    end
+    %%   ),
+    ct:sleep(500),
+
+    Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>),
+    emqx:publish(Message1),
+    %% FIXME: we should *not* receive any messages...
+    %% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}),
+    %% This is from the leaked group...
+    ?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{
+        routes => ets:tab2list(emqx_route)
+    }),
+
+    ok.
+
 %%--------------------------------------------------------------------
 %% help functions
 %%--------------------------------------------------------------------

+ 10 - 0
apps/emqx/test/emqx_topic_SUITE.erl

@@ -211,6 +211,10 @@ t_systop(_) ->
     ?assertEqual(SysTop2, systop(<<"abc">>)).
 
 t_feed_var(_) ->
+    ?assertEqual(
+        <<"$queue/client/clientId">>,
+        feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)
+    ),
     ?assertEqual(
         <<"username/test/client/x">>,
         feed_var(
@@ -232,6 +236,10 @@ long_topic() ->
     iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).
 
 t_parse(_) ->
+    ?assertError(
+        {invalid_topic_filter, <<"$queue/t">>},
+        parse(<<"$queue/t">>, #{share => <<"g">>})
+    ),
     ?assertError(
         {invalid_topic_filter, <<"$share/g/t">>},
         parse(<<"$share/g/t">>, #{share => <<"g">>})
@@ -246,9 +254,11 @@ t_parse(_) ->
     ),
     ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
     ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
+    ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),
     ?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)),
     %% The '$local' and '$fastlane' topics have been deprecated.
     ?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
+    ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
     ?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
     ?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).
 

+ 2 - 0
apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl

@@ -187,6 +187,8 @@ format(WhichNode, {{Topic, _Subscriber}, Options}) ->
         maps:with([qos, nl, rap, rh], Options)
     ).
 
+get_topic(Topic, #{share := <<"$queue">> = Group}) ->
+    emqx_topic:join([Group, Topic]);
 get_topic(Topic, #{share := Group}) ->
     emqx_topic:join([<<"$share">>, Group, Topic]);
 get_topic(Topic, _) ->

+ 1 - 0
changes/ce/fix-11281.en.md

@@ -0,0 +1 @@
+Restored support for the special `$queue/` shared subscription.