소스 검색

Fix share sub bug

turtled 7 년 전
부모
커밋
95d36d0204
4개의 변경된 파일9개의 추가작업 그리고 7개의 파일을 삭제
  1. 1 1
      include/emqx.hrl
  2. 6 4
      src/emqx_broker.erl
  3. 1 1
      src/emqx_shared_sub.erl
  4. 1 1
      src/emqx_topic.erl

+ 1 - 1
include/emqx.hrl

@@ -54,7 +54,7 @@
 -type(subid() :: binary() | atom()).
 
 -type(subopts() :: #{qos    => integer(),
-                     share  => '$queue' | binary(),
+                     share  => binary(),
                      atom() => term()}).
 
 -record(subscription, {

+ 6 - 4
src/emqx_broker.erl

@@ -183,16 +183,18 @@ route([{To, Node}], Delivery) when Node =:= node() ->
 route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) ->
     forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]});
 
-route([{To, Shared}], Delivery) when is_tuple(Shared); is_binary(Shared) ->
-    emqx_shared_sub:dispatch(Shared, To, Delivery);
+route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) ->
+    emqx_shared_sub:dispatch(Group, To, Delivery);
 
 route(Routes, Delivery) ->
     lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes).
 
 aggre([]) ->
     [];
-aggre([#route{topic = To, dest = Dest}]) ->
-    [{To, Dest}];
+aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
+    [{To, Node}];
+aggre([#route{topic = To, dest = {Group, _Node}}]) ->
+    [{To, Group}];
 aggre(Routes) ->
     lists:foldl(
       fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) ->

+ 1 - 1
src/emqx_shared_sub.erl

@@ -81,7 +81,7 @@ record(Group, Topic, SubPid) ->
     #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
 
 %% TODO: dispatch strategy, ensure the delivery...
-dispatch({Group, _Node}, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
+dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
     case pick(subscribers(Group, Topic)) of
         false  -> Delivery;
         SubPid -> SubPid ! {dispatch, Topic, Msg},

+ 1 - 1
src/emqx_topic.erl

@@ -185,7 +185,7 @@ parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) ->
 parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) ->
     error({invalid_topic, Topic});
 parse(<<"$queue/", Topic1/binary>>, Options) ->
-    parse(Topic1, maps:put(share, '$queue', Options));
+    parse(Topic1, maps:put(share, <<"$queue">>, Options));
 parse(<<"$share/", Topic1/binary>>, Options) ->
     [Group, Topic2] = binary:split(Topic1, <<"/">>),
     {Topic2, maps:put(share, Group, Options)};