|
@@ -139,6 +139,7 @@ subscribe(Topic, SubId, SubOpts0) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId), is_ma
|
|
|
%% New
|
|
%% New
|
|
|
false ->
|
|
false ->
|
|
|
ok = emqx_broker_helper:register_sub(SubPid, SubId),
|
|
ok = emqx_broker_helper:register_sub(SubPid, SubId),
|
|
|
|
|
+ true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
|
|
|
do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts));
|
|
do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts));
|
|
|
%% Existed
|
|
%% Existed
|
|
|
true ->
|
|
true ->
|
|
@@ -153,11 +154,7 @@ with_subid(undefined, SubOpts) ->
|
|
|
with_subid(SubId, SubOpts) ->
|
|
with_subid(SubId, SubOpts) ->
|
|
|
maps:put(subid, SubId, SubOpts).
|
|
maps:put(subid, SubId, SubOpts).
|
|
|
|
|
|
|
|
-do_subscribe(Topic, SubPid, SubOpts) ->
|
|
|
|
|
- true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
|
|
|
|
|
- do_subscribe2(Topic, SubPid, SubOpts).
|
|
|
|
|
-
|
|
|
|
|
-do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) ->
|
|
|
|
|
|
|
+do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) ->
|
|
|
%% FIXME: subscribe shard bug
|
|
%% FIXME: subscribe shard bug
|
|
|
%% https://emqx.atlassian.net/browse/EMQX-10214
|
|
%% https://emqx.atlassian.net/browse/EMQX-10214
|
|
|
case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
|
|
case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
|
|
@@ -170,7 +167,7 @@ do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) ->
|
|
|
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}),
|
|
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}),
|
|
|
call(pick({Topic, I}), {subscribe, Topic, I})
|
|
call(pick({Topic, I}), {subscribe, Topic, I})
|
|
|
end;
|
|
end;
|
|
|
-do_subscribe2(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
|
|
|
|
|
|
|
+do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
|
|
|
is_binary(RealTopic)
|
|
is_binary(RealTopic)
|
|
|
->
|
|
->
|
|
|
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
|
|
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
|