Procházet zdrojové kódy

fix issue #53 - client will receive duplicate messages when overlapping subscription

Feng před 10 roky
rodič
revize
2954094619
1 změnil soubory, kde provedl 18 přidání a 1 odebrání
  1. 18 1
      apps/emqttd/src/emqttd_pubsub.erl

+ 18 - 1
apps/emqttd/src/emqttd_pubsub.erl

@@ -24,6 +24,7 @@
 %%%
 %%%
 %%% @end
 %%% @end
 %%%-----------------------------------------------------------------------------
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_pubsub).
 -module(emqttd_pubsub).
 
 
 -author("Feng Lee <feng@emqtt.io>").
 -author("Feng Lee <feng@emqtt.io>").
@@ -217,6 +218,7 @@ match(Topic) when is_binary(Topic) ->
 init([Id, _Opts]) ->
 init([Id, _Opts]) ->
     process_flag(min_heap_size, 1024*1024),
     process_flag(min_heap_size, 1024*1024),
     gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
     gproc_pool:connect_worker(pubsub, {?MODULE, Id}),
+    %%TODO: gb_trees to replace maps?
     {ok, #state{id = Id, submap = maps:new()}}.
     {ok, #state{id = Id, submap = maps:new()}}.
 
 
 handle_call({subscribe, SubPid, Topics}, _From, State) ->
 handle_call({subscribe, SubPid, Topics}, _From, State) ->
@@ -384,9 +386,24 @@ add_topic(TopicR = #mqtt_topic{topic = Topic}) ->
             end
             end
     end.
     end.
 
 
-add_subscriber({TopicR, Subscriber}) when is_record(TopicR, mqtt_topic) ->
+%% Fix issue #53 - Remove Overlapping Subscriptions
+add_subscriber({TopicR, Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, pid = SubPid}})
+                when is_record(TopicR, mqtt_topic) ->
     case add_topic(TopicR) of
     case add_topic(TopicR) of
         ok ->
         ok ->
+            OverlapSubs = [Sub || Sub = #mqtt_subscriber{topic = SubTopic, qos = SubQos}
+                                    <- mnesia:index_read(subscriber, SubPid, #mqtt_subscriber.pid),
+                                        SubTopic =:= Topic, SubQos =/= Qos],
+
+            %% remove overlapping subscribers
+            if
+                length(OverlapSubs) =:= 0 -> ok;
+                true ->
+                    lager:warning("Remove overlapping subscribers: ~p", [OverlapSubs]),
+                    [mnesia:delete_object(subscriber, OverlapSub, write) || OverlapSub <- OverlapSubs]
+            end,
+
+            %% insert subscriber
             mnesia:write(subscriber, Subscriber, write);
             mnesia:write(subscriber, Subscriber, write);
         Error -> 
         Error -> 
             Error
             Error