|
|
@@ -106,8 +106,8 @@ start_link(Pool, Id) ->
|
|
|
create_tabs() ->
|
|
|
TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
|
|
|
|
|
|
- %% SubOption: {SubPid, Topic} -> SubOption
|
|
|
- ok = emqx_tables:new(?SUBOPTION, [set | TabOpts]),
|
|
|
+ %% SubOption: {Topic, SubPid} -> SubOption
|
|
|
+ ok = emqx_tables:new(?SUBOPTION, [ordered_set | TabOpts]),
|
|
|
|
|
|
%% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
|
|
|
%% duplicate_bag: o(1) insert
|
|
|
@@ -136,7 +136,7 @@ subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_ma
|
|
|
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
|
|
|
_ = emqx_trace:subscribe(Topic, SubId, SubOpts),
|
|
|
SubPid = self(),
|
|
|
- case ets:member(?SUBOPTION, {SubPid, Topic}) of
|
|
|
+ case subscribed(SubPid, Topic) of
|
|
|
%% New
|
|
|
false ->
|
|
|
ok = emqx_broker_helper:register_sub(SubPid, SubId),
|
|
|
@@ -164,16 +164,16 @@ do_subscribe(undefined, Topic, SubPid, SubOpts) ->
|
|
|
case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
|
|
|
0 ->
|
|
|
true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
|
|
|
- true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
|
|
+ true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
|
|
|
call(pick(Topic), {subscribe, Topic});
|
|
|
I ->
|
|
|
true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
|
|
- true = ets:insert(?SUBOPTION, {{SubPid, Topic}, maps:put(shard, I, SubOpts)}),
|
|
|
+ true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}),
|
|
|
call(pick({Topic, I}), {subscribe, Topic, I})
|
|
|
end;
|
|
|
%% Shared subscription
|
|
|
do_subscribe(Group, Topic, SubPid, SubOpts) ->
|
|
|
- true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
|
|
+ true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
|
|
|
emqx_shared_sub:subscribe(Group, Topic, SubPid).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -183,7 +183,7 @@ do_subscribe(Group, Topic, SubPid, SubOpts) ->
|
|
|
-spec unsubscribe(emqx_types:topic()) -> ok.
|
|
|
unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
SubPid = self(),
|
|
|
- case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
|
|
+ case ets:lookup(?SUBOPTION, {Topic, SubPid}) of
|
|
|
[{_, SubOpts}] ->
|
|
|
_ = emqx_broker_helper:reclaim_seq(Topic),
|
|
|
_ = emqx_trace:unsubscribe(Topic, SubOpts),
|
|
|
@@ -193,7 +193,7 @@ unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
end.
|
|
|
|
|
|
do_unsubscribe(Topic, SubPid, SubOpts) ->
|
|
|
- true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
|
|
+ true = ets:delete(?SUBOPTION, {Topic, SubPid}),
|
|
|
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
|
|
Group = maps:get(share, SubOpts, undefined),
|
|
|
do_unsubscribe(Group, Topic, SubPid, SubOpts),
|
|
|
@@ -362,10 +362,10 @@ subscribers(Shard = {shard, _Topic, _I}) ->
|
|
|
subscriber_down(SubPid) ->
|
|
|
lists:foreach(
|
|
|
fun(Topic) ->
|
|
|
- case lookup_value(?SUBOPTION, {SubPid, Topic}) of
|
|
|
+ case lookup_value(?SUBOPTION, {Topic, SubPid}) of
|
|
|
SubOpts when is_map(SubOpts) ->
|
|
|
_ = emqx_broker_helper:reclaim_seq(Topic),
|
|
|
- true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
|
|
+ true = ets:delete(?SUBOPTION, {Topic, SubPid}),
|
|
|
case maps:get(shard, SubOpts, 0) of
|
|
|
0 ->
|
|
|
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
|
|
@@ -390,7 +390,7 @@ subscriber_down(SubPid) ->
|
|
|
[{emqx_types:topic(), emqx_types:subopts()}].
|
|
|
subscriptions(SubPid) when is_pid(SubPid) ->
|
|
|
[
|
|
|
- {Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})}
|
|
|
+ {Topic, lookup_value(?SUBOPTION, {Topic, SubPid}, #{})}
|
|
|
|| Topic <- lookup_value(?SUBSCRIPTION, SubPid, [])
|
|
|
];
|
|
|
subscriptions(SubId) ->
|
|
|
@@ -403,19 +403,19 @@ subscriptions(SubId) ->
|
|
|
|
|
|
-spec subscriptions_via_topic(emqx_types:topic()) -> [emqx_types:subopts()].
|
|
|
subscriptions_via_topic(Topic) ->
|
|
|
- MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=', '$1', Topic}], ['$_']}],
|
|
|
+ MatchSpec = [{{{Topic, '_'}, '_'}, [], ['$_']}],
|
|
|
ets:select(?SUBOPTION, MatchSpec).
|
|
|
|
|
|
-spec subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean().
|
|
|
subscribed(SubPid, Topic) when is_pid(SubPid) ->
|
|
|
- ets:member(?SUBOPTION, {SubPid, Topic});
|
|
|
+ ets:member(?SUBOPTION, {Topic, SubPid});
|
|
|
subscribed(SubId, Topic) when ?IS_SUBID(SubId) ->
|
|
|
SubPid = emqx_broker_helper:lookup_subpid(SubId),
|
|
|
- ets:member(?SUBOPTION, {SubPid, Topic}).
|
|
|
+ ets:member(?SUBOPTION, {Topic, SubPid}).
|
|
|
|
|
|
-spec get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts()).
|
|
|
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
|
|
|
- lookup_value(?SUBOPTION, {SubPid, Topic});
|
|
|
+ lookup_value(?SUBOPTION, {Topic, SubPid});
|
|
|
get_subopts(SubId, Topic) when ?IS_SUBID(SubId) ->
|
|
|
case emqx_broker_helper:lookup_subpid(SubId) of
|
|
|
SubPid when is_pid(SubPid) ->
|
|
|
@@ -430,7 +430,7 @@ set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
|
|
|
|
|
|
%% @private
|
|
|
set_subopts(SubPid, Topic, NewOpts) ->
|
|
|
- Sub = {SubPid, Topic},
|
|
|
+ Sub = {Topic, SubPid},
|
|
|
case ets:lookup(?SUBOPTION, Sub) of
|
|
|
[{_, OldOpts}] ->
|
|
|
ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)});
|