|
|
@@ -35,6 +35,11 @@
|
|
|
unsubscribe/2
|
|
|
]).
|
|
|
|
|
|
+%% Internal exports (RPC)
|
|
|
+-export([
|
|
|
+ try_subscribe/2
|
|
|
+]).
|
|
|
+
|
|
|
-record(exclusive_subscription, {
|
|
|
topic :: emqx_types:topic(),
|
|
|
clientid :: emqx_types:clientid()
|
|
|
@@ -80,10 +85,7 @@ on_delete_module() ->
|
|
|
-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
|
|
|
allow | deny.
|
|
|
check_subscribe(#{clientid := ClientId}, Topic) ->
|
|
|
- Fun = fun() ->
|
|
|
- try_subscribe(ClientId, Topic)
|
|
|
- end,
|
|
|
- case mria:transaction(?EXCLUSIVE_SHARD, Fun) of
|
|
|
+ case mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:try_subscribe/2, [ClientId, Topic]) of
|
|
|
{atomic, Res} ->
|
|
|
Res;
|
|
|
{aborted, Reason} ->
|
|
|
@@ -94,7 +96,7 @@ check_subscribe(#{clientid := ClientId}, Topic) ->
|
|
|
end.
|
|
|
|
|
|
unsubscribe(Topic, #{is_exclusive := true}) ->
|
|
|
- _ = mria:transaction(?EXCLUSIVE_SHARD, fun() -> mnesia:delete({?TAB, Topic}) end),
|
|
|
+ _ = mria:transaction(?EXCLUSIVE_SHARD, fun mnesia:delete/1, [{?TAB, Topic}]),
|
|
|
ok;
|
|
|
unsubscribe(_Topic, _SubOpts) ->
|
|
|
ok.
|