Просмотр исходного кода

feat(shared_sub): Per-group shared subscription and local strategy

Georgy Sychev 3 лет назад
Родитель
Сommit
0522361604

+ 21 - 1
apps/emqx/etc/emqx.conf

@@ -1023,15 +1023,35 @@ broker {
   ## Dispatch strategy for shared subscription
   ##
   ## @doc broker.shared_subscription_strategy
-  ## ValueType: random | round_robin | sticky | hash
+  ## ValueType: random | round_robin | sticky | hash | local
   ##   - random: dispatch the message to a random selected subscriber
   ##   - round_robin: select the subscribers in a round-robin manner
+  ##   - local: select random local subscriber otherwise select random cluster-wide
   ##   - sticky: always use the last selected subscriber to dispatch,
   ##     until the subscriber disconnects.
   ##   - hash: select the subscribers by the hash of clientIds
   ## Default: round_robin
   shared_subscription_strategy = round_robin
 
+  ## Per-group dispatch strategy for shared subscription
+  ##
+  ## @doc broker.shared_subscription_group.$group_name.strategy
+  ## ValueType: random | round_robin | sticky | hash | local
+  ##   - random: dispatch the message to a random selected subscriber
+  ##   - round_robin: select the subscribers in a round-robin manner
+  ##   - local: select the local subscriber otherwise random cluster-wide
+  ##   - sticky: always use the last selected subscriber to dispatch,
+  ##     until the subscriber disconnects.
+  ##   - hash: select the subscribers by the hash of clientIds
+  ## Default: round_robin
+  shared_subscription_group {
+
+    ## example_group {
+    ##   strategy = random
+    ## }
+
+  }
+
   ## Enable/disable shared dispatch acknowledgement for QoS1 and QoS2 messages
   ## This should allow messages to be dispatched to a different subscriber in
   ## the group in case the picked (based on shared_subscription_strategy) one # is offline

+ 37 - 0
apps/emqx/i18n/emqx_schema_i18n.conf

@@ -1085,6 +1085,43 @@ This should allow messages to be dispatched to a different subscriber in the gro
         }
     }
 
