Просмотр исходного кода

Shared subscriber should be keyed by SharedName + Topic

Prior to this change, if a producer session produces to two
or more shared subscriber groups, the previously picked subscriber
for sticky strategy may not be a valid member for the next group.
spring2maz 7 лет назад
Родитель
Сommit
3bab3cbd2a
1 измененных файлов с 14 добавлено и 13 удалено
  1. 14 13
      src/emqx_shared_sub.erl

+ 14 - 13
src/emqx_shared_sub.erl

@@ -89,7 +89,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) -
     end.
 
 pick(sticky, ClientId, Group, Topic) ->
-    Sub0 = erlang:get(shared_sub_sticky),
+    Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
     case is_sub_alive(Sub0) of
         true ->
             %% the old subscriber is still alive
@@ -99,32 +99,33 @@ pick(sticky, ClientId, Group, Topic) ->
             %% randomly pick one for the first message
             Sub = do_pick(random, ClientId, Group, Topic),
             %% stick to whatever pick result
-            erlang:put(shared_sub_sticky, Sub),
+            erlang:put({shared_sub_sticky, Group, Topic}, Sub),
             Sub
     end;
 pick(Strategy, ClientId, Group, Topic) ->
     do_pick(Strategy, ClientId, Group, Topic).
 
 do_pick(Strategy, ClientId, Group, Topic) ->
-    All = subscribers(Group, Topic),
-    pick_subscriber(Strategy, ClientId, All).
+    case subscribers(Group, Topic) of
+        [] -> false;
+        [Sub] -> Sub;
+        All -> pick_subscriber(Group, Topic, Strategy, ClientId, All)
+    end.
 
-pick_subscriber(_, _ClientId, []) -> false;
-pick_subscriber(_, _ClientId, [Sub]) -> Sub;
-pick_subscriber(Strategy, ClientId, Subs) ->
-    Nth = do_pick_subscriber(Strategy, ClientId, length(Subs)),
+pick_subscriber(Group, Topic, Strategy, ClientId, Subs) ->
+    Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)),
     lists:nth(Nth, Subs).
 
-do_pick_subscriber(random, _ClientId, Count) ->
+do_pick_subscriber(_Group, _Topic, random, _ClientId, Count) ->
     rand:uniform(Count);
-do_pick_subscriber(hash, ClientId, Count) ->
+do_pick_subscriber(_Group, _Topic, hash, ClientId, Count) ->
     1 + erlang:phash2(ClientId) rem Count;
-do_pick_subscriber(round_robin, _ClientId, Count) ->
-    Rem = case erlang:get(shared_sub_round_robin) of
+do_pick_subscriber(Group, Topic, round_robin, _ClientId, Count) ->
+    Rem = case erlang:get({shared_sub_round_robin, Group, Topic}) of
               undefined -> 0;
               N -> (N + 1) rem Count
           end,
-    _ = erlang:put(shared_sub_round_robin, Rem),
+    _ = erlang:put({shared_sub_round_robin, Group, Topic}, Rem),
     Rem + 1.
 
 subscribers(Group, Topic) ->