Преглед изворни кода

feat: Add new configuration item shared_subscription_initial_sticky_pick

Add new configuration item shared_subscription_initial_sticky_pick for
choosing the strategy for the initial pick when shared_subscription_strategy
is 'sticky'.
Heikki Holstila пре 1 година
родитељ
комит
3d2d8a5cc4

+ 26 - 0
apps/emqx/src/emqx_schema.erl

@@ -1332,6 +1332,19 @@ fields("shared_subscription_group") ->
                     default => random,
                     desc => ?DESC(shared_subscription_strategy_enum)
                 }
+            )},
+        {"initial_sticky_pick",
+            sc(
+                hoconsc:enum([
+                    random,
+                    local,
+                    hash_topic,
+                    hash_clientid
+                ]),
+                #{
+                    default => random,
+                    desc => ?DESC(shared_subscription_initial_sticky_pick_enum)
+                }
             )}
     ];
 fields("broker_perf") ->
@@ -3581,6 +3594,19 @@ mqtt_general() ->
                     desc => ?DESC(mqtt_shared_subscription_strategy)
                 }
             )},
+        {"shared_subscription_initial_sticky_pick",
+            sc(
+                hoconsc:enum([
+                    random,
+                    local,
+                    hash_topic,
+                    hash_clientid
+                ]),
+                #{
+                    default => random,
+                    desc => ?DESC(mqtt_shared_subscription_initial_sticky_pick)
+                }
+            )},
         {"exclusive_subscription",
             sc(
                 boolean(),

+ 30 - 3
apps/emqx/src/emqx_shared_sub.erl

@@ -54,7 +54,8 @@
 -ifdef(TEST).
 -export([
     subscribers/2,
-    strategy/1
+    strategy/1,
+    initial_sticky_pick/1
 ]).
 -endif.
 
@@ -84,6 +85,14 @@
     | hash_clientid
     | hash_topic.
 
+-export_type([initial_sticky_pick/0]).
+
+-type initial_sticky_pick() ::
+    random
+    | local
+    | hash_clientid
+    | hash_topic.
+
 -define(SERVER, ?MODULE).
 
 -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
@@ -167,6 +176,20 @@ strategy(Group) ->
             get_default_shared_subscription_strategy()
     end.
 
+-spec initial_sticky_pick(emqx_types:group()) -> initial_sticky_pick().
+initial_sticky_pick(Group) ->
+    try binary_to_existing_atom(Group) of
+        GroupAtom ->
+            Key = [broker, shared_subscription_group, GroupAtom, initial_sticky_pick],
+            case emqx:get_config(Key, ?CONFIG_NOT_FOUND_MAGIC) of
+                ?CONFIG_NOT_FOUND_MAGIC -> get_default_shared_subscription_initial_sticky_pick();
+                InitialStickyPick -> InitialStickyPick
+            end
+    catch
+        error:badarg ->
+            get_default_shared_subscription_initial_sticky_pick()
+    end.
+
 -spec ack_enabled() -> boolean().
 ack_enabled() ->
     emqx:get_config([broker, shared_dispatch_ack_enabled], false).
@@ -318,9 +341,10 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
             %% keep using it for sticky strategy
             {fresh, Sub0};
         false ->
-            %% randomly pick one for the first message
+            %% pick the initial subscriber by the configured strategy
+            InitialStrategy = initial_sticky_pick(Group),
             FailedSubs1 = FailedSubs#{Sub0 => ?SUBSCRIBER_DOWN},
-            Res = do_pick(All, random, ClientId, SourceTopic, Group, Topic, FailedSubs1),
+            Res = do_pick(All, InitialStrategy, ClientId, SourceTopic, Group, Topic, FailedSubs1),
             case Res of
                 {_, Sub} ->
                     %% stick to whatever pick result
@@ -556,3 +580,6 @@ delete_route_if_needed({Group, Topic} = GroupTopic) ->
 
 get_default_shared_subscription_strategy() ->
     emqx:get_config([mqtt, shared_subscription_strategy]).
+
+get_default_shared_subscription_initial_sticky_pick() ->
+    emqx:get_config([mqtt, shared_subscription_initial_sticky_pick]).

+ 1 - 0
apps/emqx/test/emqx_config_SUITE.erl

@@ -451,6 +451,7 @@ zone_global_defaults() ->
                 session_expiry_interval => 7200000,
                 shared_subscription => true,
                 shared_subscription_strategy => round_robin,
+                shared_subscription_initial_sticky_pick => random,
                 strict_mode => false,
                 upgrade_qos => false,
                 use_username_as_clientid => false,

+ 42 - 5
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -346,6 +346,14 @@ t_sticky(Config) when is_list(Config) ->
     ok = ensure_config(sticky, true),
     test_two_messages(sticky).
 
+t_sticky_initial_pick_hash_clientid(Config) when is_list(Config) ->
+    ok = ensure_config(sticky, hash_clientid, false),
+    test_two_messages(sticky).
+
+t_sticky_initial_pick_hash_topic(Config) when is_list(Config) ->
+    ok = ensure_config(sticky, hash_topic, false),
+    test_two_messages(sticky).
+
 %% two subscribers in one shared group
 %% one unsubscribe after receiving a message
 %% the other one in the group should receive the next message
@@ -531,6 +539,8 @@ t_per_group_config(Config) when is_list(Config) ->
         <<"local_group">> => local,
         <<"round_robin_group">> => round_robin,
         <<"sticky_group">> => sticky,
+        <<"sticky_group_initial_pick_hash_clientid">> => {sticky, hash_clientid},
+        <<"sticky_group_initial_pick_hash_topic">> => {sticky, hash_topic},
         <<"round_robin_per_group_group">> => round_robin_per_group
     }),
     %% Each test is repeated 4 times because random strategy may technically pass the test
