Browse Source

Merge pull request #9952 from keynslug/fix/mqtt-bridge-no-qos2-clean-start

fix(mqtt-bridge): disallow QoS 2 on ingress bridges
Andrew Mayorov 3 years atrás
parent
commit
2ed54e560f

+ 37 - 9
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -52,7 +52,7 @@
 -define(INGRESS_CONF, #{
     <<"remote">> => #{
         <<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
-        <<"qos">> => 2
+        <<"qos">> => 1
     },
     <<"local">> => #{
         <<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
@@ -77,7 +77,7 @@
 -define(INGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{
     <<"remote">> => #{
         <<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
-        <<"qos">> => 2
+        <<"qos">> => 1
     },
     <<"local">> => #{
         <<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
@@ -242,26 +242,54 @@ t_mqtt_conn_bridge_ingress(_) ->
 
     ok.
 
-t_mqtt_conn_bridge_ignores_clean_start(_) ->
+t_mqtt_egress_bridge_ignores_clean_start(_) ->
     BridgeName = atom_to_binary(?FUNCTION_NAME),
     BridgeID = create_bridge(
         ?SERVER_CONF(<<"user1">>)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => BridgeName,
-            <<"ingress">> => ?INGRESS_CONF,
+            <<"egress">> => ?EGRESS_CONF,
             <<"clean_start">> => false
         }
     ),
 
-    {ok, 200, BridgeJSON} = request(get, uri(["bridges", BridgeID]), []),
-    Bridge = jsx:decode(BridgeJSON),
+    {ok, _, #{state := #{name := WorkerName}}} =
+        emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)),
+    ?assertMatch(
+        #{clean_start := true},
+        maps:from_list(emqx_connector_mqtt_worker:info(WorkerName))
+    ),
+
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
+
+    ok.
+
+t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) ->
+    BridgeName = atom_to_binary(?FUNCTION_NAME),
+    BridgeID = create_bridge(
+        ?SERVER_CONF(<<"user1">>)#{
+            <<"type">> => ?TYPE_MQTT,
+            <<"name">> => BridgeName,
+            <<"ingress">> => emqx_map_lib:deep_merge(
+                ?INGRESS_CONF,
+                #{<<"remote">> => #{<<"qos">> => 2}}
+            )
+        }
+    ),
+
+    RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
+    LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
+    Payload = <<"whatqos">>,
+    emqx:subscribe(LocalTopic),
+    emqx:publish(emqx_message:make(undefined, _QoS = 2, RemoteTopic, Payload)),
 
