|
|
@@ -216,31 +216,14 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
|
|
|
process_config(Config) ->
|
|
|
maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
|
|
|
|
|
|
-maybe_publish_to_local_broker(
|
|
|
- #{topic := Topic} = Msg,
|
|
|
- #{remote_topic := SubTopic} = Vars,
|
|
|
- Props
|
|
|
-) ->
|
|
|
+maybe_publish_to_local_broker(Msg, Vars, Props) ->
|
|
|
case maps:get(local_topic, Vars, undefined) of
|
|
|
undefined ->
|
|
|
%% local topic is not set, discard it
|
|
|
ok;
|
|
|
_ ->
|
|
|
- case emqx_topic:match(Topic, SubTopic) of
|
|
|
- true ->
|
|
|
- _ = emqx_broker:publish(
|
|
|
- emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)
|
|
|
- ),
|
|
|
- ok;
|
|
|
- false ->
|
|
|
- ?SLOG(warning, #{
|
|
|
- msg => "discard_message_as_topic_not_matched",
|
|
|
- message => Msg,
|
|
|
- subscribed => SubTopic,
|
|
|
- got_topic => Topic
|
|
|
- })
|
|
|
- end
|
|
|
- end.
|
|
|
+ _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
|
|
|
+end.
|
|
|
|
|
|
format_msg_received(
|
|
|
#{
|