@@ -538,10 +548,18 @@ t_per_group_config(Config) when is_list(Config) ->
 
     test_two_messages(sticky, <<"sticky_group">>),
     test_two_messages(sticky, <<"sticky_group">>),
+    test_two_messages(sticky, <<"sticky_group_initial_pick_hash_clientid">>),
+    test_two_messages(sticky, <<"sticky_group_initial_pick_hash_clientid">>),
+    test_two_messages(sticky, <<"sticky_group_initial_pick_hash_topic">>),
+    test_two_messages(sticky, <<"sticky_group_initial_pick_hash_topic">>),
     test_two_messages(round_robin, <<"round_robin_group">>),
     test_two_messages(round_robin, <<"round_robin_group">>),
     test_two_messages(sticky, <<"sticky_group">>),
     test_two_messages(sticky, <<"sticky_group">>),
+    test_two_messages(sticky, <<"sticky_group_initial_pick_hash_clientid">>),
+    test_two_messages(sticky, <<"sticky_group_initial_pick_hash_clientid">>),
+    test_two_messages(sticky, <<"sticky_group_initial_pick_hash_topic">>),
+    test_two_messages(sticky, <<"sticky_group_initial_pick_hash_topic">>),
     test_two_messages(round_robin, <<"round_robin_group">>),
     test_two_messages(round_robin, <<"round_robin_group">>),
     test_two_messages(round_robin_per_group, <<"round_robin_per_group_group">>),
@@ -1182,10 +1200,14 @@ collect_msgs(Acc, Timeout) ->
     end.
 
 ensure_config(Strategy) ->
-    ensure_config(Strategy, _AckEnabled = true).
+    ensure_config(Strategy, _InitialStickyPick = random, _AckEnabled = true).
+
+ensure_config(Strategy, AckEnabled) when is_boolean(AckEnabled) ->
+    ensure_config(Strategy, _InitialStickyPick = random, AckEnabled).
 
-ensure_config(Strategy, AckEnabled) ->
+ensure_config(Strategy, InitialStickyPick, AckEnabled) ->
     emqx_config:put([mqtt, shared_subscription_strategy], Strategy),
+    emqx_config:put([mqtt, shared_subscription_initial_sticky_pick], InitialStickyPick),
     emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled),
     ok.
 
