|
|
@@ -73,7 +73,7 @@
|
|
|
-export([connecting/3, connected/3]).
|
|
|
|
|
|
%% management APIs
|
|
|
--export([get_forwards/1, ensure_forward_present/2]). %, del_forward/2]).
|
|
|
+-export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]).
|
|
|
-export([get_subscriptions/1]). %, add_subscription/3, del_subscription/2]).
|
|
|
|
|
|
-export_type([config/0,
|
|
|
@@ -143,8 +143,13 @@ handle_ack(Pid, Ref) when node() =:= node(Pid) ->
|
|
|
get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)).
|
|
|
|
|
|
%% @doc Add a new forward (local topic subscription).
|
|
|
--spec ensure_forward_present(id(), topic()) -> ok | {error, any()}.
|
|
|
-ensure_forward_present(Id, Topic) -> gen_statem:call(id(Id), {ensure_forward_present, topic(Topic)}).
|
|
|
+-spec ensure_forward_present(id(), topic()) -> ok.
|
|
|
+ensure_forward_present(Id, Topic) ->
|
|
|
+ gen_statem:call(id(Id), {ensure_forward_present, topic(Topic)}).
|
|
|
+
|
|
|
+-spec ensure_forward_absent(id(), topic()) -> ok.
|
|
|
+ensure_forward_absent(Id, Topic) ->
|
|
|
+ gen_statem:call(id(Id), {ensure_forward_absent, topic(Topic)}).
|
|
|
|
|
|
-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
|
|
|
get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions).
|
|
|
@@ -290,6 +295,16 @@ common(_StateName, {call, From}, {ensure_forward_present, Topic},
|
|
|
{keep_state, State#{forwards := lists:usort([Topic | Forwards])},
|
|
|
[{reply, From, ok}]}
|
|
|
end;
|
|
|
+common(_StateName, {call, From}, {ensure_forward_absent, Topic},
|
|
|
+ #{forwards := Forwards} = State) ->
|
|
|
+ case lists:member(Topic, Forwards) of
|
|
|
+ true ->
|
|
|
+ emqx_broker:unsubscribe(Topic),
|
|
|
+ {keep_state, State#{forwards := lists:delete(Topic, Forwards)},
|
|
|
+ [{reply, From, ok}]};
|
|
|
+ false ->
|
|
|
+ {keep_state_and_data, [{reply, From, ok}]}
|
|
|
+ end;
|
|
|
common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) ->
|
|
|
{keep_state_and_data, [{reply, From, Subs}]};
|
|
|
common(_StateName, info, {dispatch, _, Msg},
|