|
@@ -186,14 +186,12 @@ init(#{name := Name} = ConnectOpts) ->
|
|
|
?LOG(info, "starting bridge worker for ~p", [Name]),
|
|
?LOG(info, "starting bridge worker for ~p", [Name]),
|
|
|
erlang:process_flag(trap_exit, true),
|
|
erlang:process_flag(trap_exit, true),
|
|
|
ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)),
|
|
ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)),
|
|
|
- Forwards = maps:get(forwards, ConnectOpts, #{}),
|
|
|
|
|
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
|
|
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
|
|
|
State = init_state(ConnectOpts),
|
|
State = init_state(ConnectOpts),
|
|
|
self() ! idle,
|
|
self() ! idle,
|
|
|
{ok, idle, State#{
|
|
{ok, idle, State#{
|
|
|
connect_module => ConnectModule,
|
|
connect_module => ConnectModule,
|
|
|
connect_opts => pre_process_opts(ConnectOpts),
|
|
connect_opts => pre_process_opts(ConnectOpts),
|
|
|
- forwards => Forwards,
|
|
|
|
|
replayq => Queue
|
|
replayq => Queue
|
|
|
}}.
|
|
}}.
|
|
|
|
|
|
|
@@ -323,7 +321,7 @@ common(_StateName, {call, From}, ensure_stopped, #{connection := Conn,
|
|
|
connect_module := ConnectModule} = State) ->
|
|
connect_module := ConnectModule} = State) ->
|
|
|
Reply = ConnectModule:stop(Conn),
|
|
Reply = ConnectModule:stop(Conn),
|
|
|
{next_state, idle, State#{connection => undefined}, [{reply, From, Reply}]};
|
|
{next_state, idle, State#{connection => undefined}, [{reply, From, Reply}]};
|
|
|
-common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
|
|
|
|
|
|
|
+common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) ->
|
|
|
{keep_state_and_data, [{reply, From, Forwards}]};
|
|
{keep_state_and_data, [{reply, From, Forwards}]};
|
|
|
common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
|
|
common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
|
|
|
{keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
|
|
{keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
|
|
@@ -343,9 +341,8 @@ common(StateName, Type, Content, #{name := Name} = State) ->
|
|
|
[Name, Type, StateName, Content]),
|
|
[Name, Type, StateName, Content]),
|
|
|
{keep_state, State}.
|
|
{keep_state, State}.
|
|
|
|
|
|
|
|
-do_connect(#{forwards := Forwards,
|
|
|
|
|
- connect_module := ConnectModule,
|
|
|
|
|
- connect_opts := ConnectOpts,
|
|
|
|
|
|
|
+do_connect(#{connect_module := ConnectModule,
|
|
|
|
|
+ connect_opts := ConnectOpts = #{forwards := Forwards},
|
|
|
inflight := Inflight,
|
|
inflight := Inflight,
|
|
|
name := Name} = State) ->
|
|
name := Name} = State) ->
|
|
|
case Forwards of
|
|
case Forwards of
|
|
@@ -407,19 +404,20 @@ pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
%% Assert non-empty batch because we have a is_empty check earlier.
|
|
%% Assert non-empty batch because we have a is_empty check earlier.
|
|
|
-do_send(#{forwards := undefined}, _QAckRef, Batch) ->
|
|
|
|
|
|
|
+do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) ->
|
|
|
?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt.<name>.in' not configured, msg: ~p", [Batch]);
|
|
?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt.<name>.in' not configured, msg: ~p", [Batch]);
|
|
|
do_send(#{inflight := Inflight,
|
|
do_send(#{inflight := Inflight,
|
|
|
connect_module := Module,
|
|
connect_module := Module,
|
|
|
connection := Connection,
|
|
connection := Connection,
|
|
|
mountpoint := Mountpoint,
|
|
mountpoint := Mountpoint,
|
|
|
- forwards := Forwards,
|
|
|
|
|
|
|
+ connect_opts := #{forwards := Forwards},
|
|
|
if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) ->
|
|
if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) ->
|
|
|
Vars = emqx_bridge_msg:make_pub_vars(Mountpoint, Forwards),
|
|
Vars = emqx_bridge_msg:make_pub_vars(Mountpoint, Forwards),
|
|
|
ExportMsg = fun(Message) ->
|
|
ExportMsg = fun(Message) ->
|
|
|
bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
|
|
bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
|
|
|
emqx_bridge_msg:to_remote_msg(Module, Message, Vars)
|
|
emqx_bridge_msg:to_remote_msg(Module, Message, Vars)
|
|
|
end,
|
|
end,
|
|
|
|
|
+ ?LOG(debug, "publish to remote broker, msg: ~p, vars: ~p", [Batch, Vars]),
|
|
|
case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of
|
|
case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of
|
|
|
{ok, Refs} ->
|
|
{ok, Refs} ->
|
|
|
{ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef,
|
|
{ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef,
|