Преглед изворни кода

Merge pull request #1823 from emqx/add-shared-sub-strategy

Add new shared subscription dispatch strategy
turtleDeng пре 7 година
родитељ
комит
925e98a3e5
6 измењених фајлова са 286 додато и 50 уклоњено
  1. 8 8
      Makefile
  2. 9 2
      priv/emqx.schema
  3. 53 13
      src/emqx_shared_sub.erl
  4. 17 26
      test/emqx_mock_client.erl
  5. 1 1
      test/emqx_session_SUITE.erl
  6. 198 0
      test/emqx_shared_sub_SUITE.erl

+ 8 - 8
Makefile

@@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access
 			emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
 			emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
 			emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \
-		 	emqx_mountpoint emqx_listeners emqx_protocol emqx_pool
+		 	emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub
 
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
@@ -84,23 +84,23 @@ rebar-cover:
 coveralls:
 	@rebar3 coveralls send
 
-cuttlefish: deps
-	@mv ./deps/cuttlefish/cuttlefish ./cuttlefish
 
-rebar-cuttlefish: rebar-deps
-	@make -C _build/default/lib/cuttlefish
-	@mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish
+cuttlefish: rebar-deps
+	@if [ ! -f cuttlefish ]; then \
+		make -C _build/default/lib/cuttlefish; \
+		mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \
+	fi
 
 rebar-deps:
 	@rebar3 get-deps
 
-rebar-eunit: rebar-cuttlefish
+rebar-eunit: cuttlefish
 	@rebar3 eunit
 
 rebar-compile:
 	@rebar3 compile
 
-rebar-ct: rebar-cuttlefish app.config
+rebar-ct: cuttlefish app.config
 	@rebar3 as test compile
 	@ln -s -f '../../../../etc' _build/test/lib/emqx/
 	@ln -s -f '../../../../data' _build/test/lib/emqx/

+ 9 - 2
priv/emqx.schema

@@ -1734,9 +1734,16 @@ end}.
   {datatype, {enum, [local,one,quorum,all]}}
 ]}.
 
+%% @doc Shared Subscription Dispatch Strategy.
 {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [
-  {default, random},
-  {datatype, {enum, [random, round_robbin, hash]}}
+  {default, round_robbin},
+  {datatype,
+   {enum,
+    [random, %% randomly pick a subscriber
+     round_robbin, %% round robin alive subscribers one message after another
+     sticky, %% pick a random subscriber and stick to it
+     hash %% hash client ID to a group member
+    ]}}
 ]}.
 
 {mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [

+ 53 - 13
src/emqx_shared_sub.erl

@@ -26,7 +26,6 @@
 
 -export([start_link/0]).
 
--export([strategy/0]).
 -export([subscribe/3, unsubscribe/3]).
 -export([dispatch/3]).
 
@@ -36,6 +35,7 @@
 
 -define(SERVER, ?MODULE).
 -define(TAB, emqx_shared_subscription).
+-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
 
 -record(state, {pmon}).
 -record(emqx_shared_subscription, {group, topic, subpid}).
@@ -62,9 +62,9 @@ mnesia(copy) ->
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
--spec(strategy() -> round_robin | random | hash).
+-spec(strategy() -> random | round_robin | stiky | hash).
 strategy() ->
-    emqx_config:get_env(shared_subscription_strategy, random).
+    emqx_config:get_env(shared_subscription_strategy, round_robin).
 
 subscribe(undefined, _Topic, _SubPid) ->
     ok;
@@ -80,23 +80,56 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
 record(Group, Topic, SubPid) ->
     #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
 
-%% TODO: dispatch strategy, ensure the delivery...
 dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}) ->
-    case pick(subscribers(Group, Topic)) of
+    #message{from = ClientId} = Msg,
+    case pick(strategy(), ClientId, Group, Topic) of
         false  -> Delivery;
         SubPid -> SubPid ! {dispatch, Topic, Msg},
                   Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]}
     end.
 
