|
|
@@ -30,7 +30,7 @@
|
|
|
|
|
|
%% callbacks for emqtt
|
|
|
-export([ handle_puback/2
|
|
|
- , handle_publish/2
|
|
|
+ , handle_publish/3
|
|
|
, handle_disconnected/2
|
|
|
]).
|
|
|
|
|
|
@@ -52,7 +52,7 @@ start(Config) ->
|
|
|
Mountpoint = maps:get(receive_mountpoint, Config, undefined),
|
|
|
Subscriptions = maps:get(subscriptions, Config, undefined),
|
|
|
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
|
|
|
- Handlers = make_hdlr(Parent, Vars),
|
|
|
+ Handlers = make_hdlr(Parent, Vars, #{server => ip_port_to_server(Host, Port)}),
|
|
|
Config1 = Config#{
|
|
|
msg_handler => Handlers,
|
|
|
host => Host,
|
|
|
@@ -161,12 +161,12 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
|
|
|
?SLOG(warning, #{msg => "publish_to_remote_node_falied",
|
|
|
packet_id => PktId, reason_code => RC}).
|
|
|
|
|
|
-handle_publish(Msg, undefined) ->
|
|
|
+handle_publish(Msg, undefined, _Opts) ->
|
|
|
?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
|
|
|
"_'ingress'_is_not_configured",
|
|
|
message => Msg});
|
|
|
-handle_publish(#{properties := Props} = Msg0, Vars) ->
|
|
|
- Msg = format_msg_received(Msg0),
|
|
|
+handle_publish(#{properties := Props} = Msg0, Vars, Opts) ->
|
|
|
+ Msg = format_msg_received(Msg0, Opts),
|
|
|
?SLOG(debug, #{msg => "publish_to_local_broker",
|
|
|
message => Msg, vars => Vars}),
|
|
|
case Vars of
|
|
|
@@ -179,9 +179,9 @@ handle_publish(#{properties := Props} = Msg0, Vars) ->
|
|
|
handle_disconnected(Reason, Parent) ->
|
|
|
Parent ! {disconnected, self(), Reason}.
|
|
|
|
|
|
-make_hdlr(Parent, Vars) ->
|
|
|
+make_hdlr(Parent, Vars, Opts) ->
|
|
|
#{puback => {fun ?MODULE:handle_puback/2, [Parent]},
|
|
|
- publish => {fun ?MODULE:handle_publish/2, [Vars]},
|
|
|
+ publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]},
|
|
|
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
|
|
|
}.
|
|
|
|
|
|
@@ -212,8 +212,9 @@ maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopi
|
|
|
end.
|
|
|
|
|
|
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
|
|
|
- qos := QoS, retain := Retain, topic := Topic}) ->
|
|
|
+ qos := QoS, retain := Retain, topic := Topic}, #{server := Server}) ->
|
|
|
#{ id => emqx_guid:to_hexstr(emqx_guid:gen())
|
|
|
+ , server => Server
|
|
|
, payload => Payload
|
|
|
, topic => Topic
|
|
|
, qos => QoS
|
|
|
@@ -236,3 +237,10 @@ printable_maps(Headers) ->
|
|
|
};
|
|
|
(K, V0, AccIn) -> AccIn#{K => V0}
|
|
|
end, #{}, Headers).
|
|
|
+
|
|
|
+ip_port_to_server(Host, Port) ->
|
|
|
+ HostStr = case inet:ntoa(Host) of
|
|
|
+ {error, einval} -> Host;
|
|
|
+ IPStr -> IPStr
|
|
|
+ end,
|
|
|
+ list_to_binary(io_lib:format("~s:~w", [HostStr, Port])).
|