@@ -1195,9 +1217,24 @@ ensure_node_config(Node, Strategy) ->
 ensure_group_config(Group2Strategy) ->
     lists:foreach(
         fun({Group, Strategy}) ->
-            emqx_config:force_put(
-                [broker, shared_subscription_group, Group, strategy], Strategy, unsafe
-            )
+            if
+                is_tuple(Strategy) ->
+                    {PrimaryStrategy, InitialStickyPick} = Strategy,
+                    emqx_config:force_put(
+                        [broker, shared_subscription_group, Group, strategy],
+                        PrimaryStrategy,
+                        unsafe
+                    ),
+                    emqx_config:force_put(
+                        [broker, shared_subscription_group, Group, initial_sticky_pick],
+                        InitialStickyPick,
+                        unsafe
+                    );
+                true ->
+                    emqx_config:force_put(
+                        [broker, shared_subscription_group, Group, strategy], Strategy, unsafe
+                    )
+            end
         end,
         maps:to_list(Group2Strategy)
     ).

+ 1 - 0
changes/ce/feat-13525.en.md

@@ -0,0 +1 @@
+Added new configuration item `shared_subscription_initial_sticky_pick` for choosing the strategy to use for the initial pick when `shared_subscription_strategy` is `sticky`.

+ 16 - 2
rel/i18n/emqx_schema.hocon

@@ -1035,7 +1035,14 @@ mqtt_shared_subscription_strategy.desc:
  - `round_robin`: Clients in a shared subscription group will consume messages in turn, and the progress of the loop is recorded independently in each publisher, so two adjacent messages from **different publishers** may be consumed by the same client in the subscription group;
  - `round_robin_per_group`: Clients in a shared subscription group will consume messages in turn, and the progress of the loop is recorded independently in each node, so two adjacent messages from **different nodes** may be consumed by the same client in the subscription group;
  - `local`: Randomly select a subscriber on the current node, if there are no subscribers on the current node, then randomly select within the cluster;
- - `sticky`: Continuously dispatch messages to the initially selected subscriber until their session ends;
+ - `sticky`: Continuously dispatch messages to the initially selected subscriber until their session ends. The initial selection is made based on `mqtt_shared_subscription_initial_sticky_pick`;
+ - `hash_clientid`: Hash the publisher's client ID to select a subscriber;
+ - `hash_topic`: Hash the publishing topic to select a subscriber."""
+
+mqtt_shared_subscription_initial_sticky_pick.desc:
+"""The strategy to use for the initial subscriber pick when shared_subscription_strategy is `sticky`.
+ - `random`: Randomly select the subscriber;
+ - `local`: Randomly select a subscriber on the current node, if there are no subscribers on the current node, then randomly select within the cluster;
  - `hash_clientid`: Hash the publisher's client ID to select a subscriber;
  - `hash_topic`: Hash the publishing topic to select a subscriber."""
 
@@ -1190,7 +1197,14 @@ shared_subscription_strategy_enum.desc:
  - `round_robin`: Clients in a shared subscription group will consume messages in turn, and the progress of the loop is recorded independently in each publisher, so two adjacent messages from **different publishers** may be consumed by the same client in the subscription group;
  - `round_robin_per_group`: Clients in a shared subscription group will consume messages in turn, and the progress of the loop is recorded independently in each node, so two adjacent messages from **different nodes** may be consumed by the same client in the subscription group;
  - `local`: Randomly select a subscriber on the current node, if there are no subscribers on the current node, then randomly select within the cluster;
- - `sticky`: Continuously dispatch messages to the initially selected subscriber until their session ends;
+ - `sticky`: Continuously dispatch messages to the initially selected subscriber until their session ends. The initial selection is made based on the value of `initial_sticky_pick`;
+ - `hash_clientid`: Hash the publisher's client ID to select a subscriber;
+ - `hash_topic`: Hash the publishing topic to select a subscriber."""
+
+shared_subscription_initial_sticky_pick_enum.desc:
+"""The strategy to use for the initial subscriber pick when the shared subscription strategy is `sticky`.
+ - `random`: Randomly select the subscriber;
+ - `local`: Randomly select a subscriber on the current node, if there are no subscribers on the current node, then randomly select within the cluster;
  - `hash_clientid`: Hash the publisher's client ID to select a subscriber;
  - `hash_topic`: Hash the publishing topic to select a subscriber."""