Procházet zdrojové kódy

fix(mqtt-bridge): disallow QoS 2 on ingress bridges

Andrew Mayorov před 3 roky
rodič
revize
7002fe2ef4

+ 30 - 2
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -52,7 +52,7 @@
 -define(INGRESS_CONF, #{
     <<"remote">> => #{
         <<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
-        <<"qos">> => 2
+        <<"qos">> => 1
     },
     <<"local">> => #{
         <<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
@@ -77,7 +77,7 @@
 -define(INGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{
     <<"remote">> => #{
         <<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
-        <<"qos">> => 2
+        <<"qos">> => 1
     },
     <<"local">> => #{
         <<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
@@ -265,6 +265,34 @@ t_mqtt_conn_bridge_ignores_clean_start(_) ->
 
     ok.
 
+t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) ->
+    BridgeName = atom_to_binary(?FUNCTION_NAME),
+    BridgeID = create_bridge(
+        ?SERVER_CONF(<<"user1">>)#{
+            <<"type">> => ?TYPE_MQTT,
+            <<"name">> => BridgeName,
+            <<"ingress">> => emqx_map_lib:deep_merge(
+                ?INGRESS_CONF,
+                #{<<"remote">> => #{<<"qos">> => 2}}
+            )
+        }
+    ),
+
+    RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
+    LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
+    Payload = <<"whatqos">>,
+    emqx:subscribe(LocalTopic),
+    emqx:publish(emqx_message:make(undefined, _QoS = 2, RemoteTopic, Payload)),
+
+    %% we should receive a message on the local broker, with specified topic
+    Msg = assert_mqtt_msg_received(LocalTopic, Payload),
+    ?assertMatch(#message{qos = 1}, Msg),
+
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
+
+    ok.
+
 t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
     User1 = <<"user1">>,
     BridgeIDIngress = create_bridge(

+ 3 - 3
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl

@@ -18,6 +18,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -behaviour(hocon_schema).
 
@@ -143,8 +144,7 @@ fields("ingress") ->
             mk(
                 ref(?MODULE, "ingress_local"),
                 #{
-                    desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local"),
-                    is_required => false
+                    desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")
                 }
             )}
     ];
@@ -161,7 +161,7 @@ fields("ingress_remote") ->
             )},
         {qos,
             mk(
-                qos(),
+                emqx_schema:qos(),
                 #{
                     default => 1,
                     desc => ?DESC("ingress_remote_qos")

+ 35 - 7
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl

@@ -114,7 +114,7 @@ start_link(Name, BridgeOpts) ->
         name => Name,
         options => BridgeOpts
     }),
-    Conf = init_config(BridgeOpts),
+    Conf = init_config(Name, BridgeOpts),
     Options = mk_client_options(Conf, BridgeOpts),
     case emqtt:start_link(Options) of
         {ok, Pid} ->
@@ -129,13 +129,13 @@ start_link(Name, BridgeOpts) ->
             Error
     end.
 
-init_config(Opts) ->
+init_config(Name, Opts) ->
     Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
     Subscriptions = maps:get(subscriptions, Opts, undefined),
     Forwards = maps:get(forwards, Opts, undefined),
     #{
         mountpoint => format_mountpoint(Mountpoint),
-        subscriptions => pre_process_subscriptions(Subscriptions),
+        subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts),
         forwards => pre_process_forwards(Forwards)
     }.
 
@@ -282,11 +282,18 @@ format_mountpoint(undefined) ->
 format_mountpoint(Prefix) ->
     binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
 
-pre_process_subscriptions(undefined) ->
+pre_process_subscriptions(undefined, _, _) ->
     undefined;
-pre_process_subscriptions(#{local := LC} = Conf) when is_map(Conf) ->
-    Conf#{local => pre_process_in_out_common(LC)};
-pre_process_subscriptions(Conf) when is_map(Conf) ->
+pre_process_subscriptions(
+    #{remote := RC, local := LC} = Conf,
+    BridgeName,
+    BridgeOpts
+) when is_map(Conf) ->
+    Conf#{
+        remote => pre_process_in_remote(RC, BridgeName, BridgeOpts),
+        local => pre_process_in_out_common(LC)
+    };
+pre_process_subscriptions(Conf, _, _) when is_map(Conf) ->
     %% have no 'local' field in the config
     undefined.
 
@@ -314,6 +321,27 @@ pre_process_conf(Key, Conf) ->
             Conf#{Key => Val}
     end.
 
+pre_process_in_remote(#{qos := QoSIn} = Conf, BridgeName, BridgeOpts) ->
+    QoS = downgrade_ingress_qos(QoSIn),
+    case QoS of
+        QoSIn ->
+            ok;
+        _ ->
+            ?SLOG(warning, #{
+                msg => "downgraded_unsupported_ingress_qos",
+                qos_configured => QoSIn,
+                qos_used => QoS,
+                name => BridgeName,
+                options => BridgeOpts
+            })
+    end,
+    Conf#{qos => QoS}.
+
+downgrade_ingress_qos(2) ->
+    1;
+downgrade_ingress_qos(QoS) ->
+    QoS.
+
 get_pid(Name) ->
     case gproc:where(?NAME(Name)) of
         Pid when is_pid(Pid) ->