|
|
@@ -23,12 +23,16 @@
|
|
|
|
|
|
-export([start_link/2, start_bridge/1, stop_bridge/1, status/1]).
|
|
|
|
|
|
+-export([show_forwards/1, add_forward/2, del_forward/2]).
|
|
|
+
|
|
|
+-export([show_subscriptions/1, add_subscription/3, del_subscription/2]).
|
|
|
+
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
|
code_change/3]).
|
|
|
|
|
|
--record(state, {client_pid, options, reconnect_time, reconnect_count,
|
|
|
- def_reconnect_count, type, mountpoint, queue, store_type,
|
|
|
- max_pending_messages}).
|
|
|
+-record(state, {client_pid, options, reconnect_interval,
|
|
|
+ mountpoint, queue, mqueue_type, max_pending_messages,
|
|
|
+ forwards = [], subscriptions = []}).
|
|
|
|
|
|
-record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false,
|
|
|
packet_id, topic, props, payload}).
|
|
|
@@ -42,6 +46,50 @@ start_bridge(Name) ->
|
|
|
stop_bridge(Name) ->
|
|
|
gen_server:call(name(Name), stop_bridge).
|
|
|
|
|
|
+-spec(show_forwards(atom()) -> list()).
|
|
|
+show_forwards(Name) ->
|
|
|
+ gen_server:call(name(Name), show_forwards).
|
|
|
+
|
|
|
+-spec(add_forward(atom(), binary()) -> ok | {error, already_exists | validate_fail}).
|
|
|
+add_forward(Name, Topic) ->
|
|
|
+ case catch emqx_topic:validate({filter, Topic}) of
|
|
|
+ true ->
|
|
|
+ gen_server:call(name(Name), {add_forward, Topic});
|
|
|
+ {'EXIT', _Reason} ->
|
|
|
+ {error, validate_fail}
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec(del_forward(atom(), binary()) -> ok | {error, validate_fail}).
|
|
|
+del_forward(Name, Topic) ->
|
|
|
+ case catch emqx_topic:validate({filter, Topic}) of
|
|
|
+ true ->
|
|
|
+ gen_server:call(name(Name), {del_forward, Topic});
|
|
|
+ _ ->
|
|
|
+ {error, validate_fail}
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec(show_subscriptions(atom()) -> list()).
|
|
|
+show_subscriptions(Name) ->
|
|
|
+ gen_server:call(name(Name), show_subscriptions).
|
|
|
+
|
|
|
+-spec(add_subscription(atom(), binary(), integer()) -> ok | {error, already_exists | validate_fail}).
|
|
|
+add_subscription(Name, Topic, QoS) ->
|
|
|
+ case catch emqx_topic:validate({filter, Topic}) of
|
|
|
+ true ->
|
|
|
+ gen_server:call(name(Name), {add_subscription, Topic, QoS});
|
|
|
+ {'EXIT', _Reason} ->
|
|
|
+ {error, validate_fail}
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec(del_subscription(atom(), binary()) -> ok | {error, validate_fail}).
|
|
|
+del_subscription(Name, Topic) ->
|
|
|
+ case catch emqx_topic:validate({filter, Topic}) of
|
|
|
+ true ->
|
|
|
+ gen_server:call(name(Name), {del_subscription, Topic});
|
|
|
+ _ ->
|
|
|
+ {error, validate_fail}
|
|
|
+ end.
|
|
|
+
|
|
|
status(Pid) ->
|
|
|
gen_server:call(Pid, status).
|
|
|
|
|
|
@@ -55,41 +103,78 @@ init([Options]) ->
|
|
|
manual -> ok;
|
|
|
auto -> erlang:send_after(1000, self(), start)
|
|
|
end,
|
|
|
- ReconnectCount = get_value(reconnect_count, Options, 10),
|
|
|
- ReconnectTime = get_value(reconnect_time, Options, 30000),
|
|
|
+ ReconnectInterval = get_value(reconnect_interval, Options, 30000),
|
|
|
MaxPendingMsg = get_value(max_pending_messages, Options, 10000),
|
|
|
Mountpoint = format_mountpoint(get_value(mountpoint, Options)),
|
|
|
- StoreType = get_value(store_type, Options, memory),
|
|
|
- Type = get_value(type, Options, in),
|
|
|
+ MqueueType = get_value(mqueue_type, Options, memory),
|
|
|
Queue = [],
|
|
|
- {ok, #state{type = Type,
|
|
|
- mountpoint = Mountpoint,
|
|
|
- queue = Queue,
|
|
|
- store_type = StoreType,
|
|
|
- options = Options,
|
|
|
- reconnect_count = ReconnectCount,
|
|
|
- reconnect_time = ReconnectTime,
|
|
|
- def_reconnect_count = ReconnectCount,
|
|
|
+ {ok, #state{mountpoint = Mountpoint,
|
|
|
+ queue = Queue,
|
|
|
+ mqueue_type = MqueueType,
|
|
|
+ options = Options,
|
|
|
+ reconnect_interval = ReconnectInterval,
|
|
|
max_pending_messages = MaxPendingMsg}}.
|
|
|
|
|
|
handle_call(start_bridge, _From, State = #state{client_pid = undefined}) ->
|
|
|
{noreply, NewState} = handle_info(start, State),
|
|
|
- {reply, <<"start bridge successfully">>, NewState};
|
|
|
+ {reply, #{msg => <<"start bridge successfully">>}, NewState};
|
|
|
|
|
|
handle_call(start_bridge, _From, State) ->
|
|
|
- {reply, <<"bridge already started">>, State};
|
|
|
+ {reply, #{msg => <<"bridge already started">>}, State};
|
|
|
|
|
|
handle_call(stop_bridge, _From, State = #state{client_pid = undefined}) ->
|
|
|
- {reply, <<"bridge not started">>, State};
|
|
|
+ {reply, #{msg => <<"bridge not started">>}, State};
|
|
|
|
|
|
handle_call(stop_bridge, _From, State = #state{client_pid = Pid}) ->
|
|
|
emqx_client:disconnect(Pid),
|
|
|
- {reply, <<"stop bridge successfully">>, State};
|
|
|
+ {reply, #{msg => <<"stop bridge successfully">>}, State};
|
|
|
|
|
|
handle_call(status, _From, State = #state{client_pid = undefined}) ->
|
|
|
- {reply, <<"Stopped">>, State};
|
|
|
+ {reply, #{status => <<"Stopped">>}, State};
|
|
|
handle_call(status, _From, State = #state{client_pid = _Pid})->
|
|
|
- {reply, <<"Running">>, State};
|
|
|
+ {reply, #{status => <<"Running">>}, State};
|
|
|
+
|
|
|
+handle_call(show_forwards, _From, State = #state{forwards = Forwards}) ->
|
|
|
+ {reply, Forwards, State};
|
|
|
+
|
|
|
+handle_call({add_forward, Topic}, _From, State = #state{forwards = Forwards}) ->
|
|
|
+ case not lists:member(Topic, Forwards) of
|
|
|
+ true ->
|
|
|
+ emqx_broker:subscribe(Topic),
|
|
|
+ {reply, ok, State#state{forwards = [Topic | Forwards]}};
|
|
|
+ false ->
|
|
|
+ {reply, {error, already_exists}, State}
|
|
|
+ end;
|
|
|
+
|
|
|
+handle_call({del_forward, Topic}, _From, State = #state{forwards = Forwards}) ->
|
|
|
+ case lists:member(Topic, Forwards) of
|
|
|
+ true ->
|
|
|
+ emqx_broker:unsubscribe(Topic),
|
|
|
+ {reply, ok, State#state{forwards = lists:delete(Topic, Forwards)}};
|
|
|
+ false ->
|
|
|
+ {reply, ok, State}
|
|
|
+ end;
|
|
|
+
|
|
|
+handle_call(show_subscriptions, _From, State = #state{subscriptions = Subscriptions}) ->
|
|
|
+ {reply, Subscriptions, State};
|
|
|
+
|
|
|
+handle_call({add_subscription, Topic, Qos}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) ->
|
|
|
+ case not lists:keymember(Topic, 1, Subscriptions) of
|
|
|
+ true ->
|
|
|
+ emqx_client:subscribe(ClientPid, {Topic, Qos}),
|
|
|
+ {reply, ok, State#state{subscriptions = [{Topic, Qos} | Subscriptions]}};
|
|
|
+ false ->
|
|
|
+ {reply, {error, already_exists}, State}
|
|
|
+ end;
|
|
|
+
|
|
|
+handle_call({del_subscription, Topic}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) ->
|
|
|
+ case lists:keymember(Topic, 1, Subscriptions) of
|
|
|
+ true ->
|
|
|
+ emqx_client:unsubscribe(ClientPid, Topic),
|
|
|
+ {reply, ok, State#state{subscriptions = lists:keydelete(Topic, 1, Subscriptions)}};
|
|
|
+ false ->
|
|
|
+ {reply, ok, State}
|
|
|
+ end;
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|
|
|
emqx_logger:error("[Bridge] unexpected call: ~p", [Req]),
|
|
|
@@ -99,46 +184,24 @@ handle_cast(Msg, State) ->
|
|
|
emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-handle_info(start, State = #state{reconnect_count = 0}) ->
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
%%----------------------------------------------------------------
|
|
|
-%% start in message bridge
|
|
|
+%% start message bridge
|
|
|
%%----------------------------------------------------------------
|
|
|
handle_info(start, State = #state{options = Options,
|
|
|
client_pid = undefined,
|
|
|
- reconnect_time = ReconnectTime,
|
|
|
- reconnect_count = ReconnectCount,
|
|
|
- type = in}) ->
|
|
|
+ reconnect_interval = ReconnectInterval}) ->
|
|
|
case emqx_client:start_link([{owner, self()}|options(Options)]) of
|
|
|
{ok, ClientPid, _} ->
|
|
|
- Subs = get_value(subscriptions, Options, []),
|
|
|
- [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs],
|
|
|
- {noreply, State#state{client_pid = ClientPid}};
|
|
|
+ Subs = [{i2b(Topic), Qos} || {Topic, Qos} <- get_value(subscriptions, Options, []),
|
|
|
+ emqx_topic:validate({filter, i2b(Topic)})],
|
|
|
+ Forwards = [i2b(Topic) || Topic <- string:tokens(get_value(forwards, Options, ""), ","),
|
|
|
+ emqx_topic:validate({filter, i2b(Topic)})],
|
|
|
+ [emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs],
|
|
|
+ [emqx_broker:subscribe(Topic) || Topic <- Forwards],
|
|
|
+ {noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}};
|
|
|
{error,_} ->
|
|
|
- erlang:send_after(ReconnectTime, self(), start),
|
|
|
- {noreply, State#state{reconnect_count = ReconnectCount-1}}
|
|
|
- end;
|
|
|
-
|
|
|
-%%----------------------------------------------------------------
|
|
|
-%% start out message bridge
|
|
|
-%%----------------------------------------------------------------
|
|
|
-handle_info(start, State = #state{options = Options,
|
|
|
- client_pid = undefined,
|
|
|
- reconnect_time = ReconnectTime,
|
|
|
- reconnect_count = ReconnectCount,
|
|
|
- type = out}) ->
|
|
|
- case emqx_client:start_link([{owner, self()}|options(Options)]) of
|
|
|
- {ok, ClientPid, _} ->
|
|
|
- Subs = get_value(subscriptions, Options, []),
|
|
|
- [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs],
|
|
|
- ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","),
|
|
|
- [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules,
|
|
|
- emqx_topic:validate({filter, i2b(Topic)})],
|
|
|
- {noreply, State#state{client_pid = ClientPid}};
|
|
|
- {error,_} ->
|
|
|
- erlang:send_after(ReconnectTime, self(), start),
|
|
|
- {noreply, State#state{reconnect_count = ReconnectCount-1}}
|
|
|
+ erlang:send_after(ReconnectInterval, self(), start),
|
|
|
+ {noreply, State}
|
|
|
end;
|
|
|
|
|
|
%%----------------------------------------------------------------
|
|
|
@@ -146,14 +209,14 @@ handle_info(start, State = #state{options = Options,
|
|
|
%%----------------------------------------------------------------
|
|
|
handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
|
|
|
State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue,
|
|
|
- store_type = StoreType, max_pending_messages = MaxPendingMsg}) ->
|
|
|
+ mqueue_type = MqueueType, max_pending_messages = MaxPendingMsg}) ->
|
|
|
Msg = #mqtt_msg{qos = 1,
|
|
|
retain = Retain,
|
|
|
topic = mountpoint(Mountpoint, Topic),
|
|
|
payload = Payload},
|
|
|
case emqx_client:publish(Pid, Msg) of
|
|
|
{ok, PkgId} ->
|
|
|
- {noreply, State#state{queue = store(StoreType, {PkgId, Msg}, Queue, MaxPendingMsg)}};
|
|
|
+ {noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}};
|
|
|
{error, Reason} ->
|
|
|
emqx_logger:error("Publish fail:~p", [Reason]),
|
|
|
{noreply, State}
|
|
|
@@ -165,26 +228,25 @@ handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{r
|
|
|
handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic,
|
|
|
properties := Props, payload := Payload}}, State) ->
|
|
|
NewMsg0 = emqx_message:make(bridge, QoS, Topic, Payload),
|
|
|
- NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain=> Retain}, NewMsg0)),
|
|
|
+ NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, NewMsg0)),
|
|
|
emqx_broker:publish(NewMsg1),
|
|
|
{noreply, State};
|
|
|
|
|
|
%%----------------------------------------------------------------
|
|
|
%% received remote puback message
|
|
|
%%----------------------------------------------------------------
|
|
|
-handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, store_type = StoreType}) ->
|
|
|
+handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueue_type = MqueueType}) ->
|
|
|
% lists:keydelete(PkgId, 1, Queue)
|
|
|
- {noreply, State#state{queue = delete(StoreType, PkgId, Queue)}};
|
|
|
+ {noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}};
|
|
|
|
|
|
handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) ->
|
|
|
{noreply, State#state{client_pid = undefined}};
|
|
|
|
|
|
handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid,
|
|
|
- reconnect_time = ReconnectTime,
|
|
|
- def_reconnect_count = DefReconnectCount}) ->
|
|
|
+ reconnect_interval = ReconnectInterval}) ->
|
|
|
lager:warning("emqx bridge stop reason:~p", [Reason]),
|
|
|
- erlang:send_after(ReconnectTime, self(), start),
|
|
|
- {noreply, State#state{client_pid = undefined, reconnect_count = DefReconnectCount}};
|
|
|
+ erlang:send_after(ReconnectInterval, self(), start),
|
|
|
+ {noreply, State#state{client_pid = undefined}};
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
emqx_logger:error("[Bridge] unexpected info: ~p", [Info]),
|
|
|
@@ -196,9 +258,9 @@ terminate(_Reason, #state{}) ->
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
-proto_ver(mqtt3) -> v3;
|
|
|
-proto_ver(mqtt4) -> v4;
|
|
|
-proto_ver(mqtt5) -> v5.
|
|
|
+proto_ver(mqttv3) -> v3;
|
|
|
+proto_ver(mqttv4) -> v4;
|
|
|
+proto_ver(mqttv5) -> v5.
|
|
|
address(Address) ->
|
|
|
case string:tokens(Address, ":") of
|
|
|
[Host] -> {Host, 1883};
|