Sfoglia il codice sorgente

Merge pull request #13216 from zmstone/0610-mqtt-bridge-respect-clientid-prefix

fix(bridge/mqtt): respect client ID prefix
zmstone 1 anno fa
parent
commit
f911347318

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_mqtt, [
     {description, "EMQX MQTT Broker Bridge"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {applications, [
         kernel,

+ 14 - 4
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl

@@ -57,6 +57,7 @@
 -define(HEALTH_CHECK_TIMEOUT, 1000).
 -define(INGRESS, "I").
 -define(EGRESS, "E").
+-define(IS_NO_PREFIX(P), (P =:= undefined orelse P =:= <<>>)).
 
 %% ===================================================================
 %% When use this bridge as a data source, ?MODULE:on_message_received will be called
@@ -441,9 +442,9 @@ ms_to_s(Ms) ->
 clientid(Name, _Conf = #{clientid_prefix := Prefix}) when
     is_binary(Prefix) andalso Prefix =/= <<>>
 ->
-    emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name]);
+    {Prefix, emqx_bridge_mqtt_lib:clientid_base(Name)};
 clientid(Name, _Conf) ->
-    emqx_bridge_mqtt_lib:clientid_base([Name]).
+    {undefined, emqx_bridge_mqtt_lib:clientid_base(Name)}.
 
 %% @doc Start an ingress bridge worker.
 -spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
@@ -481,8 +482,17 @@ mk_client_opts(
         msg_handler => mk_client_event_handler(Name, TopicToHandlerIndex)
     }.
 
-mk_clientid(WorkerId, ClientId) ->
-    emqx_bridge_mqtt_lib:bytes23([ClientId], WorkerId).
+mk_clientid(WorkerId, {Prefix, ClientId}) when ?IS_NO_PREFIX(Prefix) ->
+    %% When there is no prefix, try to keep the client ID length within 23 bytes
+    emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId);
+mk_clientid(WorkerId, {Prefix, ClientId}) when size(Prefix) < 20 ->
+    %% Try to respect client ID prefix when it's less than 20 bytes
+    %% meaning there is at least 3 bytes to randomize
+    %% Must add $: for backward compatibility
+    emqx_bridge_mqtt_lib:bytes23_with_prefix(Prefix, ClientId, WorkerId);
+mk_clientid(WorkerId, {Prefix, ClientId}) ->
+    %% There is no other option but to use a long client ID
+    iolist_to_binary([Prefix, ClientId, $:, integer_to_binary(WorkerId)]).
 
 mk_client_event_handler(Name, TopicToHandlerIndex) ->
     #{

+ 17 - 6
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl

@@ -16,7 +16,7 @@
 
 -module(emqx_bridge_mqtt_lib).
 
--export([clientid_base/1, bytes23/2]).
+-export([clientid_base/1, bytes23/2, bytes23_with_prefix/3]).
 
 %% @doc Make the base ID of client IDs.
 %% A base ID is used to concatenate with pool worker ID to build a
@@ -28,18 +28,29 @@ clientid_base(Name) ->
     bin([Name, shortener(atom_to_list(node()), 8)]).
 
 %% @doc Limit the number of bytes for client ID under 23 bytes.
-%% If Prefix and suffix concatenated is longer than 23 bytes
+%% If ClientID base and suffix concatenated is longer than 23 bytes
 %% it hashes the concatenation and replace the non-random suffix.
-bytes23(Prefix, SeqNo) ->
+bytes23(ClientId, SeqNo) ->
+    bytes_n(ClientId, SeqNo, 23).
+
+bytes_n(ClientId, SeqNo, N) ->
     Suffix = integer_to_binary(SeqNo),
-    Concat = bin([Prefix, $:, Suffix]),
-    case size(Concat) =< 23 of
+    Concat = bin([ClientId, $:, Suffix]),
+    case size(Concat) =< N of
         true ->
             Concat;
         false ->
-            shortener(Concat, 23)
+            shortener(Concat, N)
     end.
 
+%% @doc Limit the number of bytes for client ID under 23 bytes.
+%% If Prefix, ClientID base and suffix concatenated is longer than 23 bytes
+%% it hashes the ClientID and SeqNo before appended to the Prefix
+bytes23_with_prefix(Prefix, ClientId, SeqNo) when Prefix =/= <<>> ->
+    SuffixLen = 23 - size(Prefix),
+    true = (SuffixLen > 0),
+    bin([Prefix, bytes_n(ClientId, SeqNo, SuffixLen)]).
+
 %% @private SHA hash a string and return the prefix of
 %% the given length as hex string in binary format.
 shortener(Str, Length) when is_list(Str) ->

+ 25 - 1
apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl

@@ -568,6 +568,7 @@ t_egress_short_clientid(_Config) ->
     Name = <<"abc01234">>,
     BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]),
     ExpectedClientId = iolist_to_binary([BaseId, $:, "1"]),
+    ?assertMatch(<<"abc01234", _/binary>>, ExpectedClientId),
     test_egress_clientid(Name, ExpectedClientId).
 
 t_egress_long_clientid(_Config) ->
@@ -578,11 +579,34 @@ t_egress_long_clientid(_Config) ->
     ExpectedClientId = emqx_bridge_mqtt_lib:bytes23(BaseId, 1),
     test_egress_clientid(Name, ExpectedClientId).
 
+t_egress_with_short_prefix(_Config) ->
+    %% Expect the actual client ID in use is hashed from
+    %% <prefix>head(sha1(<name><nodename-hash>:<pool_worker_id>), 16)
+    Prefix = <<"012-">>,
+    Name = <<"345">>,
+    BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]),
+    ExpectedClientId = emqx_bridge_mqtt_lib:bytes23_with_prefix(Prefix, BaseId, 1),
+    ?assertMatch(<<"012-", _/binary>>, ExpectedClientId),
+    test_egress_clientid(Name, Prefix, ExpectedClientId).
+
+t_egress_with_long_prefix(_Config) ->
+    %% Expect the actual client ID in use is hashed from
+    %% <prefix><name><nodename-hash>:<pool_worker_id>
+    Prefix = <<"0123456789abcdef01234-">>,
+    Name = <<"345">>,
+    BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]),
+    ExpectedClientId = iolist_to_binary([Prefix, BaseId, <<":1">>]),
+    test_egress_clientid(Name, Prefix, ExpectedClientId).
+
 test_egress_clientid(Name, ExpectedClientId) ->
+    test_egress_clientid(Name, <<>>, ExpectedClientId).
+
+test_egress_clientid(Name, ClientIdPrefix, ExpectedClientId) ->
     BridgeIDEgress = create_bridge(
         ?SERVER_CONF#{
             <<"name">> => Name,
-            <<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1}
+            <<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1},
+            <<"clientid_prefix">> => ClientIdPrefix
         }
     ),
     LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,

+ 10 - 0
changes/ce/fix-13216.en.md

@@ -0,0 +1,10 @@
+Respcet `clientid_prefix` config for MQTT bridges.
+
+As of version 5.4.1, EMQX limits MQTT Client ID lengths to 23 bytes.
+Previously, the system included the `clientid_prefix` in the hash calculation of the original, excessively long Client ID, thereby impacting the resulting shortened ID.
+
+Change Details:
+- Without Prefix: Behavior remains unchanged; EMQX will hash the entire Client ID into a 23-byte space (when longer than 23 bytes).
+- With Prefix:
+  - Prefix no more than 19 bytes: The prefix is preserved, and the remaining suffix is hashed into a 4-byte space.
+  - Prefix is 20 or more bytes: EMQX no longer attempts to shorten the Client ID, respecting the configured prefix in its entirety.