Quellcode durchsuchen

fix: account calls when resource is not connected as matched

Thales Macedo Garitezi vor 3 Jahren
Ursprung
Commit
2d01726b22

+ 35 - 11
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -23,6 +23,7 @@
 -include("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include("emqx_dashboard/include/emqx_dashboard.hrl").
 
 %% output functions
@@ -511,15 +512,15 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     %% we now test if the bridge works as expected
     LocalTopic = <<"local_topic/1">>,
     RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
-    Payload = <<"hello">>,
+    Payload0 = <<"hello">>,
     emqx:subscribe(RemoteTopic),
     timer:sleep(100),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
-    emqx:publish(emqx_message:make(LocalTopic, Payload)),
+    emqx:publish(emqx_message:make(LocalTopic, Payload0)),
 
     %% we should receive a message on the "remote" broker, with specified topic
-    assert_mqtt_msg_received(RemoteTopic, Payload),
+    assert_mqtt_msg_received(RemoteTopic, Payload0),
 
     %% verify the metrics of the bridge
     {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
@@ -543,18 +544,40 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     ct:sleep(1500),
 
     %% PUBLISH 2 messages to the 'local' broker, the message should
-    emqx:publish(emqx_message:make(LocalTopic, Payload)),
-    emqx:publish(emqx_message:make(LocalTopic, Payload)),
+    ok = snabbkaffe:start_trace(),
+    {ok, SRef} =
+        snabbkaffe:subscribe(
+            fun
+                (
+                    #{
+                        ?snk_kind := call_query_enter,
+                        query := {query, _From, {send_message, #{}}, _Sent}
+                    }
+                ) ->
+                    true;
+                (_) ->
+                    false
+            end,
+            _NEvents = 2,
+            _Timeout = 1_000
+        ),
+    Payload1 = <<"hello2">>,
+    Payload2 = <<"hello3">>,
+    emqx:publish(emqx_message:make(LocalTopic, Payload1)),
+    emqx:publish(emqx_message:make(LocalTopic, Payload2)),
+    {ok, _} = snabbkaffe:receive_events(SRef),
+    ok = snabbkaffe:stop(),
 
     %% verify the metrics of the bridge, the message should be queued
     {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    %% matched >= 3 because of possible retries.
     ?assertMatch(
         #{
             <<"status">> := Status,
             <<"metrics">> := #{
-                <<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
+                <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
             }
-        } when Status == <<"connected">> orelse Status == <<"connecting">>,
+        } when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>),
         jsx:decode(BridgeStr1)
     ),
 
@@ -563,22 +586,23 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     timer:sleep(1500),
     %% verify the metrics of the bridge, the 2 queued messages should have been sent
     {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    %% matched >= 3 because of possible retries.
     ?assertMatch(
         #{
             <<"status">> := <<"connected">>,
             <<"metrics">> := #{
-                <<"matched">> := 3,
+                <<"matched">> := Matched,
                 <<"success">> := 3,
                 <<"failed">> := 0,
                 <<"queuing">> := 0,
                 <<"retried">> := _
             }
-        },
+        } when Matched >= 3,
         jsx:decode(BridgeStr2)
     ),
     %% also verify the 2 messages have been sent to the remote broker
-    assert_mqtt_msg_received(RemoteTopic, Payload),
-    assert_mqtt_msg_received(RemoteTopic, Payload),
+    assert_mqtt_msg_received(RemoteTopic, Payload1),
+    assert_mqtt_msg_received(RemoteTopic, Payload2),
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),

+ 3 - 0
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -410,6 +410,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) ->
     BlockWorker.
 
 call_query(QM0, Id, Query, QueryOpts) ->
+    ?tp(call_query_enter, #{id => Id, query => Query}),
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
             QM =
@@ -421,8 +422,10 @@ call_query(QM0, Id, Query, QueryOpts) ->
             emqx_resource_metrics:matched_inc(Id),
             apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts);
         {ok, _Group, #{status := stopped}} ->
+            emqx_resource_metrics:matched_inc(Id),
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
         {ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
+            emqx_resource_metrics:matched_inc(Id),
             ?RESOURCE_ERROR(not_connected, "resource not connected");
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")