|
@@ -185,12 +185,10 @@ callback_mode() -> [state_functions].
|
|
|
init(#{name := Name} = ConnectOpts) ->
|
|
init(#{name := Name} = ConnectOpts) ->
|
|
|
?LOG(info, "starting bridge worker for ~p", [Name]),
|
|
?LOG(info, "starting bridge worker for ~p", [Name]),
|
|
|
erlang:process_flag(trap_exit, true),
|
|
erlang:process_flag(trap_exit, true),
|
|
|
- ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)),
|
|
|
|
|
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
|
|
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
|
|
|
State = init_state(ConnectOpts),
|
|
State = init_state(ConnectOpts),
|
|
|
self() ! idle,
|
|
self() ! idle,
|
|
|
{ok, idle, State#{
|
|
{ok, idle, State#{
|
|
|
- connect_module => ConnectModule,
|
|
|
|
|
connect_opts => pre_process_opts(ConnectOpts),
|
|
connect_opts => pre_process_opts(ConnectOpts),
|
|
|
replayq => Queue
|
|
replayq => Queue
|
|
|
}}.
|
|
}}.
|
|
@@ -311,15 +309,13 @@ connected(Type, Content, State) ->
|
|
|
%% Common handlers
|
|
%% Common handlers
|
|
|
common(StateName, {call, From}, status, _State) ->
|
|
common(StateName, {call, From}, status, _State) ->
|
|
|
{keep_state_and_data, [{reply, From, StateName}]};
|
|
{keep_state_and_data, [{reply, From, StateName}]};
|
|
|
-common(_StateName, {call, From}, ping, #{connection := Conn,
|
|
|
|
|
- connect_module := ConnectModule} =_State) ->
|
|
|
|
|
- Reply = ConnectModule:ping(Conn),
|
|
|
|
|
|
|
+common(_StateName, {call, From}, ping, #{connection := Conn} =_State) ->
|
|
|
|
|
+ Reply = emqx_connector_mqtt_mod:ping(Conn),
|
|
|
{keep_state_and_data, [{reply, From, Reply}]};
|
|
{keep_state_and_data, [{reply, From, Reply}]};
|
|
|
common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) ->
|
|
common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) ->
|
|
|
{keep_state_and_data, [{reply, From, ok}]};
|
|
{keep_state_and_data, [{reply, From, ok}]};
|
|
|
-common(_StateName, {call, From}, ensure_stopped, #{connection := Conn,
|
|
|
|
|
- connect_module := ConnectModule} = State) ->
|
|
|
|
|
- Reply = ConnectModule:stop(Conn),
|
|
|
|
|
|
|
+common(_StateName, {call, From}, ensure_stopped, #{connection := Conn} = State) ->
|
|
|
|
|
+ Reply = emqx_connector_mqtt_mod:stop(Conn),
|
|
|
{next_state, idle, State#{connection => undefined}, [{reply, From, Reply}]};
|
|
{next_state, idle, State#{connection => undefined}, [{reply, From, Reply}]};
|
|
|
common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) ->
|
|
common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) ->
|
|
|
{keep_state_and_data, [{reply, From, Forwards}]};
|
|
{keep_state_and_data, [{reply, From, Forwards}]};
|
|
@@ -341,22 +337,21 @@ common(StateName, Type, Content, #{name := Name} = State) ->
|
|
|
[Name, Type, StateName, Content]),
|
|
[Name, Type, StateName, Content]),
|
|
|
{keep_state, State}.
|
|
{keep_state, State}.
|
|
|
|
|
|
|
|
-do_connect(#{connect_module := ConnectModule,
|
|
|
|
|
- connect_opts := ConnectOpts = #{forwards := Forwards},
|
|
|
|
|
|
|
+do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards},
|
|
|
inflight := Inflight,
|
|
inflight := Inflight,
|
|
|
name := Name} = State) ->
|
|
name := Name} = State) ->
|
|
|
case Forwards of
|
|
case Forwards of
|
|
|
undefined -> ok;
|
|
undefined -> ok;
|
|
|
#{subscribe_local_topic := Topic} -> subscribe_local_topic(Topic, Name)
|
|
#{subscribe_local_topic := Topic} -> subscribe_local_topic(Topic, Name)
|
|
|
end,
|
|
end,
|
|
|
- case ConnectModule:start(ConnectOpts) of
|
|
|
|
|
|
|
+ case emqx_connector_mqtt_mod:start(ConnectOpts) of
|
|
|
{ok, Conn} ->
|
|
{ok, Conn} ->
|
|
|
?tp(info, connected, #{name => Name, inflight => length(Inflight)}),
|
|
?tp(info, connected, #{name => Name, inflight => length(Inflight)}),
|
|
|
{ok, State#{connection => Conn}};
|
|
{ok, State#{connection => Conn}};
|
|
|
{error, Reason} ->
|
|
{error, Reason} ->
|
|
|
ConnectOpts1 = obfuscate(ConnectOpts),
|
|
ConnectOpts1 = obfuscate(ConnectOpts),
|
|
|
- ?LOG(error, "Failed to connect with module=~p\n"
|
|
|
|
|
- "config=~p\nreason:~p", [ConnectModule, ConnectOpts1, Reason]),
|
|
|
|
|
|
|
+ ?LOG(error, "Failed to connect \n"
|
|
|
|
|
+ "config=~p\nreason:~p", [ConnectOpts1, Reason]),
|
|
|
{error, Reason, State}
|
|
{error, Reason, State}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
@@ -385,16 +380,13 @@ pop_and_send(#{inflight := Inflight, max_inflight := Max} = State) ->
|
|
|
pop_and_send_loop(State, 0) ->
|
|
pop_and_send_loop(State, 0) ->
|
|
|
?tp(debug, inflight_full, #{}),
|
|
?tp(debug, inflight_full, #{}),
|
|
|
{ok, State};
|
|
{ok, State};
|
|
|
-pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) ->
|
|
|
|
|
|
|
+pop_and_send_loop(#{replayq := Q} = State, N) ->
|
|
|
case replayq:is_empty(Q) of
|
|
case replayq:is_empty(Q) of
|
|
|
true ->
|
|
true ->
|
|
|
?tp(debug, replayq_drained, #{}),
|
|
?tp(debug, replayq_drained, #{}),
|
|
|
{ok, State};
|
|
{ok, State};
|
|
|
false ->
|
|
false ->
|
|
|
- BatchSize = case Module of
|
|
|
|
|
- emqx_bridge_rpc -> maps:get(batch_size, State);
|
|
|
|
|
- _ -> 1
|
|
|
|
|
- end,
|
|
|
|
|
|
|
+ BatchSize = 1,
|
|
|
Opts = #{count_limit => BatchSize, bytes_limit => 999999999},
|
|
Opts = #{count_limit => BatchSize, bytes_limit => 999999999},
|
|
|
{Q1, QAckRef, Batch} = replayq:pop(Q, Opts),
|
|
{Q1, QAckRef, Batch} = replayq:pop(Q, Opts),
|
|
|
case do_send(State#{replayq := Q1}, QAckRef, Batch) of
|
|
case do_send(State#{replayq := Q1}, QAckRef, Batch) of
|
|
@@ -407,7 +399,6 @@ pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) ->
|
|
|
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) ->
|
|
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) ->
|
|
|
?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt.<name>.in' not configured, msg: ~p", [Batch]);
|
|
?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt.<name>.in' not configured, msg: ~p", [Batch]);
|
|
|
do_send(#{inflight := Inflight,
|
|
do_send(#{inflight := Inflight,
|
|
|
- connect_module := Module,
|
|
|
|
|
connection := Connection,
|
|
connection := Connection,
|
|
|
mountpoint := Mountpoint,
|
|
mountpoint := Mountpoint,
|
|
|
connect_opts := #{forwards := Forwards},
|
|
connect_opts := #{forwards := Forwards},
|
|
@@ -415,10 +406,10 @@ do_send(#{inflight := Inflight,
|
|
|
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
|
|
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
|
|
|
ExportMsg = fun(Message) ->
|
|
ExportMsg = fun(Message) ->
|
|
|
bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
|
|
bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
|
|
|
- emqx_connector_mqtt_msg:to_remote_msg(Module, Message, Vars)
|
|
|
|
|
|
|
+ emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
|
|
|
end,
|
|
end,
|
|
|
?LOG(debug, "publish to remote broker, msg: ~p, vars: ~p", [Batch, Vars]),
|
|
?LOG(debug, "publish to remote broker, msg: ~p, vars: ~p", [Batch, Vars]),
|
|
|
- case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of
|
|
|
|
|
|
|
+ case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(M) || M <- Batch]) of
|
|
|
{ok, Refs} ->
|
|
{ok, Refs} ->
|
|
|
{ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef,
|
|
{ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef,
|
|
|
send_ack_ref => map_set(Refs),
|
|
send_ack_ref => map_set(Refs),
|
|
@@ -492,10 +483,8 @@ do_subscribe(RawTopic, Name) ->
|
|
|
{Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_2}),
|
|
{Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_2}),
|
|
|
emqx_broker:subscribe(Topic, Name, SubOpts).
|
|
emqx_broker:subscribe(Topic, Name, SubOpts).
|
|
|
|
|
|
|
|
-disconnect(#{connection := Conn,
|
|
|
|
|
- connect_module := Module
|
|
|
|
|
- } = State) when Conn =/= undefined ->
|
|
|
|
|
- Module:stop(Conn),
|
|
|
|
|
|
|
+disconnect(#{connection := Conn} = State) when Conn =/= undefined ->
|
|
|
|
|
+ emqx_connector_mqtt_mod:stop(Conn),
|
|
|
State#{connection => undefined};
|
|
State#{connection => undefined};
|
|
|
disconnect(State) ->
|
|
disconnect(State) ->
|
|
|
State.
|
|
State.
|
|
@@ -538,13 +527,6 @@ obfuscate(Map) ->
|
|
|
is_sensitive(password) -> true;
|
|
is_sensitive(password) -> true;
|
|
|
is_sensitive(_) -> false.
|
|
is_sensitive(_) -> false.
|
|
|
|
|
|
|
|
-conn_type(rpc) ->
|
|
|
|
|
- emqx_bridge_rpc;
|
|
|
|
|
-conn_type(mqtt) ->
|
|
|
|
|
- emqx_connector_mqtt_mod;
|
|
|
|
|
-conn_type(Mod) when is_atom(Mod) ->
|
|
|
|
|
- Mod.
|
|
|
|
|
-
|
|
|
|
|
str(A) when is_atom(A) ->
|
|
str(A) when is_atom(A) ->
|
|
|
atom_to_list(A);
|
|
atom_to_list(A);
|
|
|
str(B) when is_binary(B) ->
|
|
str(B) when is_binary(B) ->
|