Просмотр исходного кода

Merge pull request #12556 from zhongwencool/fix-rabbitmq-source-metrics

Fix rabbitmq source metrics
zhongwencool 2 лет назад
Родитель
Сommit
4afcb0542f

+ 1 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_gcp_pubsub, [
     {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {registered, []},
     {applications, [
         kernel,

+ 7 - 8
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl

@@ -317,17 +317,17 @@ handle_result(Res) ->
 
 make_channel(PoolName, ChannelId, Params) ->
     Conns = get_rabbitmq_connections(PoolName),
-    make_channel(Conns, PoolName, ChannelId, Params, #{}).
+    make_channel(Conns, ChannelId, Params, #{}).
 
-make_channel([], _PoolName, _ChannelId, _Param, Acc) ->
+make_channel([], _ChannelId, _Param, Acc) ->
     {ok, Acc};
-make_channel([Conn | Conns], PoolName, ChannelId, Params, Acc) ->
+make_channel([Conn | Conns], ChannelId, Params, Acc) ->
     maybe
         {ok, RabbitMQChannel} ?= amqp_connection:open_channel(Conn),
         ok ?= try_confirm_channel(Params, RabbitMQChannel),
-        ok ?= try_subscribe(Params, RabbitMQChannel, PoolName, ChannelId),
+        ok ?= try_subscribe(Params, RabbitMQChannel, ChannelId),
         NewAcc = Acc#{Conn => RabbitMQChannel},
-        make_channel(Conns, PoolName, ChannelId, Params, NewAcc)
+        make_channel(Conns, ChannelId, Params, NewAcc)
     end.
 
 %% We need to enable confirmations if we want to wait for them
@@ -396,16 +396,15 @@ get_rabbitmq_connections(PoolName) ->
 try_subscribe(
     #{queue := Queue, no_ack := NoAck, config_root := sources} = Params,
     RabbitChan,
-    PoolName,
     ChannelId
 ) ->
-    WorkState = {RabbitChan, PoolName, Params},
+    WorkState = {RabbitChan, ChannelId, Params},
     {ok, ConsumePid} = emqx_bridge_rabbitmq_sup:ensure_started(ChannelId, WorkState),
     BasicConsume = #'basic.consume'{queue = Queue, no_ack = NoAck},
     #'basic.consume_ok'{consumer_tag = _} =
         amqp_channel:subscribe(RabbitChan, BasicConsume, ConsumePid),
     ok;
-try_subscribe(#{config_root := actions}, _RabbitChan, _PoolName, _ChannelId) ->
+try_subscribe(#{config_root := actions}, _RabbitChan, _ChannelId) ->
     ok.
 
 try_unsubscribe(ChannelId, Channels) ->

+ 67 - 0
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl

@@ -9,6 +9,7 @@
 
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("amqp_client/include/amqp_client.hrl").
 
@@ -189,9 +190,35 @@ t_source(Config) ->
         receive_messages(1)
     ),
     ok = emqtt:disconnect(C1),
+    InstanceId = instance_id(sources, Name),
+    #{counters := Counters} = emqx_resource:get_metrics(InstanceId),
     ok = delete_source(Name),
     SourcesAfterDelete = emqx_bridge_v2:list(sources),
     ?assertNot(lists:any(Any, SourcesAfterDelete), SourcesAfterDelete),
+    ?assertMatch(
+        #{
+            dropped := 0,
+            success := 0,
+            matched := 0,
+            failed := 0,
+            received := 1
+        },
+        Counters
+    ),
+    ok.
+
+t_source_probe(_Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
+    Source = rabbitmq_source(),
+    {ok, Res0} = probe_bridge_api(Name, "sources_probe", Source),
+    ?assertMatch({{_, 204, _}, _, _}, Res0),
+    ok.
+
+t_action_probe(_Config) ->
+    Name = atom_to_binary(?FUNCTION_NAME),
+    Action = rabbitmq_action(),
+    {ok, Res0} = probe_bridge_api(Name, "actions_probe", Action),
+    ?assertMatch({{_, 204, _}, _, _}, Res0),
     ok.
 
 t_action(Config) ->
@@ -218,9 +245,21 @@ t_action(Config) ->
     Msg = receive_message_from_rabbitmq(Config),
     ?assertMatch(Payload, Msg),
     ok = emqtt:disconnect(C1),
+    InstanceId = instance_id(actions, Name),
+    #{counters := Counters} = emqx_resource:get_metrics(InstanceId),
     ok = delete_action(Name),
     ActionsAfterDelete = emqx_bridge_v2:list(actions),
     ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
+    ?assertMatch(
+        #{
+            dropped := 0,
+            success := 0,
+            matched := 1,
+            failed := 0,
+            received := 0
+        },
+        Counters
+    ),
     ok.
 
 receive_messages(Count) ->
@@ -261,3 +300,31 @@ send_test_message_to_rabbitmq(Config) ->
         }
     ),
     ok.
+
+probe_bridge_api(Name, PathStr, Config) ->
+    Params = Config#{<<"type">> => ?TYPE, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path([PathStr]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("probing bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} ->
+                {ok, Res0};
+            {error, {Status, Headers, Body0}} ->
+                {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
+            Error ->
+                Error
+        end,
+    ct:pal("bridge probe result: ~p", [Res]),
+    Res.
+
+instance_id(Type, Name) ->
+    ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name),
+    BridgeId = emqx_bridge_resource:bridge_id(?TYPE, Name),
+    TypeBin =
+        case Type of
+            sources -> <<"source:">>;
+            actions -> <<"action:">>
+        end,
+    <<TypeBin/binary, BridgeId/binary, ":", ConnectorId/binary>>.