-pick([]) ->
-    false;
-pick([SubPid]) ->
-    SubPid;
-pick(SubPids) ->
-    lists:nth(rand:uniform(length(SubPids)), SubPids).
+pick(sticky, ClientId, Group, Topic) ->
+    Sub0 = erlang:get(shared_sub_sticky),
+    case is_sub_alive(Sub0) of
+        true ->
+            %% the old subscriber is still alive
+            %% keep using it for sticky strategy
+            Sub0;
+        false ->
+            %% randomly pick one for the first message
+            Sub = do_pick(random, ClientId, Group, Topic),
+            %% stick to whatever pick result
+            erlang:put(shared_sub_sticky, Sub),
+            Sub
+    end;
+pick(Strategy, ClientId, Group, Topic) ->
+    do_pick(Strategy, ClientId, Group, Topic).
+
+do_pick(Strategy, ClientId, Group, Topic) ->
+    All = subscribers(Group, Topic),
+    pick_subscriber(Strategy, ClientId, All).
+
+pick_subscriber(_, _ClientId, []) -> false;
+pick_subscriber(_, _ClientId, [Sub]) -> Sub;
+pick_subscriber(Strategy, ClientId, Subs) ->
+    Nth = do_pick_subscriber(Strategy, ClientId, length(Subs)),
+    lists:nth(Nth, Subs).
+
+do_pick_subscriber(random, _ClientId, Count) ->
+    rand:uniform(Count);
+do_pick_subscriber(hash, ClientId, Count) ->
+    1 + erlang:phash2(ClientId) rem Count;
+do_pick_subscriber(round_robin, _ClientId, Count) ->
+    Rem = case erlang:get(shared_sub_round_robin) of
+              undefined -> 0;
+              N -> (N + 1) rem Count
+          end,
+    _ = erlang:put(shared_sub_round_robin, Rem),
+    Rem + 1.
 
 subscribers(Group, Topic) ->
     ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
+
 %%-----------------------------------------------------------------------------
 %% gen_server callbacks
 %%-----------------------------------------------------------------------------
@@ -104,6 +137,7 @@ subscribers(Group, Topic) ->
 init([]) ->
     {atomic, PMon} = mnesia:transaction(fun init_monitors/0),
     mnesia:subscribe({table, ?TAB, simple}),
+    ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]),
     {ok, update_stats(#state{pmon = PMon})}.
 
 init_monitors() ->
@@ -117,8 +151,9 @@ handle_call(Req, _From, State) ->
     {reply, ignored, State}.
 
 handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) ->
-    {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
-
+    NewPmon = emqx_pmon:monitor(SubPid, PMon),
+    ets:insert(?ALIVE_SUBS, {SubPid}),
+    {noreply, update_stats(State#state{pmon = NewPmon})};
 handle_cast(Msg, State) ->
     emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]),
     {noreply, State}.
@@ -154,6 +189,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 
 cleanup_down(SubPid) ->
+    ets:delete(?ALIVE_SUBS, SubPid),
     lists:foreach(
         fun(Record) ->
             mnesia:dirty_delete_object(?TAB, Record)
@@ -162,3 +198,7 @@ cleanup_down(SubPid) ->
 update_stats(State) ->
     emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State.
 
+%% erlang:is_process_alive/1 is expensive
+%% and does not work with remote pids
+is_sub_alive(Sub) -> [] =/= ets:lookup(?ALIVE_SUBS, Sub).
+

+ 17 - 26
test/emqx_mock_client.erl

@@ -16,14 +16,12 @@
 
 -behaviour(gen_server).
 
--export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0]).
+-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/1]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {clean_start, client_id, client_pid}).
-
--define(TAB, messages).
+-record(state, {clean_start, client_id, client_pid, last_msg}).
 
 start_link(ClientId) ->
     gen_server:start_link(?MODULE, [ClientId], []).
@@ -37,19 +35,14 @@ close_session(ClientPid, SessPid) ->
 stop(CPid) ->
     gen_server:call(CPid, stop).
 
-get_last_message() ->
-    [{last_message, Msg}] = ets:lookup(?TAB, last_message),
-    Msg.
+get_last_message(Pid) ->
+    gen_server:call(Pid, get_last_message).
 
 init([ClientId]) ->
