Просмотр исходного кода

Merge pull request #12367 from thalesmg/ds-fix-dropped-msgs-metric-emqx-11539-m-20240122

fix(ds): do not count persistent session-only routed messages as dropped
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
8c6a4139e6
2 измененных файлов с 38 добавлено и 5 удалено
  1. 7 5
      apps/emqx/src/emqx_broker.erl
  2. 31 0
      apps/emqx/test/emqx_persistent_messages_SUITE.erl

+ 7 - 5
apps/emqx/src/emqx_broker.erl

@@ -249,7 +249,7 @@ publish(Msg) when is_record(Msg, message) ->
             [];
         Msg1 = #message{topic = Topic} ->
             PersistRes = persist_publish(Msg1),
-            PersistRes ++ route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
+            route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes)
     end.
 
 persist_publish(Msg) ->
@@ -289,18 +289,20 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
 %% Route
 %%--------------------------------------------------------------------
 
--spec route([emqx_types:route_entry()], emqx_types:delivery()) ->
+-spec route([emqx_types:route_entry()], emqx_types:delivery(), nil() | [persisted]) ->
     emqx_types:publish_result().
-route([], #delivery{message = Msg}) ->
+route([], #delivery{message = Msg}, _PersistRes = []) ->
     ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
     ok = inc_dropped_cnt(Msg),
     [];
-route(Routes, Delivery) ->
+route([], _Delivery, PersistRes = [_ | _]) ->
+    PersistRes;
+route(Routes, Delivery, PersistRes) ->
     lists:foldl(
         fun(Route, Acc) ->
             [do_route(Route, Delivery) | Acc]
         end,
-        [],
+        PersistRes,
         Routes
     ).
 

+ 31 - 0
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -450,6 +450,32 @@ t_message_gc(Config) ->
     ),
     ok.
 
+t_metrics_not_dropped(_Config) ->
+    %% Asserts that, if only persisted sessions are subscribed to a topic being published
+    %% to, we don't bump the `message.dropped' metric, nor we run the equivalent hook.
+    Sub = connect(<<?MODULE_STRING "1">>, true, 30),
+    on_exit(fun() -> emqtt:stop(Sub) end),
+    Pub = connect(<<?MODULE_STRING "2">>, true, 30),
+    on_exit(fun() -> emqtt:stop(Pub) end),
+    Hookpoint = 'message.dropped',
+    emqx_hooks:add(Hookpoint, {?MODULE, on_message_dropped, [self()]}, 1_000),
+    on_exit(fun() -> emqx_hooks:del(Hookpoint, {?MODULE, on_message_dropped}) end),
+
+    DroppedBefore = emqx_metrics:val('messages.dropped'),
+    DroppedNoSubBefore = emqx_metrics:val('messages.dropped.no_subscribers'),
+
+    {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t/+">>, ?QOS_1),
+    emqtt:publish(Pub, <<"t/ps">>, <<"payload">>, ?QOS_1),
+    ?assertMatch([_], receive_messages(1, 1_500)),
+
+    DroppedAfter = emqx_metrics:val('messages.dropped'),
+    DroppedNoSubAfter = emqx_metrics:val('messages.dropped.no_subscribers'),
+
+    ?assertEqual(DroppedBefore, DroppedAfter),
+    ?assertEqual(DroppedNoSubBefore, DroppedNoSubAfter),
+
+    ok.
+
 %%
 
 connect(ClientId, CleanStart, EI) ->
@@ -542,3 +568,8 @@ message(Topic, Payload, PublishedAt) ->
         timestamp = PublishedAt,
         id = emqx_guid:gen()
     }.
+
+on_message_dropped(Msg, Context, Res, TestPid) ->
+    ErrCtx = #{msg => Msg, ctx => Context, res => Res},
+    ct:pal("this hook should not be called.\n  ~p", [ErrCtx]),
+    exit(TestPid, {hookpoint_called, ErrCtx}).