-    %% verify that there's no `clean_start` in response
-    ?assertEqual(#{}, maps:with([<<"clean_start">>], Bridge)),
+    %% we should receive a message on the local broker, with specified topic
+    Msg = assert_mqtt_msg_received(LocalTopic, Payload),
+    ?assertMatch(#message{qos = 1}, Msg),
 
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
-    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
     ok.
 

+ 2 - 2
apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf

@@ -159,8 +159,8 @@ broker MUST support this feature."""
 
     clean_start {
         desc {
-          en: "The clean-start or the clean-session of the MQTT protocol"
-          zh: "MQTT 清除会话"
+          en: "Whether or not to start a clean session when reconnecting a remote broker for ingress bridge"
+          zh: "与 ingress MQTT 桥的远程服务器重连时是否清除老的 MQTT 会话"
         }
         label: {
               en: "Clean Session"

+ 2 - 4
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -251,6 +251,7 @@ basic_config(
         server := Server,
         proto_ver := ProtoVer,
         bridge_mode := BridgeMode,
+        clean_start := CleanStart,
         keepalive := KeepAlive,
         retry_interval := RetryIntv,
         max_inflight := MaxInflight,
@@ -270,11 +271,8 @@ basic_config(
         %% non-standard mqtt connection packets will be filtered out by LB.
         %% So let's disable bridge_mode.
         bridge_mode => BridgeMode,
-        %% NOTE
-        %% We are ignoring the user configuration here because there's currently no reliable way
-        %% to ensure proper session recovery according to the MQTT spec.
-        clean_start => true,
         keepalive => ms_to_s(KeepAlive),
+        clean_start => CleanStart,
         retry_interval => RetryIntv,
         max_inflight => MaxInflight,
         ssl => EnableSsl,

+ 4 - 6
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl

@@ -18,6 +18,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
 
 -behaviour(hocon_schema).
 
@@ -111,9 +112,7 @@ fields("server_configs") ->
                 boolean(),
                 #{
                     default => true,
-                    desc => ?DESC("clean_start"),
-                    hidden => true,
-                    deprecated => {since, "v5.0.16"}
+                    desc => ?DESC("clean_start")
                 }
             )},
         {keepalive, mk_duration("MQTT Keepalive.", #{default => "300s"})},
@@ -143,8 +142,7 @@ fields("ingress") ->
             mk(
                 ref(?MODULE, "ingress_local"),
                 #{
-                    desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local"),
-                    is_required => false
+                    desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")
                 }
             )}
     ];
@@ -161,7 +159,7 @@ fields("ingress_remote") ->
             )},
         {qos,
             mk(
-                qos(),
+                emqx_schema:qos(),
                 #{
                     default => 1,
                     desc => ?DESC("ingress_remote_qos")

+ 51 - 9
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl

@@ -75,6 +75,7 @@
     connect/1,
     status/1,
     ping/1,
+    info/1,
     send_to_remote/2,
     send_to_remote_async/3
 ]).
@@ -114,7 +115,7 @@ start_link(Name, BridgeOpts) ->
         name => Name,
         options => BridgeOpts
     }),
-    Conf = init_config(BridgeOpts),
+    Conf = init_config(Name, BridgeOpts),
     Options = mk_client_options(Conf, BridgeOpts),
     case emqtt:start_link(Options) of
         {ok, Pid} ->
@@ -129,13 +130,13 @@ start_link(Name, BridgeOpts) ->
             Error
     end.
 
-init_config(Opts) ->
+init_config(Name, Opts) ->
     Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
     Subscriptions = maps:get(subscriptions, Opts, undefined),
     Forwards = maps:get(forwards, Opts, undefined),
     #{
         mountpoint => format_mountpoint(Mountpoint),
-        subscriptions => pre_process_subscriptions(Subscriptions),
+        subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts),
         forwards => pre_process_forwards(Forwards)
     }.
 
@@ -145,6 +146,16 @@ mk_client_options(Conf, BridgeOpts) ->
     Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined),
     Subscriptions = maps:get(subscriptions, Conf),
     Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
+    CleanStart =
+        case Subscriptions of
+            #{remote := _} ->
+                maps:get(clean_start, BridgeOpts);
+            undefined ->
+                %% NOTE
+                %% We are ignoring the user configuration here because there's currently no reliable way
+                %% to ensure proper session recovery according to the MQTT spec.
+                true
+        end,
     Opts = maps:without(
         [
             address,
@@ -160,6 +171,7 @@ mk_client_options(Conf, BridgeOpts) ->
     Opts#{
         msg_handler => mk_client_event_handler(Vars, #{server => Server}),
         hosts => [HostPort],
+        clean_start => CleanStart,
         force_ping => true,
         proto_ver => maps:get(proto_ver, BridgeOpts, v4)
     }.
@@ -205,10 +217,12 @@ subscribe_remote_topics(_Ref, undefined) ->
 stop(Ref) ->
     emqtt:stop(ref(Ref)).
 
+info(Ref) ->
+    emqtt:info(ref(Ref)).
+
 status(Ref) ->
     try
-        Info = emqtt:info(ref(Ref)),
-        case proplists:get_value(socket, Info) of
+        case proplists:get_value(socket, info(Ref)) of
             Socket when Socket /= undefined ->
                 connected;
             undefined ->
@@ -282,11 +296,18 @@ format_mountpoint(undefined) ->
 format_mountpoint(Prefix) ->
     binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
 
-pre_process_subscriptions(undefined) ->
+pre_process_subscriptions(undefined, _, _) ->
     undefined;
-pre_process_subscriptions(#{local := LC} = Conf) when is_map(Conf) ->
-    Conf#{local => pre_process_in_out_common(LC)};
-pre_process_subscriptions(Conf) when is_map(Conf) ->
+pre_process_subscriptions(
+    #{remote := RC, local := LC} = Conf,
+    BridgeName,
+    BridgeOpts
+) when is_map(Conf) ->
+    Conf#{
+        remote => pre_process_in_remote(RC, BridgeName, BridgeOpts),
+        local => pre_process_in_out_common(LC)
+    };
+pre_process_subscriptions(Conf, _, _) when is_map(Conf) ->
     %% have no 'local' field in the config
     undefined.
 
@@ -314,6 +335,27 @@ pre_process_conf(Key, Conf) ->
             Conf#{Key => Val}
     end.
 
+pre_process_in_remote(#{qos := QoSIn} = Conf, BridgeName, BridgeOpts) ->
+    QoS = downgrade_ingress_qos(QoSIn),
+    case QoS of
+        QoSIn ->
+            ok;
+        _ ->
+            ?SLOG(warning, #{
+                msg => "downgraded_unsupported_ingress_qos",
+                qos_configured => QoSIn,
+                qos_used => QoS,
+                name => BridgeName,
+                options => BridgeOpts
+            })
+    end,
+    Conf#{qos => QoS}.
+
+downgrade_ingress_qos(2) ->
+    1;
+downgrade_ingress_qos(QoS) ->
+    QoS.
+
 get_pid(Name) ->
     case gproc:where(?NAME(Name)) of
         Pid when is_pid(Pid) ->

+ 2 - 0
changes/v5.0.17/fix-9952.en.md

@@ -0,0 +1,2 @@
+Disallow subscribing with QoS 2 for ingress MQTT bridges.
+Allow user to configure `clean_start` option for ingress MQTT bridges however.

+ 2 - 0
changes/v5.0.17/fix-9952.zh.md

@@ -0,0 +1,2 @@
+不允许对 ingress MQTT 网桥的 QoS 2 进行订阅。
+但允许用户为 ingress MQTT 桥配置 "clean_start" 选项。