Преглед изворни кода

fix(shared): handle unsubscribe for sticky strategy

prior to this change, the message is dispatched to a shared subscriber
even after unsubscribed
Zaiming (Stone) Shi пре 3 година
родитељ
комит
29f394aa70

+ 20 - 9
apps/emqx/src/emqx_shared_sub.erl

@@ -139,7 +139,7 @@ record(Group, Topic, SubPid) ->
 -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) ->
     emqx_types:deliver_result().
 dispatch(Group, Topic, Delivery) ->
-    dispatch(Group, Topic, Delivery, _FailedSubs = []).
+    dispatch(Group, Topic, Delivery, _FailedSubs = #{}).
 
 dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
     #message{from = ClientId, topic = SourceTopic} = Msg,
@@ -151,9 +151,9 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
             case do_dispatch(SubPid, Group, Topic, Msg1, Type) of
                 ok ->
                     {ok, 1};
-                {error, _Reason} ->
+                {error, Reason} ->
                     %% Failed to dispatch to this sub, try next.
-                    dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
+                    dispatch(Group, Topic, Delivery, FailedSubs#{SubPid => Reason})
             end
     end.
 
@@ -262,7 +262,8 @@ redispatch_shared_message(#message{} = Msg) ->
     %% Note that dispatch is called with self() in failed subs
     %% This is done to avoid dispatching back to caller
     Delivery = #delivery{sender = self(), message = Msg},
-    dispatch(Group, Topic, Delivery, [self()]).
+    FailedSubs = #{self() => sender},
+    dispatch(Group, Topic, Delivery, FailedSubs).
 
 %% @hidden Return the `redispatch_to` group-topic in the message header.
 %% `false` is returned if the message is not a shared dispatch.
@@ -307,14 +308,22 @@ maybe_ack(Msg) ->
 
 pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
     Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
-    case is_active_sub(Sub0, FailedSubs) of
+    All = subscribers(Group, Topic),
+    case is_active_sub(Sub0, FailedSubs, All) of
         true ->
             %% the old subscriber is still alive
             %% keep using it for sticky strategy
             {fresh, Sub0};
         false ->
             %% randomly pick one for the first message
-            {Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]),
+            {Type, Sub} = do_pick(
+                random,
+                ClientId,
+                SourceTopic,
+                Group,
+                Topic,
+                FailedSubs#{Sub0 => noproc}
+            ),
             %% stick to whatever pick result
             erlang:put({shared_sub_sticky, Group, Topic}, Sub),
             {Type, Sub}
@@ -324,7 +333,7 @@ pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
 
 do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
     All = subscribers(Group, Topic),
-    case All -- FailedSubs of
+    case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of
         [] when All =:= [] ->
             %% Genuinely no subscriber
             false;
@@ -512,8 +521,10 @@ update_stats(State) ->
     State.
 
 %% Return 'true' if the subscriber process is alive AND not in the failed list
-is_active_sub(Pid, FailedSubs) ->
-    is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs).
+is_active_sub(Pid, FailedSubs, All) ->
+    lists:member(Pid, All) andalso
+        is_alive_sub(Pid) andalso
+        (not maps:is_key(Pid, FailedSubs)).
 
 %% erlang:is_process_alive/1 does not work with remote pid.
 is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->

+ 1 - 1
apps/emqx/test/emqx_session_SUITE.erl

@@ -190,7 +190,7 @@ t_publish_qos2_with_error_return(_) ->
 
     begin
         Msg2 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload2">>),
-        {ok, [], Session1} = emqx_session:publish(clientinfo(), PacketId2 = 2, Msg2, Session),
+        {ok, [], Session1} = emqx_session:publish(clientinfo(), _PacketId2 = 2, Msg2, Session),
         ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
         {error, RC2 = ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(
             clientinfo(), _PacketId3 = 3, Msg2, Session1

+ 33 - 0
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -344,6 +344,39 @@ t_sticky(Config) when is_list(Config) ->
     ok = ensure_config(sticky, true),
     test_two_messages(sticky).
 
+%% two subscribers in one shared group
+%% one unsubscribe after receiving a message
+%% the other one in the group should receive the next message
+t_sticky_unsubscribe(Config) when is_list(Config) ->
+    ok = ensure_config(sticky, false),
+    Topic = <<"foo/bar/sticky-unsub">>,
+    ClientId1 = <<"c1-sticky-unsub">>,
+    ClientId2 = <<"c2-sticky-unsub">>,
+    Group = <<"gsu">>,
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
+    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
+    {ok, _} = emqtt:connect(ConnPid1),
+    {ok, _} = emqtt:connect(ConnPid2),
+
+    ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>,
+    emqtt:subscribe(ConnPid1, {ShareTopic, 0}),
+    emqtt:subscribe(ConnPid2, {ShareTopic, 0}),
+
+    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
+    ct:sleep(100),
+
+    emqx:publish(Message1),
+    {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
+    emqtt:unsubscribe(UsedSubPid1, ShareTopic),
+    emqx:publish(Message2),
+    {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
+    ?assertNotEqual(UsedSubPid1, UsedSubPid2),
+
+    kill_process(ConnPid1, fun(_) -> emqtt:stop(ConnPid1) end),
+    kill_process(ConnPid2, fun(_) -> emqtt:stop(ConnPid2) end),
+    ok.
+
 t_hash(Config) when is_list(Config) ->
     ok = ensure_config(hash, false),
     test_two_messages(hash).

+ 1 - 1
apps/emqx_connector/src/emqx_connector_jwt.erl

@@ -16,7 +16,7 @@
 
 -module(emqx_connector_jwt).
 
--include("emqx_connector_tables.hrl").
+-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 
 %% API

+ 1 - 1
apps/emqx_connector/src/emqx_connector_jwt_sup.erl

@@ -18,7 +18,7 @@
 
 -behaviour(supervisor).
 
--include("emqx_connector_tables.hrl").
+-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
 
 -export([
     start_link/0,