|
|
@@ -1,3 +1,4 @@
|
|
|
+
|
|
|
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
%%
|
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
@@ -30,9 +31,17 @@
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
|
code_change/3]).
|
|
|
|
|
|
--record(state, {client_pid, options, reconnect_interval,
|
|
|
- mountpoint, queue, mqueue_type, max_pending_messages,
|
|
|
- forwards = [], subscriptions = []}).
|
|
|
+-record(state, {client_pid :: pid(),
|
|
|
+ options :: list(),
|
|
|
+ reconnect_interval :: pos_integer(),
|
|
|
+ mountpoint :: binary(),
|
|
|
+ readq :: list(),
|
|
|
+ writeq :: list(),
|
|
|
+ replayq :: map(),
|
|
|
+ ackref :: replayq:ack_ref(),
|
|
|
+ queue_option :: map(),
|
|
|
+ forwards :: list(),
|
|
|
+ subscriptions :: list()}).
|
|
|
|
|
|
-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false,
|
|
|
packet_id, topic, props, payload}).
|
|
|
@@ -104,20 +113,85 @@ init([Options]) ->
|
|
|
auto -> erlang:send_after(1000, self(), start)
|
|
|
end,
|
|
|
ReconnectInterval = get_value(reconnect_interval, Options, 30000),
|
|
|
- MaxPendingMsg = get_value(max_pending_messages, Options, 10000),
|
|
|
Mountpoint = format_mountpoint(get_value(mountpoint, Options)),
|
|
|
- MqueueType = get_value(mqueue_type, Options, memory),
|
|
|
- Queue = [],
|
|
|
+ QueueOptions = get_value(queue, Options),
|
|
|
{ok, #state{mountpoint = Mountpoint,
|
|
|
- queue = Queue,
|
|
|
- mqueue_type = MqueueType,
|
|
|
+ queue_option = QueueOptions,
|
|
|
+ readq = [],
|
|
|
+ writeq = [],
|
|
|
options = Options,
|
|
|
- reconnect_interval = ReconnectInterval,
|
|
|
- max_pending_messages = MaxPendingMsg}}.
|
|
|
+ reconnect_interval = ReconnectInterval}}.
|
|
|
+
|
|
|
+handle_call(start_bridge, _From, State = #state{options = Options,
|
|
|
+ replayq = undefined,
|
|
|
+ client_pid = undefined,
|
|
|
+ queue_option = #{batch_size := BatchSize,
|
|
|
+ replayq_dir := ReplayqDir,
|
|
|
+ replayq_seg_bytes := ReplayqSegBytes}}) ->
|
|
|
+ case emqx_client:start_link([{owner, self()}|options(Options)]) of
|
|
|
+ {ok, ClientPid} ->
|
|
|
+ case emqx_client:connect(ClientPid) of
|
|
|
+ {ok, _} ->
|
|
|
+ emqx_logger:info("[Bridge] connected to remote successfully"),
|
|
|
+ Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
|
|
|
+ Forwards = subscribe_local_topics(Options),
|
|
|
+ ReplayQ = replayq:open(#{dir => ReplayqDir,
|
|
|
+ seg_bytes => ReplayqSegBytes,
|
|
|
+ sizer => fun(Term) ->
|
|
|
+ size(term_to_binary(Term))
|
|
|
+ end,
|
|
|
+ marshaller => fun({PktId, Msg}) ->
|
|
|
+ term_to_binary({PktId, Msg});
|
|
|
+ (Bin) ->
|
|
|
+ binary_to_term(Bin)
|
|
|
+ end
|
|
|
+ }),
|
|
|
+ {NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}),
|
|
|
+ {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
|
|
|
+ {reply, #{msg => <<"start bridge successfully">>}, State#state{client_pid = ClientPid,
|
|
|
+ subscriptions = Subs,
|
|
|
+ readq = NewReadQ,
|
|
|
+ replayq = NewReplayQ,
|
|
|
+ ackref = AckRef,
|
|
|
+ forwards = Forwards}};
|
|
|
+ {error, Reason} ->
|
|
|
+ emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
|
|
|
+ {reply, #{msg => <<"connect to remote failed">>}, State#state{client_pid = ClientPid}}
|
|
|
+ end;
|
|
|
+ {error, Reason} ->
|
|
|
+ emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]),
|
|
|
+ {reply, #{msg => <<"start bridge failed">>}, State}
|
|
|
+ end;
|
|
|
|
|
|
-handle_call(start_bridge, _From, State = #state{client_pid = undefined}) ->
|
|
|
- {noreply, NewState} = handle_info(start, State),
|
|
|
- {reply, #{msg => <<"start bridge successfully">>}, NewState};
|
|
|
+
|
|
|
+handle_call(start_bridge, _From, State = #state{options = Options,
|
|
|
+ client_pid = undefined,
|
|
|
+ replayq = ReplayQ,
|
|
|
+ queue_option = #{batch_size := BatchSize}
|
|
|
+ }) ->
|
|
|
+ case emqx_client:start_link([{owner, self()} | options(Options)]) of
|
|
|
+ {ok, ClientPid} ->
|
|
|
+ case emqx_client:connect(ClientPid) of
|
|
|
+ {ok, _} ->
|
|
|
+ emqx_logger:info("[Bridge] connected to remote ysucessfully"),
|
|
|
+ Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
|
|
|
+ Forwards = subscribe_local_topics(Options),
|
|
|
+ {NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}),
|
|
|
+ {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
|
|
|
+ {reply, #{msg => <<"start bridge successfully">>}, State#state{client_pid = ClientPid,
|
|
|
+ subscriptions = Subs,
|
|
|
+ readq = NewReadQ,
|
|
|
+ replayq = NewReplayQ,
|
|
|
+ ackref = AckRef,
|
|
|
+ forwards = Forwards}};
|
|
|
+ {error, Reason} ->
|
|
|
+ emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
|
|
|
+ {reply, #{msg => <<"connect to remote failed">>}, State#state{client_pid = ClientPid}}
|
|
|
+ end;
|
|
|
+ {error, Reason} ->
|
|
|
+ emqx_logger:error("[Bridge] restart failed! error: ~p", [Reason]),
|
|
|
+ {reply, #{msg => <<"start bridge failed">>}, State}
|
|
|
+ end;
|
|
|
|
|
|
handle_call(start_bridge, _From, State) ->
|
|
|
{reply, #{msg => <<"bridge already started">>}, State};
|
|
|
@@ -184,46 +258,82 @@ handle_cast(Msg, State) ->
|
|
|
emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-%%----------------------------------------------------------------
|
|
|
-%% start message bridge
|
|
|
-%%----------------------------------------------------------------
|
|
|
-handle_info(start, State = #state{options = Options,
|
|
|
- client_pid = undefined}) ->
|
|
|
- case emqx_client:start_link([{owner, self()}|options(Options)]) of
|
|
|
+handle_info(restart, State = #state{options = Options,
|
|
|
+ client_pid = undefined,
|
|
|
+ replayq = ReplayQ,
|
|
|
+ queue_option = #{batch_size := BatchSize}
|
|
|
+ }) ->
|
|
|
+ case emqx_client:start_link([{owner, self()} | options(Options)]) of
|
|
|
{ok, ClientPid} ->
|
|
|
case emqx_client:connect(ClientPid) of
|
|
|
{ok, _} ->
|
|
|
- emqx_logger:info("[Bridge] connected to remote sucessfully"),
|
|
|
+ emqx_logger:info("[Bridge] connected to remote successfully"),
|
|
|
Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
|
|
|
- Forwards = subscribe_local_topics(get_value(forwards, Options, [])),
|
|
|
+ Forwards = subscribe_local_topics(Options),
|
|
|
+ {NewReplayQ, AckRef, ReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}),
|
|
|
+ {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
|
|
|
{noreply, State#state{client_pid = ClientPid,
|
|
|
subscriptions = Subs,
|
|
|
+ readq = NewReadQ,
|
|
|
+ replayq = NewReplayQ,
|
|
|
+ ackref = AckRef,
|
|
|
forwards = Forwards}};
|
|
|
{error, Reason} ->
|
|
|
emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
|
|
|
{noreply, State#state{client_pid = ClientPid}}
|
|
|
end;
|
|
|
{error, Reason} ->
|
|
|
- emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]),
|
|
|
+ emqx_logger:error("[Bridge] restart failed! error: ~p", [Reason]),
|
|
|
{noreply, State}
|
|
|
end;
|
|
|
|
|
|
+%%----------------------------------------------------------------
|
|
|
+%% pop message from replayq and publish again
|
|
|
+%%----------------------------------------------------------------
|
|
|
+handle_info(pop, State = #state{writeq = WriteQ, replayq = ReplayQ,
|
|
|
+ queue_option = #{batch_size := BatchSize}}) ->
|
|
|
+ {NewReplayQ, AckRef, NewReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}),
|
|
|
+ {NewReadQ1, NewWriteQ} = case NewReadQ of
|
|
|
+ [] -> {WriteQ, []};
|
|
|
+ _ -> {NewReadQ, WriteQ}
|
|
|
+ end,
|
|
|
+ self() ! replay,
|
|
|
+ {noreply, State#state{readq = NewReadQ1, writeq = NewWriteQ, replayq = NewReplayQ, ackref = AckRef}};
|
|
|
+
|
|
|
+handle_info(dump, State = #state{writeq = WriteQ, replayq = ReplayQ}) ->
|
|
|
+ NewReplayQueue = replayq:append(ReplayQ, lists:reverse(WriteQ)),
|
|
|
+ {noreply, State#state{replayq = NewReplayQueue, writeq = []}};
|
|
|
+
|
|
|
+%%----------------------------------------------------------------
|
|
|
+%% replay message from replayq
|
|
|
+%%----------------------------------------------------------------
|
|
|
+handle_info(replay, State = #state{client_pid = ClientPid, readq = ReadQ}) ->
|
|
|
+ {ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
|
|
|
+ {noreply, State#state{readq = NewReadQ}};
|
|
|
+
|
|
|
%%----------------------------------------------------------------
|
|
|
%% received local node message
|
|
|
%%----------------------------------------------------------------
|
|
|
handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
|
|
|
- State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue,
|
|
|
- mqueue_type = MqueueType, max_pending_messages = MaxPendingMsg}) ->
|
|
|
+ State = #state{client_pid = undefined,
|
|
|
+ mountpoint = Mountpoint}) ->
|
|
|
+ Msg = #mqtt_msg{qos = 1,
|
|
|
+ retain = Retain,
|
|
|
+ topic = mountpoint(Mountpoint, Topic),
|
|
|
+ payload = Payload},
|
|
|
+ {noreply, en_writeq({undefined, Msg}, State)};
|
|
|
+handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
|
|
|
+ State = #state{client_pid = Pid, mountpoint = Mountpoint}) ->
|
|
|
Msg = #mqtt_msg{qos = 1,
|
|
|
retain = Retain,
|
|
|
topic = mountpoint(Mountpoint, Topic),
|
|
|
payload = Payload},
|
|
|
case emqx_client:publish(Pid, Msg) of
|
|
|
- {ok, PkgId} ->
|
|
|
- {noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}};
|
|
|
- {error, Reason} ->
|
|
|
+ {ok, PktId} ->
|
|
|
+ {noreply, en_writeq({PktId, Msg}, State)};
|
|
|
+ {error, {PktId, Reason}} ->
|
|
|
emqx_logger:error("[Bridge] Publish fail:~p", [Reason]),
|
|
|
- {noreply, State}
|
|
|
+ {noreply, en_writeq({PktId, Msg}, State)}
|
|
|
end;
|
|
|
|
|
|
%%----------------------------------------------------------------
|
|
|
@@ -239,18 +349,19 @@ handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic
|
|
|
%%----------------------------------------------------------------
|
|
|
%% received remote puback message
|
|
|
%%----------------------------------------------------------------
|
|
|
-handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueue_type = MqueueType}) ->
|
|
|
- % lists:keydelete(PkgId, 1, Queue)
|
|
|
- {noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}};
|
|
|
+handle_info({puback, #{packet_id := PktId}}, State) ->
|
|
|
+ {noreply, delete(PktId, State)};
|
|
|
|
|
|
handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) ->
|
|
|
emqx_logger:warning("[Bridge] stop ~p", [normal]),
|
|
|
+ self() ! dump,
|
|
|
{noreply, State#state{client_pid = undefined}};
|
|
|
|
|
|
handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid,
|
|
|
reconnect_interval = ReconnectInterval}) ->
|
|
|
emqx_logger:error("[Bridge] stop ~p", [Reason]),
|
|
|
- erlang:send_after(ReconnectInterval, self(), start),
|
|
|
+ self() ! dump,
|
|
|
+ erlang:send_after(ReconnectInterval, self(), restart),
|
|
|
{noreply, State#state{client_pid = undefined}};
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
@@ -267,8 +378,10 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
|
|
|
[begin emqx_client:subscribe(ClientPid, {bin(Topic), Qos}), {bin(Topic), Qos} end
|
|
|
|| {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})].
|
|
|
|
|
|
-subscribe_local_topics(Topics) ->
|
|
|
- [begin emqx_broker:subscribe(bin(Topic)), bin(Topic) end
|
|
|
+subscribe_local_topics(Options) ->
|
|
|
+ Topics = get_value(forwards, Options, []),
|
|
|
+ Subid = get_value(client_id, Options, <<"bridge">>),
|
|
|
+ [begin emqx_broker:subscribe(bin(Topic), #{qos => 1, subid => Subid}), bin(Topic) end
|
|
|
|| Topic <- Topics, emqx_topic:validate({filter, bin(Topic)})].
|
|
|
|
|
|
proto_ver(mqttv3) -> v3;
|
|
|
@@ -320,15 +433,35 @@ format_mountpoint(undefined) ->
|
|
|
format_mountpoint(Prefix) ->
|
|
|
binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
|
|
|
|
|
|
-store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg ->
|
|
|
- [Data | Queue];
|
|
|
-store(memory, _Data, Queue, _MaxPendingMsg) ->
|
|
|
- logger:error("Beyond max pending messages"),
|
|
|
- Queue;
|
|
|
-store(disk, Data, Queue, _MaxPendingMsg)->
|
|
|
- [Data | Queue].
|
|
|
-
|
|
|
-delete(memory, PkgId, Queue) ->
|
|
|
- lists:keydelete(PkgId, 1, Queue);
|
|
|
-delete(disk, PkgId, Queue) ->
|
|
|
- lists:keydelete(PkgId, 1, Queue).
|
|
|
+
|
|
|
+en_writeq(Msg, State = #state{replayq = ReplayQ,
|
|
|
+ queue_option = #{mem_cache := false}}) ->
|
|
|
+ NewReplayQ = replayq:append(ReplayQ, [Msg]),
|
|
|
+ State#state{replayq = NewReplayQ};
|
|
|
+en_writeq(Msg, State = #state{writeq = WriteQ,
|
|
|
+ queue_option = #{batch_size := BatchSize,
|
|
|
+ mem_cache := true}})
|
|
|
+ when length(WriteQ) < BatchSize->
|
|
|
+ State#state{writeq = [Msg | WriteQ]} ;
|
|
|
+en_writeq(Msg, State = #state{writeq = WriteQ, replayq = ReplayQ,
|
|
|
+ queue_option = #{mem_cache := true}}) ->
|
|
|
+ NewReplayQ =replayq:append(ReplayQ, lists:reverse(WriteQ)),
|
|
|
+ State#state{writeq = [Msg], replayq = NewReplayQ}.
|
|
|
+
|
|
|
+publish_readq_msg(_ClientPid, [], ReadQ) ->
|
|
|
+ {ok, ReadQ};
|
|
|
+publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], ReadQ) ->
|
|
|
+ io:format("~n replay msg: ~p ~n", [Msg]),
|
|
|
+ {ok, PktId} = emqx_client:publish(ClientPid, Msg),
|
|
|
+ publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | ReadQ]).
|
|
|
+
|
|
|
+delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) ->
|
|
|
+ ok = replayq:ack(ReplayQ, AckRef),
|
|
|
+ self() ! pop,
|
|
|
+ State;
|
|
|
+
|
|
|
+delete(PktId, State = #state{readq = [], writeq = WriteQ}) ->
|
|
|
+ State#state{writeq = lists:keydelete(PktId, 1, WriteQ)};
|
|
|
+
|
|
|
+delete(PktId, State = #state{readq = ReadQ}) ->
|
|
|
+ State#state{readq = lists:keydelete(PktId, 1, ReadQ)}.
|