Просмотр исходного кода

Merge pull request #7860 from gsychev/shared_redispatch5

Shared redispatch for 5.0
gsychev 3 лет назад
Родитель
Сommit
b80f038fcc
3 измененных файлов с 123 добавлено и 63 удалено
  1. 41 20
      apps/emqx/src/emqx_session.erl
  2. 41 32
      apps/emqx/src/emqx_shared_sub.erl
  3. 41 11
      apps/emqx/test/emqx_shared_sub_SUITE.erl

+ 41 - 20
apps/emqx/src/emqx_session.erl

@@ -561,10 +561,13 @@ deliver_msg(
                 end,
             {ok, Session1};
         false ->
+            %% Note that we publish message without shared ack header
+            %% But add to inflight with ack headers
+            %% This ack header is required for redispatch-on-terminate feature to work
             Publish = {PacketId, maybe_ack(Msg)},
-            Msg2 = mark_begin_deliver(Msg),
-            Session1 = await(PacketId, Msg2, Session),
-            {ok, [Publish], next_pkt_id(Session1)}
+            MarkedMsg = mark_begin_deliver(Msg),
+            Inflight1 = emqx_inflight:insert(PacketId, with_ts(MarkedMsg), Inflight),
+            {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})}
     end.
 
 -spec enqueue(
@@ -625,14 +628,10 @@ enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs})
     enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
 
 maybe_ack(Msg) ->
-    case emqx_shared_sub:is_ack_required(Msg) of
-        true -> emqx_shared_sub:maybe_ack(Msg);
-        false -> Msg
-    end.
+    emqx_shared_sub:maybe_ack(Msg).
 
 maybe_nack(Msg) ->
-    emqx_shared_sub:is_ack_required(Msg) andalso
-        (ok == emqx_shared_sub:maybe_nack_dropped(Msg)).
+    emqx_shared_sub:maybe_nack_dropped(Msg).
 
 get_subopts(Topic, SubMap) ->
     case maps:find(Topic, SubMap) of
@@ -673,14 +672,6 @@ enrich_subopts([{subid, SubId} | Opts], Msg, Session) ->
     Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg),
     enrich_subopts(Opts, Msg1, Session).
 
-%%--------------------------------------------------------------------
-%% Awaiting ACK for QoS1/QoS2 Messages
-%%--------------------------------------------------------------------
-
-await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
-    Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight),
-    Session#session{inflight = Inflight1}.
-
 %%--------------------------------------------------------------------
 %% Retry Delivery
 %%--------------------------------------------------------------------
@@ -808,13 +799,43 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
     end.
 
 -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
-terminate(ClientInfo, discarded, Session) ->
+terminate(ClientInfo, Reason, Session) ->
+    run_terminate_hooks(ClientInfo, Reason, Session),
+    redispatch_shared_messages(Session),
+    ok.
+
+run_terminate_hooks(ClientInfo, discarded, Session) ->
     run_hook('session.discarded', [ClientInfo, info(Session)]);
-terminate(ClientInfo, takenover, Session) ->
+run_terminate_hooks(ClientInfo, takenover, Session) ->
     run_hook('session.takenover', [ClientInfo, info(Session)]);
-terminate(ClientInfo, Reason, Session) ->
+run_terminate_hooks(ClientInfo, Reason, Session) ->
     run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
 