+    shared_subscription_group_strategy {
+        desc {
+            en: """Per group dispatch strategy for shared subscription.
+This config is a map from shared subscription group name to the strategy
+name. The group name should be of format `[A-Za-z0-9]`. i.e. no
+special characters are allowed.
+"""
+            cn: """设置共享订阅组为单位的分发策略。该配置是一个从组名到
+策略名的一个map,组名不得包含 `[A-Za-z0-9]` 之外的特殊字符。
+"""
+          }
+
+    }
+
+    shared_subscription_strategy_enum {
+        desc {
+            en: """Dispatch strategy for shared subscription.
+- `random`: dispatch the message to a random selected subscriber
+- `round_robin`: select the subscribers in a round-robin manner
+- `sticky`: always use the last selected subscriber to dispatch,
+until the subscriber disconnects.
+- `hash`: select the subscribers by the hash of `clientIds`
+- `local`: send to a random local subscriber. If local
+subscriber was not found, send to a random subscriber cluster-wide
+"""
+            cn: """共享订阅的分发策略名称。
+- `random`: 随机选择一个组内成员;
+- `round_robin`: 循环选择下一个成员;
+- `sticky`: 使用上一次选中的成员;
+- `hash`: 根据 ClientID 哈希映射到一个成员;
+- `local`: 随机分发到节点本地成成员,如果本地成员不存在,则随机分发
+到任意一个成员。
+"""
+
+          }
+      }
+
     broker_perf_route_lock_type {
         desc {
             en: """Performance tuning for subscribing/unsubscribing a wildcard topic.

+ 66 - 26
apps/emqx/src/emqx_config.erl

@@ -47,6 +47,8 @@
     find_raw/1,
     put/1,
     put/2,
+    force_put/2,
+    force_put/3,
     erase/1
 ]).
 
@@ -91,14 +93,6 @@
 -define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
 -define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]).
 
--define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL),
-    try [atom(Key) || Key <- PATH] of
-        AtomKeyPath -> EXP
-    catch
-        error:badarg -> EXP_ON_FAIL
-    end
-).
-
 -export_type([
     update_request/0,
     raw_config/0,
@@ -166,10 +160,10 @@ find([]) ->
         Res -> {ok, Res}
     end;
 find(KeyPath) ->
-    ?ATOM_CONF_PATH(
+    atom_conf_path(
         KeyPath,
-        emqx_map_lib:deep_find(AtomKeyPath, get_root(KeyPath)),
-        {not_found, KeyPath}
+        fun(AtomKeyPath) -> emqx_map_lib:deep_find(AtomKeyPath, get_root(KeyPath)) end,
+        {return, {not_found, KeyPath}}
     ).
 
 -spec find_raw(emqx_map_lib:config_key_path()) ->
@@ -239,7 +233,29 @@ erase(RootName) ->
     persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))).
 
 -spec put(emqx_map_lib:config_key_path(), term()) -> ok.
-put(KeyPath, Config) -> do_put(?CONF, KeyPath, Config).
+put(KeyPath, Config) ->
+    Putter = fun(Path, Map, Value) ->
+        emqx_map_lib:deep_put(Path, Map, Value)
+    end,
+    do_put(?CONF, Putter, KeyPath, Config).
+
+%% Puts value into configuration even if path doesn't exist
+%% For paths of non-existing atoms use force_put(KeyPath, Config, unsafe)
+-spec force_put(emqx_map_lib:config_key_path(), term()) -> ok.
+force_put(KeyPath, Config) ->
+    force_put(KeyPath, Config, safe).
+
+-spec force_put(emqx_map_lib:config_key_path(), term(), safe | unsafe) -> ok.
+force_put(KeyPath0, Config, Safety) ->
+    KeyPath =
+        case Safety of
+            safe -> KeyPath0;
+            unsafe -> [unsafe_atom(Key) || Key <- KeyPath0]
+        end,
+    Putter = fun(Path, Map, Value) ->
+        emqx_map_lib:deep_force_put(Path, Map, Value)
+    end,
+    do_put(?CONF, Putter, KeyPath, Config).
 
 -spec get_default_value(emqx_map_lib:config_key_path()) -> {ok, term()} | {error, term()}.
 get_default_value([RootName | _] = KeyPath) ->
@@ -277,7 +293,11 @@ put_raw(Config) ->
     ).
 
 -spec put_raw(emqx_map_lib:config_key_path(), term()) -> ok.
-put_raw(KeyPath, Config) -> do_put(?RAW_CONF, KeyPath, Config).
+put_raw(KeyPath, Config) ->
+    Putter = fun(Path, Map, Value) ->
+        emqx_map_lib:deep_force_put(Path, Map, Value)
+    end,
+    do_put(?RAW_CONF, Putter, KeyPath, Config).
 
 %%============================================================================
 %% Load/Update configs From/To files
@@ -541,41 +561,48 @@ do_get(Type, [RootName | KeyPath], Default) ->
     RootV = persistent_term:get(?PERSIS_KEY(Type, bin(RootName)), #{}),
     do_deep_get(Type, KeyPath, RootV, Default).
 
-do_put(Type, [], DeepValue) ->
+do_put(Type, Putter, [], DeepValue) ->
     maps:fold(
         fun(RootName, Value, _Res) ->
-            do_put(Type, [RootName], Value)
+            do_put(Type, Putter, [RootName], Value)
         end,
         ok,
         DeepValue
     );
-do_put(Type, [RootName | KeyPath], DeepValue) ->
+do_put(Type, Putter, [RootName | KeyPath], DeepValue) ->
     OldValue = do_get(Type, [RootName], #{}),
-    NewValue = do_deep_put(Type, KeyPath, OldValue, DeepValue),
+    NewValue = do_deep_put(Type, Putter, KeyPath, OldValue, DeepValue),
     persistent_term:put(?PERSIS_KEY(Type, bin(RootName)), NewValue).
 
 do_deep_get(?CONF, KeyPath, Map, Default) ->
-    ?ATOM_CONF_PATH(
+    atom_conf_path(
         KeyPath,
-        emqx_map_lib:deep_get(AtomKeyPath, Map, Default),
-        Default
+        fun(AtomKeyPath) -> emqx_map_lib:deep_get(AtomKeyPath, Map, Default) end,
+        {return, Default}
     );
 do_deep_get(?RAW_CONF, KeyPath, Map, Default) ->
     emqx_map_lib:deep_get([bin(Key) || Key <- KeyPath], Map, Default).
 
-do_deep_put(?CONF, KeyPath, Map, Value) ->
-    ?ATOM_CONF_PATH(
+do_deep_put(?CONF, Putter, KeyPath, Map, Value) ->
+    atom_conf_path(
         KeyPath,
-        emqx_map_lib:deep_put(AtomKeyPath, Map, Value),
-        error({not_found, KeyPath})
+        fun(AtomKeyPath) -> Putter(AtomKeyPath, Map, Value) end,
+        {raise_error, {not_found, KeyPath}}
     );
-do_deep_put(?RAW_CONF, KeyPath, Map, Value) ->
-    emqx_map_lib:deep_put([bin(Key) || Key <- KeyPath], Map, Value).
+do_deep_put(?RAW_CONF, Putter, KeyPath, Map, Value) ->
+    Putter([bin(Key) || Key <- KeyPath], Map, Value).
 
 root_names_from_conf(RawConf) ->
     Keys = maps:keys(RawConf),
     [Name || Name <- get_root_names(), lists:member(Name, Keys)].
 
+unsafe_atom(Bin) when is_binary(Bin) ->
+    binary_to_atom(Bin, utf8);
+unsafe_atom(Str) when is_list(Str) ->
+    list_to_atom(Str);
+unsafe_atom(Atom) when is_atom(Atom) ->
+    Atom.
+
 atom(Bin) when is_binary(Bin) ->
     binary_to_existing_atom(Bin, utf8);
 atom(Str) when is_list(Str) ->
@@ -591,3 +618,16 @@ conf_key(?CONF, RootName) ->
     atom(RootName);
 conf_key(?RAW_CONF, RootName) ->
     bin(RootName).
+
+atom_conf_path(Path, ExpFun, OnFail) ->
+    try [atom(Key) || Key <- Path] of
+        AtomKeyPath -> ExpFun(AtomKeyPath)
+    catch
+        error:badarg ->
+            case OnFail of
+                {return, Val} ->
+                    Val;
+                {raise_error, Err} ->
+                    error(Err)
+            end
+    end.

+ 21 - 0
apps/emqx/src/emqx_map_lib.erl

@@ -20,6 +20,7 @@
     deep_get/3,
     deep_find/2,
     deep_put/3,
+    deep_force_put/3,
     deep_remove/2,
     deep_merge/2,
     safe_atom_key_map/1,
@@ -73,6 +74,26 @@ deep_put([Key | KeyPath], Map, Data) ->
     SubMap = maps:get(Key, Map, #{}),
     Map#{Key => deep_put(KeyPath, SubMap, Data)}.
 
+%% Like deep_put, but ensures that the key path is present.
+%% If key path is not present in map, creates the keys, until it's present
+%% deep_force_put([x, y, z], #{a => 1}, 0) -> #{a => 1, x => #{y => #{z => 0}}}
+-spec deep_force_put(config_key_path(), map(), term()) -> map().
+deep_force_put([], _Map, Data) ->
+    Data;
+deep_force_put([Key | KeyPath] = FullPath, Map, Data) ->
+    case Map of
+        #{Key := InnerValue} ->
+            Map#{Key => deep_force_put(KeyPath, InnerValue, Data)};
+        #{} ->
+            maps:put(Key, path_to_map(KeyPath, Data), Map);
+        _ ->
+            path_to_map(FullPath, Data)
+    end.
+
+-spec path_to_map(config_key_path(), term()) -> map().
+path_to_map([], Data) -> Data;
+path_to_map([Key | Tail], Data) -> #{Key => path_to_map(Tail, Data)}.
+
 -spec deep_remove(config_key_path(), map()) -> map().
 deep_remove([], Map) ->
     Map;

+ 18 - 1
apps/emqx/src/emqx_schema.erl

@@ -1137,7 +1137,7 @@ fields("broker") ->
             )},
         {"shared_subscription_strategy",
             sc(
-                hoconsc:enum([random, round_robin, sticky, hash_topic, hash_clientid]),
+                hoconsc:enum([random, round_robin, sticky, local, hash_topic, hash_clientid]),
                 #{
                     default => round_robin,
                     desc => ?DESC(broker_shared_subscription_strategy)
@@ -1163,6 +1163,21 @@ fields("broker") ->
             sc(
                 ref("broker_perf"),
                 #{}
+            )},
+        {"shared_subscription_group",
+            sc(
+                map(name, ref("shared_subscription_group")),
+                #{desc => ?DESC(shared_subscription_group_strategy)}
+            )}
+    ];
+fields("shared_subscription_group") ->
+    [
+        {"strategy",
+            sc(
+                hoconsc:enum([random, round_robin, sticky, local, hash_topic, hash_clientid]),
+                #{
+                    desc => ?DESC(shared_subscription_strategy_enum)
+                }
             )}
     ];
 fields("broker_perf") ->
@@ -1712,6 +1727,8 @@ desc("alarm") ->
     "Settings for the alarms.";
 desc("trace") ->
     "Real-time filtering logs for the ClientID or Topic or IP for debugging.";
+desc("shared_subscription_group") ->
+    "Per group dispatch strategy for shared subscription";
 desc(_) ->
     undefined.
 

+ 21 - 5
apps/emqx/src/emqx_shared_sub.erl

@@ -46,7 +46,12 @@
 ]).
 
 %% for testing
--export([subscribers/2]).
+-ifdef(TEST).
+-export([
+    subscribers/2,
+    strategy/1
+]).
+-endif.
 
 %% gen_server callbacks
 -export([
@@ -64,6 +69,7 @@
     random
     | round_robin
     | sticky
+    | local
     %% same as hash_clientid, backward compatible
     | hash
     | hash_clientid
@@ -122,7 +128,7 @@ dispatch(Group, Topic, Delivery) ->
 
 dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
     #message{from = ClientId, topic = SourceTopic} = Msg,
-    case pick(strategy(), ClientId, SourceTopic, Group, Topic, FailedSubs) of
+    case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of
         false ->
             {error, no_subscribers};
         {Type, SubPid} ->
@@ -135,9 +141,12 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
             end
     end.
 
--spec strategy() -> strategy().
-strategy() ->
-    emqx:get_config([broker, shared_subscription_strategy]).
+-spec strategy(emqx_topic:group()) -> strategy().
+strategy(Group) ->
+    case emqx:get_config([broker, shared_subscription_group, Group, strategy], undefined) of
+        undefined -> emqx:get_config([broker, shared_subscription_strategy]);
+        Strategy -> Strategy
+    end.
 
 -spec ack_enabled() -> boolean().
 ack_enabled() ->
@@ -270,6 +279,13 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
 
 pick_subscriber(_Group, _Topic, _Strategy, _ClientId, _SourceTopic, [Sub]) ->
     Sub;
+pick_subscriber(Group, Topic, local, ClientId, SourceTopic, Subs) ->
+    case lists:filter(fun(Pid) -> erlang:node(Pid) =:= node() end, Subs) of
+        [_ | _] = LocalSubs ->
+            pick_subscriber(Group, Topic, random, ClientId, SourceTopic, LocalSubs);
+        [] ->
+            pick_subscriber(Group, Topic, random, ClientId, SourceTopic, Subs)
+    end;
 pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) ->
     Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, length(Subs)),
     lists:nth(Nth, Subs).

+ 206 - 39
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -37,6 +37,7 @@
 all() -> emqx_common_test_helpers:all(?SUITE).
 
 init_per_suite(Config) ->
+    net_kernel:start(['master@127.0.0.1', longnames]),
     emqx_common_test_helpers:boot_modules(all),
     emqx_common_test_helpers:start_apps([]),
     Config.
@@ -187,19 +188,24 @@ t_no_connection_nack(_) ->
     ok.
 
 t_random(_) ->
+    ok = ensure_config(random, true),
     test_two_messages(random).
 
 t_round_robin(_) ->
+    ok = ensure_config(round_robin, true),
     test_two_messages(round_robin).
 
 t_sticky(_) ->
+    ok = ensure_config(sticky, true),
     test_two_messages(sticky).
 
 t_hash(_) ->
-    test_two_messages(hash, false).
+    ok = ensure_config(hash, false),
+    test_two_messages(hash).
 
 t_hash_clinetid(_) ->
-    test_two_messages(hash_clientid, false).
+    ok = ensure_config(hash_clientid, false),
+    test_two_messages(hash_clientid).
 
 t_hash_topic(_) ->
     ok = ensure_config(hash_topic, false),
@@ -271,53 +277,39 @@ t_not_so_sticky(_) ->
     ok.
 
 test_two_messages(Strategy) ->
-    test_two_messages(Strategy, _WithAck = true).
+    test_two_messages(Strategy, <<"group1">>).
 
-test_two_messages(Strategy, WithAck) ->
-    ok = ensure_config(Strategy, WithAck),
+test_two_messages(Strategy, Group) ->
     Topic = <<"foo/bar">>,
     ClientId1 = <<"ClientId1">>,
     ClientId2 = <<"ClientId2">>,
     {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
-    {ok, _} = emqtt:connect(ConnPid1),
     {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
+    {ok, _} = emqtt:connect(ConnPid1),
     {ok, _} = emqtt:connect(ConnPid2),
 
+    emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/", Topic/binary>>, 0}),
+    emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/", Topic/binary>>, 0}),
+
     Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
     Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),
-    emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}),
-    emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}),
     ct:sleep(100),
+
     emqx:publish(Message1),
-    Me = self(),
-    WaitF = fun(ExpectedPayload) ->
-        case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
-            {true, Pid} ->
-                Me ! {subscriber, Pid},
-                true;
-            Other ->
-                Other
-        end
-    end,
-    WaitF(<<"hello1">>),
-    UsedSubPid1 =
-        receive
-            {subscriber, P1} -> P1
-        end,
-    emqx_broker:publish(Message2),
-    WaitF(<<"hello2">>),
-    UsedSubPid2 =
-        receive
-            {subscriber, P2} -> P2
-        end,
+    {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
+
+    emqx:publish(Message2),
+    {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
+
+    emqtt:stop(ConnPid1),
+    emqtt:stop(ConnPid2),
+
     case Strategy of
-        sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2);
-        round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2);
-        hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
+        sticky -> ?assertEqual(UsedSubPid1, UsedSubPid2);
+        round_robin -> ?assertNotEqual(UsedSubPid1, UsedSubPid2);
+        hash -> ?assertEqual(UsedSubPid1, UsedSubPid2);
         _ -> ok
     end,
-    emqtt:stop(ConnPid1),
-    emqtt:stop(ConnPid2),
     ok.
 
 last_message(ExpectedPayload, Pids) ->
@@ -325,7 +317,7 @@ last_message(ExpectedPayload, Pids) ->
         {publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
             ct:pal("~p ====== ~p", [Pids, Pid]),
             {true, Pid}
-    after 100 ->
+    after 500 ->
         <<"not yet?">>
     end.
 
@@ -353,6 +345,101 @@ t_uncovered_func(_) ->
     ignored = emqx_shared_sub ! ignored,
     {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
 
+t_per_group_config(_) ->
+    ok = ensure_group_config(#{
+        <<"local_group">> => local,
+        <<"round_robin_group">> => round_robin,
+        <<"sticky_group">> => sticky
+    }),
+    %% Each test is repeated 4 times because random strategy may technically pass the test
+    %% so we run 8 tests to make random pass in only 1/256 runs
+
+    test_two_messages(sticky, <<"sticky_group">>),
+    test_two_messages(sticky, <<"sticky_group">>),
+    test_two_messages(round_robin, <<"round_robin_group">>),
+    test_two_messages(round_robin, <<"round_robin_group">>),
+    test_two_messages(sticky, <<"sticky_group">>),
+    test_two_messages(sticky, <<"sticky_group">>),
+    test_two_messages(round_robin, <<"round_robin_group">>),
+    test_two_messages(round_robin, <<"round_robin_group">>).
+
+t_local(_) ->
+    GroupConfig = #{
+        <<"local_group">> => local,
+        <<"round_robin_group">> => round_robin,
+        <<"sticky_group">> => sticky
+    },
+
+    Node = start_slave('local_shared_sub_testtesttest', 21999),
+    ok = ensure_group_config(GroupConfig),
+    ok = ensure_group_config(Node, GroupConfig),
+
+    Topic = <<"local_foo/bar">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
+    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {port, 21999}]),
+
+    {ok, _} = emqtt:connect(ConnPid1),
+    {ok, _} = emqtt:connect(ConnPid2),
+
+    emqtt:subscribe(ConnPid1, {<<"$share/local_group/", Topic/binary>>, 0}),
+    emqtt:subscribe(ConnPid2, {<<"$share/local_group/", Topic/binary>>, 0}),
+
+    ct:sleep(100),
+
+    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
+
+    emqx:publish(Message1),
+    {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
+
+    rpc:call(Node, emqx, publish, [Message2]),
+    {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
+    RemoteLocalGroupStrategy = rpc:call(Node, emqx_shared_sub, strategy, [<<"local_group">>]),
+
+    emqtt:stop(ConnPid1),
+    emqtt:stop(ConnPid2),
+    stop_slave(Node),
+
+    ?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)),
+    ?assertEqual(local, RemoteLocalGroupStrategy),
+
+    ?assertNotEqual(UsedSubPid1, UsedSubPid2),
+    ok.
+
+t_local_fallback(_) ->
+    ok = ensure_group_config(#{
+        <<"local_group">> => local,
+        <<"round_robin_group">> => round_robin,
+        <<"sticky_group">> => sticky
+    }),
+
+    Topic = <<"local_foo/bar">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+    Node = start_slave('local_fallback_shared_sub_test', 11888),
+
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
+    {ok, _} = emqtt:connect(ConnPid1),
+    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
+
+    emqtt:subscribe(ConnPid1, {<<"$share/local_group/", Topic/binary>>, 0}),
+
+    emqx:publish(Message1),
+    {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
+
+    rpc:call(Node, emqx, publish, [Message2]),
+    {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]),
+
+    emqtt:stop(ConnPid1),
+    stop_slave(Node),
+
+    ?assertEqual(UsedSubPid1, UsedSubPid2),
+    ok.
+
 %%--------------------------------------------------------------------
 %% help functions
 %%--------------------------------------------------------------------
@@ -365,6 +452,29 @@ ensure_config(Strategy, AckEnabled) ->
     emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled),
     ok.
 
+ensure_group_config(Group2Strategy) ->
+    lists:foreach(
+        fun({Group, Strategy}) ->
+            emqx_config:force_put(
+                [broker, shared_subscription_group, Group, strategy], Strategy, unsafe
+            )
+        end,
+        maps:to_list(Group2Strategy)
+    ).
+
+ensure_group_config(Node, Group2Strategy) ->
+    lists:foreach(
+        fun({Group, Strategy}) ->
+            rpc:call(
+                Node,
+                emqx_config,
+                force_put,
+                [[broker, shared_subscription_group, Group, strategy], Strategy, unsafe]
+            )
+        end,
+        maps:to_list(Group2Strategy)
+    ).
+
 subscribed(Group, Topic, Pid) ->
     lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
 
@@ -376,10 +486,67 @@ recv_msgs(0, Msgs) ->
 recv_msgs(Count, Msgs) ->
     receive
         {publish, Msg} ->
-            recv_msgs(Count - 1, [Msg | Msgs]);
-        %%TODO:: remove the branch?
-        _Other ->
-            recv_msgs(Count, Msgs)
+            recv_msgs(Count - 1, [Msg | Msgs])
     after 100 ->
         Msgs
     end.
+
+start_slave(Name, Port) ->
+    {ok, Node} = ct_slave:start(
+        list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
+        [
+            {kill_if_fail, true},
+            {monitor_master, true},
+            {init_timeout, 10000},
+            {startup_timeout, 10000},
+            {erl_flags, ebin_path()}
+        ]
+    ),
+
+    pong = net_adm:ping(Node),
+    setup_node(Node, Port),
+    Node.
+
+stop_slave(Node) ->
+    rpc:call(Node, mria, leave, []),
+    ct_slave:stop(Node).
+
+host() ->
+    [_, Host] = string:tokens(atom_to_list(node()), "@"),
+    Host.
+
+ebin_path() ->
+    string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
+
+is_lib(Path) ->
+    string:prefix(Path, code:lib_dir()) =:= nomatch.
+
+setup_node(Node, Port) ->
+    EnvHandler =
+        fun(_) ->
+            %% We load configuration, and than set the special enviroment variable
+            %% which says that emqx shouldn't load configuration at startup
+            emqx_config:init_load(emqx_schema),
+            application:set_env(emqx, init_config_load_done, true),
+
+            ok = emqx_config:put([listeners, tcp, default, bind], {{127, 0, 0, 1}, Port}),
+            ok = emqx_config:put([listeners, ssl, default, bind], {{127, 0, 0, 1}, Port + 1}),
+            ok = emqx_config:put([listeners, quic, default, bind], {{127, 0, 0, 1}, Port + 2}),
+            ok = emqx_config:put([listeners, ws, default, bind], {{127, 0, 0, 1}, Port + 3}),
+            ok = emqx_config:put([listeners, wss, default, bind], {{127, 0, 0, 1}, Port + 4}),
+            ok
+        end,
+
+    %% Load env before doing anything
+    [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx, ekka, mria]],
+
+    %% Needs to be set explicitly because ekka:start() (which calls `gen` is called without Handler
+    %% in emqx_common_test_helpers:start_apps(...)
+    ok = rpc:call(Node, application, set_env, [gen_rpc, tcp_server_port, Port - 1]),
+    ok = rpc:call(Node, application, set_env, [gen_rpc, port_discovery, manual]),
+
+    %% Here we start the node and make it join the cluster
+    ok = rpc:call(Node, emqx_common_test_helpers, start_apps, [[], EnvHandler]),
+    rpc:call(Node, mria, join, [node()]),
+
+    ok.

+ 3 - 3
apps/emqx_machine/src/emqx_machine.erl

@@ -92,10 +92,10 @@ update_vips() ->
 
 configure_shard_transports() ->
     ShardTransports = application:get_env(emqx_machine, custom_shard_transports, #{}),
-    maps:foreach(
-        fun(ShardBin, Transport) ->
+    lists:foreach(
+        fun({ShardBin, Transport}) ->
             ShardName = binary_to_existing_atom(ShardBin),
             mria_config:set_shard_transport(ShardName, Transport)
         end,
-        ShardTransports
+        maps:to_list(ShardTransports)
     ).