|
|
@@ -73,7 +73,7 @@
|
|
|
-export([connecting/3, connected/3]).
|
|
|
|
|
|
%% management APIs
|
|
|
--export([get_forwards/1]). %, add_forward/2, del_forward/2]).
|
|
|
+-export([get_forwards/1, ensure_forward_present/2]). %, del_forward/2]).
|
|
|
-export([get_subscriptions/1]). %, add_subscription/3, del_subscription/2]).
|
|
|
|
|
|
-export_type([config/0,
|
|
|
@@ -86,6 +86,7 @@
|
|
|
-type config() :: map().
|
|
|
-type batch() :: [emqx_portal_msg:exp_msg()].
|
|
|
-type ack_ref() :: term().
|
|
|
+-type topic() :: emqx_topic:topic().
|
|
|
|
|
|
-include("logger.hrl").
|
|
|
-include("emqx_mqtt.hrl").
|
|
|
@@ -137,8 +138,13 @@ handle_ack(Pid, Ref) when node() =:= node(Pid) ->
|
|
|
Pid ! {batch_ack, Ref},
|
|
|
ok.
|
|
|
|
|
|
--spec get_forwards(id()) -> [emqx_topic:topic()].
|
|
|
-get_forwards(Id) -> gen_statem:call(id(Id), get_forwards).
|
|
|
+%% @doc Return all forwards (local subscriptions).
|
|
|
+-spec get_forwards(id()) -> [topic()].
|
|
|
+get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)).
|
|
|
+
|
|
|
+%% @doc Add a new forward (local topic subscription).
|
|
|
+-spec ensure_forward_present(id(), topic()) -> ok | {error, any()}.
|
|
|
+ensure_forward_present(Id, Topic) -> gen_statem:call(id(Id), {ensure_forward_present, topic(Topic)}).
|
|
|
|
|
|
-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
|
|
|
get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions).
|
|
|
@@ -185,7 +191,7 @@ init(Config) ->
|
|
|
batch_count_limit => GetQ(batch_count_limit, ?DEFAULT_BATCH_COUNT),
|
|
|
max_inflight_batches => Get(max_inflight_batches, ?DEFAULT_SEND_AHEAD),
|
|
|
mountpoint => format_mountpoint(Get(mountpoint, undefined)),
|
|
|
- topics => Topics,
|
|
|
+ forwards => Topics,
|
|
|
subscriptions => Subs,
|
|
|
replayq => Queue,
|
|
|
inflight => []
|
|
|
@@ -274,6 +280,16 @@ connected(Type, Content, State) ->
|
|
|
%% Common handlers
|
|
|
common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
|
|
|
{keep_state_and_data, [{reply, From, Forwards}]};
|
|
|
+common(_StateName, {call, From}, {ensure_forward_present, Topic},
|
|
|
+ #{forwards := Forwards} = State) ->
|
|
|
+ case lists:member(Topic, Forwards) of
|
|
|
+ true ->
|
|
|
+ {keep_state_and_data, [{reply, From, ok}]};
|
|
|
+ false ->
|
|
|
+ ok = subscribe_local_topic(Topic),
|
|
|
+ {keep_state, State#{forwards := lists:usort([Topic | Forwards])},
|
|
|
+ [{reply, From, ok}]}
|
|
|
+ end;
|
|
|
common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) ->
|
|
|
{keep_state_and_data, [{reply, From, Subs}]};
|
|
|
common(_StateName, info, {dispatch, _, Msg},
|
|
|
@@ -281,8 +297,8 @@ common(_StateName, info, {dispatch, _, Msg},
|
|
|
NewQ = replayq:append(Q, collect([Msg])),
|
|
|
{keep_state, State#{replayq => NewQ}, ?maybe_send};
|
|
|
common(StateName, Type, Content, State) ->
|
|
|
- ?DEBUG("Portal ~p discarded ~p type event at state ~p:~p",
|
|
|
- [name(), Type, StateName, Content]),
|
|
|
+ ?INFO("Portal ~p discarded ~p type event at state ~p:~p",
|
|
|
+ [name(), Type, StateName, Content]),
|
|
|
{keep_state, State}.
|
|
|
|
|
|
collect(Acc) ->
|
|
|
@@ -346,13 +362,19 @@ do_ack(#{inflight := Inflight}, Ref) ->
|
|
|
false -> stale
|
|
|
end.
|
|
|
|
|
|
-subscribe_local_topics(Topics) ->
|
|
|
- lists:foreach(
|
|
|
- fun(Topic0) ->
|
|
|
- Topic = iolist_to_binary(Topic0),
|
|
|
- emqx_topic:validate({filter, Topic}) orelse erlang:error({bad_topic, Topic}),
|
|
|
- emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()})
|
|
|
- end, Topics).
|
|
|
+subscribe_local_topics(Topics) -> lists:foreach(fun subscribe_local_topic/1, Topics).
|
|
|
+
|
|
|
+subscribe_local_topic(Topic0) ->
|
|
|
+ Topic = topic(Topic0),
|
|
|
+ try
|
|
|
+ emqx_topic:validate({filter, Topic})
|
|
|
+ catch
|
|
|
+ error : Reason ->
|
|
|
+ erlang:error({bad_topic, Topic, Reason})
|
|
|
+ end,
|
|
|
+ ok = emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()}).
|
|
|
+
|
|
|
+topic(T) -> iolist_to_binary(T).
|
|
|
|
|
|
disconnect(#{connection := Conn,
|
|
|
conn_ref := ConnRef,
|