+redispatch_shared_messages(#session{inflight = Inflight}) ->
+    InflightList = emqx_inflight:to_list(Inflight),
+    lists:foreach(
+        fun
+            %% Only QoS1 messages get redispatched, because QoS2 messages
+            %% must be sent to the same client, once they're in flight
+            ({_, #inflight_data{message = #message{qos = ?QOS_2} = Msg}}) ->
+                ?SLOG(warning, #{msg => qos2_lost_no_redispatch}, #{message => Msg});
+            ({_, #inflight_data{message = #message{topic = Topic, qos = ?QOS_1} = Msg}}) ->
+                case emqx_shared_sub:get_group(Msg) of
+                    {ok, Group} ->
+                        %% Note that dispatch is called with self() in failed subs
+                        %% This is done to avoid dispatching back to caller
+                        Delivery = #delivery{sender = self(), message = Msg},
+                        emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]);
+                    _ ->
+                        false
+                end;
+            (_) ->
+                ok
+        end,
+        InflightList
+    ).
+
+-compile({inline, [run_hook/2]}).
 run_hook(Name, Args) ->
     ok = emqx_metrics:inc(Name),
     emqx_hooks:run(Name, Args).

+ 41 - 32
apps/emqx/src/emqx_shared_sub.erl

@@ -36,13 +36,17 @@
     unsubscribe/3
 ]).
 
--export([dispatch/3]).
+-export([
+    dispatch/3,
+    dispatch/4
+]).
 
 -export([
     maybe_ack/1,
     maybe_nack_dropped/1,
     nack_no_connection/1,
-    is_ack_required/1
+    is_ack_required/1,
+    get_group/1
 ]).
 
 %% for testing
@@ -132,7 +136,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
         false ->
             {error, no_subscribers};
         {Type, SubPid} ->
-            case do_dispatch(SubPid, Topic, Msg, Type) of
+            case do_dispatch(SubPid, Group, Topic, Msg, Type) of
                 ok ->
                     {ok, 1};
                 {error, _Reason} ->
@@ -152,36 +156,33 @@ strategy(Group) ->
 ack_enabled() ->
     emqx:get_config([broker, shared_dispatch_ack_enabled]).
 
-do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
+do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
     %% Deadlock otherwise
-    _ = erlang:send(SubPid, {deliver, Topic, Msg}),
+    SubPid ! {deliver, Topic, Msg},
     ok;
-do_dispatch(SubPid, Topic, Msg, Type) ->
-    dispatch_per_qos(SubPid, Topic, Msg, Type).
-
 %% return either 'ok' (when everything is fine) or 'error'
-dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
+do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
     %% For QoS 0 message, send it as regular dispatch
-    _ = erlang:send(SubPid, {deliver, Topic, Msg}),
+    SubPid ! {deliver, Topic, Msg},
     ok;
-dispatch_per_qos(SubPid, Topic, Msg, retry) ->
+do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
     %% Retry implies all subscribers nack:ed, send again without ack
-    _ = erlang:send(SubPid, {deliver, Topic, Msg}),
+    SubPid ! {deliver, Topic, Msg},
     ok;
-dispatch_per_qos(SubPid, Topic, Msg, fresh) ->
+do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
     case ack_enabled() of
         true ->
-            dispatch_with_ack(SubPid, Topic, Msg);
+            dispatch_with_ack(SubPid, Group, Topic, Msg);
         false ->
-            _ = erlang:send(SubPid, {deliver, Topic, Msg}),
+            SubPid ! {deliver, Topic, Msg},
             ok
     end.
 
-dispatch_with_ack(SubPid, Topic, Msg) ->
+dispatch_with_ack(SubPid, Group, Topic, Msg) ->
     %% For QoS 1/2 message, expect an ack
     Ref = erlang:monitor(process, SubPid),
     Sender = self(),
-    _ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}),
+    SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
     Timeout =
         case Msg#message.qos of
             ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
@@ -203,24 +204,32 @@ dispatch_with_ack(SubPid, Topic, Msg) ->
         ok = emqx_pmon:demonitor(Ref)
     end.
 
-with_ack_ref(Msg, SenderRef) ->
-    emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg).
+with_group_ack(Msg, Group, Sender, Ref) ->
+    emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg).
 
-without_ack_ref(Msg) ->
+-spec without_group_ack(emqx_types:message()) -> emqx_types:message().
+without_group_ack(Msg) ->
     emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg).
 
-get_ack_ref(Msg) ->
+get_group_ack(Msg) ->
     emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
 
 -spec is_ack_required(emqx_types:message()) -> boolean().
-is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg).
+is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
+
+-spec get_group(emqx_types:message()) -> {ok, any()} | error.
+get_group(Msg) ->
+    case get_group_ack(Msg) of
+        ?NO_ACK -> error;
+        {Group, _Sender, _Ref} -> {ok, Group}
+    end.
 
 %% @doc Negative ack dropped message due to inflight window or message queue being full.
--spec maybe_nack_dropped(emqx_types:message()) -> ok.
+-spec maybe_nack_dropped(emqx_types:message()) -> boolean().
 maybe_nack_dropped(Msg) ->