-    Result = lists:member(?TAB, ets:all()),
-    if Result == false -> 
-        ets:new(?TAB, [set, named_table, public]);
-       true -> ok
-    end,
-    {ok, 
-     #state{clean_start = true,
-            client_id = ClientId}
+    {ok, #state{clean_start = true,
+                client_id = ClientId,
+                last_msg = undefined
+               }
     }.
 
 handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
@@ -61,28 +54,26 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
                expiry_interval  => 0
              },
     {ok, SessPid} = emqx_sm:open_session(Attrs),
-    {reply, {ok, SessPid}, State#state{
-                             clean_start = true,
-                             client_id = ClientId, 
-                             client_pid = ClientPid
-                            }};
-
+    {reply, {ok, SessPid},
+     State#state{clean_start = true,
+                 client_id = ClientId,
+                 client_pid = ClientPid
+                }};
 handle_call({stop_session, SessPid}, _From, State) ->
     emqx_sm:close_session(SessPid),
     {stop, normal, ok, State};
-
+handle_call(get_last_message, _From, #state{last_msg = Msg} = State) ->
+    {reply, Msg, State};
 handle_call(stop, _From, State) ->
     {stop, normal, ok, State};
-
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info({_, Msg}, State) ->
-    ets:insert(?TAB, {last_message, Msg}),
-    {noreply, State};
+handle_info({deliver, Msg}, State) ->
+    {noreply, State#state{last_msg = Msg}};
 handle_info(_Info, State) ->
     {noreply, State}.
 

+ 1 - 1
test/emqx_session_SUITE.erl

@@ -40,7 +40,7 @@ t_session_all(_) ->
     [{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}),
     emqx_session:publish(SPid, 1, Message1),
     timer:sleep(200),
-    {publish, 1, _} = emqx_mock_client:get_last_message(),
+    {publish, 1, _} = emqx_mock_client:get_last_message(ConnPid),
     emqx_session:puback(SPid, 2),
     emqx_session:puback(SPid, 3, reasoncode),
     emqx_session:pubrec(SPid, 4),

+ 198 - 0
test/emqx_shared_sub_SUITE.erl

@@ -0,0 +1,198 @@
+
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_shared_sub_SUITE).
+
+-export([all/0, init_per_suite/1, end_per_suite/1]).
+-export([t_random_basic/1, t_random/1, t_round_robin/1, t_sticky/1, t_hash/1, t_not_so_sticky/1]).
+
+-include("emqx.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
+
+all() -> [t_random_basic, t_random, t_round_robin, t_sticky, t_hash, t_not_so_sticky].
+
+init_per_suite(Config) ->
+    emqx_ct_broker_helpers:run_setup_steps(),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_broker_helpers:run_teardown_steps().
+
+t_random_basic(_) ->
+    application:set_env(?APPLICATION, shared_subscription_strategy, random),
+    ClientId = <<"ClientId">>,
+    {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
+    {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
+    Message1 = emqx_message:make(<<"ClientId">>, 2, <<"foo">>, <<"hello">>),
+    emqx_session:subscribe(SPid, [{<<"foo">>, #{qos => 2, share => <<"group1">>}}]),
+    %% wait for the subscription to show up
+    ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid) =:= [{SPid}], 1000),
+    emqx_session:publish(SPid, 1, Message1),
+    ?wait(case emqx_mock_client:get_last_message(ConnPid) of
+              {publish, 1, _} -> true;
+              Other -> Other
+          end, 1000),
+    emqx_session:puback(SPid, 2),
+    emqx_session:puback(SPid, 3, reasoncode),
+    emqx_session:pubrec(SPid, 4),
+    emqx_session:pubrec(SPid, 5, reasoncode),
+    emqx_session:pubrel(SPid, 6, reasoncode),
+    emqx_session:pubcomp(SPid, 7, reasoncode),
+    emqx_mock_client:close_session(ConnPid, SPid),
+    ok.
+
+t_random(_) ->
+    test_two_messages(random).
+
+t_round_robin(_) ->
+    test_two_messages(round_robin).
+
+t_sticky(_) ->
+    test_two_messages(sticky).
+
+t_hash(_) ->
+    test_two_messages(hash).
+
+%% if the original subscriber dies, change to another one alive
+t_not_so_sticky(_) ->
+    application:set_env(?APPLICATION, shared_subscription_strategy, sticky),
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+    {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
+    {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
+    {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
+    {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
+    Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>),
+    emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
+    %% wait for the subscription to show up
+    ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}], 1000),
+    emqx_session:publish(SPid1, 1, Message1),
+    ?wait(case emqx_mock_client:get_last_message(ConnPid1) of
+              {publish, _, #message{payload = <<"hello1">>}} -> true;
+              Other -> Other
+          end, 1000),
+    emqx_mock_client:close_session(ConnPid1, SPid1),
+    ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [], 1000),
+    emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]),
+    ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000),
+    emqx_session:publish(SPid2, 2, Message2),
+    ?wait(case emqx_mock_client:get_last_message(ConnPid2) of
+              {publish, _, #message{payload = <<"hello2">>}} -> true;
+              Other -> Other
+          end, 1000),
+    emqx_mock_client:close_session(ConnPid2, SPid2),
+    ?wait(ets:tab2list(emqx_alive_shared_subscribers) =:= [], 1000),
+    ok.
+
+test_two_messages(Strategy) ->
+    application:set_env(?APPLICATION, shared_subscription_strategy, Strategy),
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+    {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1),
+    {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2),
+    {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal),
+    {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal),
+    Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>),
+    emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
+    emqx_session:subscribe(SPid2, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]),
+    %% wait for the subscription to show up
+    ?wait(ets:lookup(emqx_alive_shared_subscribers, SPid1) =:= [{SPid1}] andalso
+          ets:lookup(emqx_alive_shared_subscribers, SPid2) =:= [{SPid2}], 1000),
+    emqx_session:publish(SPid1, 1, Message1),
+    Me = self(),
+    WaitF = fun(ExpectedPayload) ->
+                    case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
+                        {true, Pid} ->
+                            Me ! {subscriber, Pid},
+                            true;
+                        Other ->
+                            Other
+                    end
+            end,
+    ?wait(WaitF(<<"hello1">>), 2000),
+    UsedSubPid1 = receive {subscriber, P1} -> P1 end,
+    %% publish both messages with SPid1
+    emqx_session:publish(SPid1, 2, Message2),
+    ?wait(WaitF(<<"hello2">>), 2000),
+    UsedSubPid2 = receive {subscriber, P2} -> P2 end,
+    case Strategy of
+        sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2);
+        round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2);
+        hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
+        _ -> ok
+    end,
+    emqx_mock_client:close_session(ConnPid1, SPid1),
+    emqx_mock_client:close_session(ConnPid2, SPid2),
+    ok.
+
+last_message(_ExpectedPayload, []) -> <<"not yet?">>;
+last_message(ExpectedPayload, [Pid | Pids]) ->
+    case emqx_mock_client:get_last_message(Pid) of
+        {publish, _, #message{payload = ExpectedPayload}} -> {true, Pid};
+        _Other -> last_message(ExpectedPayload, Pids)
+    end.
+
+%%------------------------------------------------------------------------------
+%% help functions
+%%------------------------------------------------------------------------------
+
+wait_for(Fn, Ln, F, Timeout) ->
+    {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
+    wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
+
+wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
+    receive
+        {'DOWN', Mref, process, Pid, normal} ->
+            ok;
+        {'DOWN', Mref, process, Pid, {C, E, S}} ->
+            erlang:raise(C, {Fn, Ln, E}, S)
+    after
+        Timeout ->
+            case Kill of
+                true ->
+                    erlang:demonitor(Mref, [flush]),
+                    erlang:exit(Pid, kill),
+                    erlang:error({Fn, Ln, timeout});
+                false ->
+                    Pid ! stop,
+                    wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
+            end
+    end.
+
+wait_loop(_F, true) -> exit(normal);
+wait_loop(F, LastRes) ->
+    Res = catch_call(F),
+    receive
+        stop -> erlang:exit(LastRes)
+    after
+        100 -> wait_loop(F, Res)
+    end.
+
+catch_call(F) ->
+    try
+        case F() of
+            true -> true;
+            Other -> erlang:error({unexpected, Other})
+        end
+    catch
+        C : E : S ->
+            {C, E, S}
+    end.
+