Просмотр исходного кода

fix(ignore-loop-deliver): fix issue#3738 (#3741)

tigercl 5 лет назад
Родитель
Сommit
da4d71917f
5 измененных файлов с 57 добавлено и 39 удалено
  1. 12 0
      src/emqx.appup.src
  2. 4 4
      src/emqx_alarm.erl
  3. 32 27
      src/emqx_channel.erl
  4. 1 1
      src/emqx_session.erl
  5. 8 7
      test/emqx_channel_SUITE.erl

+ 12 - 0
src/emqx.appup.src

@@ -0,0 +1,12 @@
+{"4.2.1",
+  [
+    {"4.2.0", [
+      {load_module, emqx_channel, brutal_purge, soft_purge, []}
+    ]}
+  ],
+  [
+    {"4.2.0", [
+      {load_module, emqx_channel, brutal_purge, soft_purge, []}
+    ]}
+  ]
+}.

+ 4 - 4
src/emqx_alarm.erl

@@ -200,10 +200,10 @@ handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions,
                     ok
             end,
             Alarm = #deactivated_alarm{activate_at = ActivateAt,
-                                        name = Name,
-                                        details = Details,
-                                        message = Message,
-                                        deactivate_at = erlang:system_time(microsecond)},
+                                       name = Name,
+                                       details = Details,
+                                       message = Message,
+                                       deactivate_at = erlang:system_time(microsecond)},
             mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
             mnesia:dirty_write(?DEACTIVATED_ALARM, Alarm),
             do_actions(deactivate, Alarm, Actions),

+ 32 - 27
src/emqx_channel.erl

@@ -648,17 +648,21 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
 -spec(handle_deliver(list(emqx_types:deliver()), channel())
       -> {ok, channel()} | {ok, replies(), channel()}).
 handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
-                                            session    = Session}) ->
-    NSession = emqx_session:enqueue(maybe_nack(Delivers), Session),
+                                            session    = Session,
+                                            clientinfo = #{clientid := ClientId}}) ->
+    NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
     {ok, Channel#channel{session = NSession}};
 
 handle_deliver(Delivers, Channel = #channel{takeover = true,
-                                            pendings = Pendings}) ->
-    NPendings = lists:append(Pendings, maybe_nack(Delivers)),
+                                            pendings = Pendings,
+                                            session = Session,
+                                            clientinfo = #{clientid := ClientId}}) ->
+    NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
     {ok, Channel#channel{pendings = NPendings}};
 
-handle_deliver(Delivers, Channel = #channel{session = Session}) ->
-    case emqx_session:deliver(Delivers, Session) of
+handle_deliver(Delivers, Channel = #channel{session = Session,
+                                            clientinfo = #{clientid := ClientId}}) ->
+    case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
         {ok, Publishes, NSession} ->
             NChannel = Channel#channel{session = NSession},
             handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
@@ -666,6 +670,19 @@ handle_deliver(Delivers, Channel = #channel{session = Session}) ->
             {ok, Channel#channel{session = NSession}}
     end.
 
+ignore_local(Delivers, Subscriber, Session) ->
+    Subs = emqx_session:info(subscriptions, Session),
+    lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
+                        case maps:find(Topic, Subs) of
+                            {ok, #{nl := 1}} when Subscriber =:= Publisher ->
+                                ok = emqx_metrics:inc('delivery.dropped'),
+                                ok = emqx_metrics:inc('delivery.dropped.no_local'),
+                                true;
+                            _ ->
+                                false
+                        end
+                    end, Delivers).
+
 %% Nack delivers from shared subscription
 maybe_nack(Delivers) ->
     lists:filter(fun not_nacked/1, Delivers).
@@ -782,22 +799,15 @@ do_deliver({pubrel, PacketId}, Channel) ->
 
 do_deliver({PacketId, Msg}, Channel = #channel{clientinfo = ClientInfo =
                                      #{mountpoint := MountPoint}}) ->
-    case ignore_local(Msg, ClientInfo) of
-        true ->
-            ok = emqx_metrics:inc('delivery.dropped'),
-            ok = emqx_metrics:inc('delivery.dropped.no_local'),
-            {[], Channel};
-        false ->
-            ok = emqx_metrics:inc('messages.delivered'),
-            Msg1 = emqx_hooks:run_fold('message.delivered',
-                                       [ClientInfo],
-                                       emqx_message:update_expiry(Msg)
-                                      ),
-            Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
-            Packet = emqx_message:to_packet(PacketId, Msg2),
-            {NPacket, NChannel} = packing_alias(Packet, Channel),
-            {[NPacket], NChannel}
-    end;
+    ok = emqx_metrics:inc('messages.delivered'),
+    Msg1 = emqx_hooks:run_fold('message.delivered',
+                                [ClientInfo],
+                                emqx_message:update_expiry(Msg)
+                                ),
+    Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
+    Packet = emqx_message:to_packet(PacketId, Msg2),
+    {NPacket, NChannel} = packing_alias(Packet, Channel),
+    {[NPacket], NChannel};
 
 do_deliver([Publish], Channel) ->
     do_deliver(Publish, Channel);
@@ -810,11 +820,6 @@ do_deliver(Publishes, Channel) when is_list(Publishes) ->
         end, {[], Channel}, Publishes),
     {lists:reverse(Packets), NChannel}.
 
-ignore_local(#message{flags = #{nl := true}, from = ClientId},
-             #{clientid := ClientId}) ->
-    true;
-ignore_local(_Msg, _ClientInfo) -> false.
-
 %%--------------------------------------------------------------------
 %% Handle out suback
 %%--------------------------------------------------------------------

+ 1 - 1
src/emqx_session.erl

@@ -429,7 +429,7 @@ deliver(Delivers, Session) ->
 deliver([], Publishes, Session) ->
     {ok, lists:reverse(Publishes), Session};
 
-deliver([Msg|More], Acc, Session) ->
+deliver([Msg | More], Acc, Session) ->
     case deliver_msg(Msg, Session) of
         {ok, Session1} ->
             deliver(More, Acc, Session1);

+ 8 - 7
test/emqx_channel_SUITE.erl

@@ -417,6 +417,14 @@ t_handle_deliver(_) ->
     {ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_deliver(Delivers, channel()),
     ?assertEqual([?QOS_1, ?QOS_2], [emqx_packet:qos(Pkt)|| Pkt <- Packets]).
 
+t_handle_deliver_nl(_) ->
+    ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
+    Session = session(#{subscriptions => #{<<"t1">> => #{nl => 1}}}),
+    Channel = channel(#{clientinfo => ClientInfo, session => Session}),
+    Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
+    NMsg = emqx_message:set_flag(nl, Msg),
+    {ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel).
+
 %%--------------------------------------------------------------------
 %% Test cases for handle_out
 %%--------------------------------------------------------------------
@@ -434,13 +442,6 @@ t_handle_out_publish_1(_) ->
     {ok, {outgoing, [?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)]}, _Chan}
         = emqx_channel:handle_out(publish, [{1, Msg}], channel()).
 
-t_handle_out_publish_nl(_) ->
-    ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
-    Channel = channel(#{clientinfo => ClientInfo}),
-    Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
-    Pubs = [{1, emqx_message:set_flag(nl, Msg)}],
-    {ok, {outgoing,[]}, Channel} = emqx_channel:handle_out(publish, Pubs, Channel).
-
 t_handle_out_connack_sucess(_) ->
     {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
         emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()),