-    case get_ack_ref(Msg) of
-        ?NO_ACK -> ok;
-        {Sender, Ref} -> nack(Sender, Ref, dropped)
+    case get_group_ack(Msg) of
+        ?NO_ACK -> false;
+        {_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped)
     end.
 
 %% @doc Negative ack message due to connection down.
@@ -228,22 +237,22 @@ maybe_nack_dropped(Msg) ->
 %% i.e is_ack_required returned true.
 -spec nack_no_connection(emqx_types:message()) -> ok.
 nack_no_connection(Msg) ->
-    {Sender, Ref} = get_ack_ref(Msg),
+    {_Group, Sender, Ref} = get_group_ack(Msg),
     nack(Sender, Ref, no_connection).
 
 -spec nack(pid(), reference(), dropped | no_connection) -> ok.
 nack(Sender, Ref, Reason) ->
-    erlang:send(Sender, {Ref, ?NACK(Reason)}),
+    Sender ! {Ref, ?NACK(Reason)},
     ok.
 
 -spec maybe_ack(emqx_types:message()) -> emqx_types:message().
 maybe_ack(Msg) ->
-    case get_ack_ref(Msg) of
+    case get_group_ack(Msg) of
         ?NO_ACK ->
             Msg;
-        {Sender, Ref} ->
-            erlang:send(Sender, {Ref, ?ACK}),
-            without_ack_ref(Msg)
+        {_Group, Sender, Ref} ->
+            Sender ! {Ref, ?ACK},
+            without_group_ack(Msg)
     end.
 
 pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->

+ 41 - 11
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -49,9 +49,9 @@ t_is_ack_required(_) ->
     ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
 
 t_maybe_nack_dropped(_) ->
-    ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
-    Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}},
-    ?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)),
+    ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
+    Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
+    ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
     ?assertEqual(
         ok,
         receive
@@ -61,7 +61,7 @@ t_maybe_nack_dropped(_) ->
     ).
 
 t_nack_no_connection(_) ->
-    Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}},
+    Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
     ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
     ?assertEqual(
         ok,
@@ -73,7 +73,7 @@ t_nack_no_connection(_) ->
 
 t_maybe_ack(_) ->
     ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
-    Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}},
+    Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
     ?assertEqual(
         #message{headers = #{shared_dispatch_ack => ?no_ack}},
         emqx_shared_sub:maybe_ack(Msg)
@@ -313,11 +313,15 @@ test_two_messages(Strategy, Group) ->
     ok.
 
 last_message(ExpectedPayload, Pids) ->
+    last_message(ExpectedPayload, Pids, 100).
+
+last_message(ExpectedPayload, Pids, Timeout) ->
     receive
         {publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
             ct:pal("~p ====== ~p", [Pids, Pid]),
             {true, Pid}
-    after 500 ->
+    after Timeout ->
+        ct:pal("not yet"),
         <<"not yet?">>
     end.
 
@@ -334,11 +338,6 @@ t_dispatch(_) ->
         emqx_shared_sub:dispatch(<<"group1">>, Topic, #delivery{message = #message{}})
     ).
 
-% t_unsubscribe(_) ->
-%     error('TODO').
-
-% t_subscribe(_) ->
-%     error('TODO').
 t_uncovered_func(_) ->
     ignored = gen_server:call(emqx_shared_sub, ignored),
     ok = gen_server:cast(emqx_shared_sub, ignored),
@@ -440,6 +439,37 @@ t_local_fallback(_) ->
     ?assertEqual(UsedSubPid1, UsedSubPid2),
     ok.
 
+%% This one tests that broker tries to select another shared subscriber
+%% If the first one doesn't return an ACK
+t_redispatch(_) ->
+    ok = ensure_config(sticky, true),
+
+    Group = <<"group1">>,
+    Topic = <<"foo/bar">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
+    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
+    {ok, _} = emqtt:connect(ConnPid1),
+    {ok, _} = emqtt:connect(ConnPid2),
+
+    emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
+    emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
+
+    Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
+
+    emqx:publish(Message),
+
+    {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
+    ok = emqtt:stop(UsedSubPid1),
+
+    Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000),
+    ?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),
+
+    {true, UsedSubPid2} = Res,
+    emqtt:stop(UsedSubPid2),
+    ok.
+
 %%--------------------------------------------------------------------
 %% help functions
 %%--------------------------------------------------------------------