Просмотр исходного кода

Merge pull request #9882 from fwup/fix/no-mqtt-bridge-middleman

refactor(mqtt-worker): avoid unnecessary abstraction
Andrew Mayorov 3 лет назад
Родитель
Сommit
ca5c192f4b

+ 226 - 325
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -32,7 +32,6 @@
 
 -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
 -define(TYPE_MQTT, <<"mqtt">>).
--define(NAME_MQTT, <<"my_mqtt_bridge">>).
 -define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>).
 -define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>).
 
@@ -98,6 +97,24 @@
     }
 }).
 
+-define(assertMetrics(Pat, BridgeID),
+    ?assertMetrics(Pat, true, BridgeID)
+).
+-define(assertMetrics(Pat, Guard, BridgeID),
+    ?assertMatch(
+        #{
+            <<"metrics">> := Pat,
+            <<"node_metrics">> := [
+                #{
+                    <<"node">> := _,
+                    <<"metrics">> := Pat
+                }
+            ]
+        } when Guard,
+        request_bridge_metrics(BridgeID)
+    )
+).
+
 inspect(Selected, _Envs, _Args) ->
     persistent_term:put(?MODULE, #{inspect => Selected}).
 
@@ -176,7 +193,7 @@ t_mqtt_conn_bridge_ingress(_) ->
     {ok, 201, Bridge} = request(
         post,
         uri(["bridges"]),
-        ?SERVER_CONF(User1)#{
+        ServerConf = ?SERVER_CONF(User1)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_INGRESS,
             <<"ingress">> => ?INGRESS_CONF
@@ -186,8 +203,21 @@ t_mqtt_conn_bridge_ingress(_) ->
         <<"type">> := ?TYPE_MQTT,
         <<"name">> := ?BRIDGE_NAME_INGRESS
     } = jsx:decode(Bridge),
+
     BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
 
+    %% try to create the bridge again
+    ?assertMatch(
+        {ok, 400, _},
+        request(post, uri(["bridges"]), ServerConf)
+    ),
+
+    %% try to reconfigure the bridge
+    ?assertMatch(
+        {ok, 200, _},
+        request(put, uri(["bridges", BridgeIDIngress]), ServerConf)
+    ),
+
     %% we now test if the bridge works as expected
     RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
     LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
@@ -198,34 +228,12 @@ t_mqtt_conn_bridge_ingress(_) ->
     %% the remote broker is also the local one.
     emqx:publish(emqx_message:make(RemoteTopic, Payload)),
     %% we should receive a message on the local broker, with specified topic
-    ?assert(
-        receive
-            {deliver, LocalTopic, #message{payload = Payload}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
-                true;
-            Msg ->
-                ct:pal("Msg: ~p", [Msg]),
-                false
-        after 100 ->
-            false
-        end
-    ),
+    assert_mqtt_msg_received(LocalTopic, Payload),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
-    ?assertMatch(
-        #{
-            <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
-            <<"node_metrics">> :=
-                [
-                    #{
-                        <<"node">> := _,
-                        <<"metrics">> :=
-                            #{<<"matched">> := 0, <<"received">> := 1}
-                    }
-                ]
-        },
-        jsx:decode(BridgeMetricsStr)
+    ?assertMetrics(
+        #{<<"matched">> := 0, <<"received">> := 1},
+        BridgeIDIngress
     ),
 
     %% delete the bridge
@@ -236,21 +244,13 @@ t_mqtt_conn_bridge_ingress(_) ->
 
 t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
     User1 = <<"user1">>,
-    %% create an MQTT bridge, using POST
-    {ok, 201, Bridge} = request(
-        post,
-        uri(["bridges"]),
+    BridgeIDIngress = create_bridge(
         ?SERVER_CONF(User1)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_INGRESS,
             <<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE
         }
     ),
-    #{
-        <<"type">> := ?TYPE_MQTT,
-        <<"name">> := ?BRIDGE_NAME_INGRESS
-    } = jsx:decode(Bridge),
-    BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
 
     %% we now test if the bridge works as expected
     RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
@@ -262,40 +262,13 @@ t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
     %% the remote broker is also the local one.
     emqx:publish(emqx_message:make(RemoteTopic, Payload)),
     %% we should receive a message on the local broker, with specified topic
-    ?assert(
-        receive
-            {deliver, LocalTopic, #message{payload = MapMsg}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [MapMsg, LocalTopic]),
-                %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
-                case jsx:decode(MapMsg) of
-                    #{<<"payload">> := Payload} ->
-                        true;
-                    _ ->
-                        false
-                end;
-            Msg ->
-                ct:pal("Msg: ~p", [Msg]),
-                false
-        after 100 ->
-            false
-        end
-    ),
+    Msg = assert_mqtt_msg_received(LocalTopic),
+    ?assertMatch(#{<<"payload">> := Payload}, jsx:decode(Msg#message.payload)),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
-    ?assertMatch(
-        #{
-            <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
-            <<"node_metrics">> :=
-                [
-                    #{
-                        <<"node">> := _,
-                        <<"metrics">> :=
-                            #{<<"matched">> := 0, <<"received">> := 1}
-                    }
-                ]
-        },
-        jsx:decode(BridgeStr)
+    ?assertMetrics(
+        #{<<"matched">> := 0, <<"received">> := 1},
+        BridgeIDIngress
     ),
 
     %% delete the bridge
@@ -307,22 +280,15 @@ t_mqtt_conn_bridge_ingress_no_payload_template(_) ->
 t_mqtt_conn_bridge_egress(_) ->
     %% then we add a mqtt connector, using POST
     User1 = <<"user1">>,
-
-    {ok, 201, Bridge} = request(
-        post,
-        uri(["bridges"]),
+    BridgeIDEgress = create_bridge(
         ?SERVER_CONF(User1)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_EGRESS,
             <<"egress">> => ?EGRESS_CONF
         }
     ),
-    #{
-        <<"type">> := ?TYPE_MQTT,
-        <<"name">> := ?BRIDGE_NAME_EGRESS
-    } = jsx:decode(Bridge),
-    BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
     ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
+
     %% we now test if the bridge works as expected
     LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
     RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
@@ -334,36 +300,14 @@ t_mqtt_conn_bridge_egress(_) ->
     emqx:publish(emqx_message:make(LocalTopic, Payload)),
 
     %% we should receive a message on the "remote" broker, with specified topic
-    ?assert(
-        receive
-            {deliver, RemoteTopic, #message{payload = Payload, from = From}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
-                Size = byte_size(ResourceID),
-                ?assertMatch(<<ResourceID:Size/binary, _/binary>>, From),
-                true;
-            Msg ->
-                ct:pal("Msg: ~p", [Msg]),
-                false
-        after 100 ->
-            false
-        end
-    ),
+    Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
+    Size = byte_size(ResourceID),
+    ?assertMatch(<<ResourceID:Size/binary, _/binary>>, Msg#message.from),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
-    ?assertMatch(
-        #{
-            <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
-            <<"node_metrics">> :=
-                [
-                    #{
-                        <<"node">> := _,
-                        <<"metrics">> :=
-                            #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
-                    }
-                ]
-        },
-        jsx:decode(BridgeMetricsStr)
+    ?assertMetrics(
+        #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
+        BridgeIDEgress
     ),
 
     %% delete the bridge
@@ -375,21 +319,15 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
     %% then we add a mqtt connector, using POST
     User1 = <<"user1">>,
 
-    {ok, 201, Bridge} = request(
-        post,
-        uri(["bridges"]),
+    BridgeIDEgress = create_bridge(
         ?SERVER_CONF(User1)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_EGRESS,
             <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE
         }
     ),
-    #{
-        <<"type">> := ?TYPE_MQTT,
-        <<"name">> := ?BRIDGE_NAME_EGRESS
-    } = jsx:decode(Bridge),
-    BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
     ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
+
     %% we now test if the bridge works as expected
     LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
     RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
@@ -401,42 +339,15 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
     emqx:publish(emqx_message:make(LocalTopic, Payload)),
 
     %% we should receive a message on the "remote" broker, with specified topic
-    ?assert(
-        receive
-            {deliver, RemoteTopic, #message{payload = MapMsg, from = From}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [MapMsg, RemoteTopic]),
-                %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
-                Size = byte_size(ResourceID),
-                ?assertMatch(<<ResourceID:Size/binary, _/binary>>, From),
-                case jsx:decode(MapMsg) of
-                    #{<<"payload">> := Payload} ->
-                        true;
-                    _ ->
-                        false
-                end;
-            Msg ->
-                ct:pal("Msg: ~p", [Msg]),
-                false
-        after 100 ->
-            false
-        end
-    ),
+    Msg = assert_mqtt_msg_received(RemoteTopic),
+    %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
+    ?assertMatch(<<ResourceID:(byte_size(ResourceID))/binary, _/binary>>, Msg#message.from),
+    ?assertMatch(#{<<"payload">> := Payload}, jsx:decode(Msg#message.payload)),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
-    ?assertMatch(
-        #{
-            <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
-            <<"node_metrics">> :=
-                [
-                    #{
-                        <<"node">> := _,
-                        <<"metrics">> :=
-                            #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
-                    }
-                ]
-        },
-        jsx:decode(BridgeStr)
+    ?assertMetrics(
+        #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
+        BridgeIDEgress
     ),
 
     %% delete the bridge
@@ -447,9 +358,7 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
 
 t_egress_custom_clientid_prefix(_Config) ->
     User1 = <<"user1">>,
-    {ok, 201, Bridge} = request(
-        post,
-        uri(["bridges"]),
+    BridgeIDEgress = create_bridge(
         ?SERVER_CONF(User1)#{
             <<"clientid_prefix">> => <<"my-custom-prefix">>,
             <<"type">> => ?TYPE_MQTT,
@@ -457,11 +366,6 @@ t_egress_custom_clientid_prefix(_Config) ->
             <<"egress">> => ?EGRESS_CONF
         }
     ),
-    #{
-        <<"type">> := ?TYPE_MQTT,
-        <<"name">> := ?BRIDGE_NAME_EGRESS
-    } = jsx:decode(Bridge),
-    BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
     ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
     LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
     RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
@@ -470,58 +374,36 @@ t_egress_custom_clientid_prefix(_Config) ->
     timer:sleep(100),
     emqx:publish(emqx_message:make(LocalTopic, Payload)),
 
-    receive
-        {deliver, RemoteTopic, #message{from = From}} ->
-            Size = byte_size(ResourceID),
-            ?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, From),
-            ok
-    after 1000 ->
-        ct:fail("should have published message")
-    end,
+    Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
+    Size = byte_size(ResourceID),
+    ?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, Msg#message.from),
 
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
     ok.
 
 t_mqtt_conn_bridge_ingress_and_egress(_) ->
     User1 = <<"user1">>,
-    %% create an MQTT bridge, using POST
-    {ok, 201, Bridge} = request(
-        post,
-        uri(["bridges"]),
+    BridgeIDIngress = create_bridge(
         ?SERVER_CONF(User1)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_INGRESS,
             <<"ingress">> => ?INGRESS_CONF
         }
     ),
-
-    #{
-        <<"type">> := ?TYPE_MQTT,
-        <<"name">> := ?BRIDGE_NAME_INGRESS
-    } = jsx:decode(Bridge),
-    BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
-    {ok, 201, Bridge2} = request(
-        post,
-        uri(["bridges"]),
+    BridgeIDEgress = create_bridge(
         ?SERVER_CONF(User1)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_EGRESS,
             <<"egress">> => ?EGRESS_CONF
         }
     ),
-    #{
-        <<"type">> := ?TYPE_MQTT,
-        <<"name">> := ?BRIDGE_NAME_EGRESS
-    } = jsx:decode(Bridge2),
 
-    BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
     %% we now test if the bridge works as expected
     LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
     RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(RemoteTopic),
 
-    {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
     #{
         <<"metrics">> := #{
             <<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0
@@ -538,29 +420,17 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
                         }
                 }
             ]
-    } = jsx:decode(BridgeMetricsStr1),
+    } = request_bridge_metrics(BridgeIDEgress),
     timer:sleep(100),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
     emqx:publish(emqx_message:make(LocalTopic, Payload)),
 
     %% we should receive a message on the "remote" broker, with specified topic
-    ?assert(
-        receive
-            {deliver, RemoteTopic, #message{payload = Payload}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
-                true;
-            Msg ->
-                ct:pal("Msg: ~p", [Msg]),
-                false
-        after 100 ->
-            false
-        end
-    ),
+    assert_mqtt_msg_received(RemoteTopic, Payload),
 
     %% verify the metrics of the bridge
     timer:sleep(1000),
-    {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
     #{
         <<"metrics">> := #{
             <<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0
@@ -577,7 +447,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
                         }
                 }
             ]
-    } = jsx:decode(BridgeMetricsStr2),
+    } = request_bridge_metrics(BridgeIDEgress),
     ?assertEqual(CntMatched2, CntMatched1 + 1),
     ?assertEqual(CntSuccess2, CntSuccess1 + 1),
     ?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1),
@@ -590,16 +460,13 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
     ok.
 
 t_ingress_mqtt_bridge_with_rules(_) ->
-    {ok, 201, _} = request(
-        post,
-        uri(["bridges"]),
+    BridgeIDIngress = create_bridge(
         ?SERVER_CONF(<<"user1">>)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_INGRESS,
             <<"ingress">> => ?INGRESS_CONF
         }
     ),
-    BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
 
     {ok, 201, Rule} = request(
         post,
@@ -624,18 +491,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
     %% the remote broker is also the local one.
     emqx:publish(emqx_message:make(RemoteTopic, Payload)),
     %% we should receive a message on the local broker, with specified topic
-    ?assert(
-        receive
-            {deliver, LocalTopic, #message{payload = Payload}} ->
-                ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
-                true;
-            Msg ->
-                ct:pal("Msg: ~p", [Msg]),
-                false
-        after 100 ->
-            false
-        end
-    ),
+    assert_mqtt_msg_received(LocalTopic, Payload),
     %% and also the rule should be matched, with matched + 1:
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
     {ok, 200, Metrics} = request(get, uri(["rules", RuleId, "metrics"]), []),
@@ -680,37 +536,22 @@ t_ingress_mqtt_bridge_with_rules(_) ->
     ),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
-    ?assertMatch(
-        #{
-            <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
-            <<"node_metrics">> :=
-                [
-                    #{
-                        <<"node">> := _,
-                        <<"metrics">> :=
-                            #{<<"matched">> := 0, <<"received">> := 1}
-                    }
-                ]
-        },
-        jsx:decode(BridgeMetricsStr)
+    ?assertMetrics(
+        #{<<"matched">> := 0, <<"received">> := 1},
+        BridgeIDIngress
     ),
 
     {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []).
 
 t_egress_mqtt_bridge_with_rules(_) ->
-    {ok, 201, Bridge} = request(
-        post,
-        uri(["bridges"]),
+    BridgeIDEgress = create_bridge(
         ?SERVER_CONF(<<"user1">>)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_EGRESS,
             <<"egress">> => ?EGRESS_CONF
         }
     ),
-    #{<<"type">> := ?TYPE_MQTT, <<"name">> := ?BRIDGE_NAME_EGRESS} = jsx:decode(Bridge),
-    BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
 
     {ok, 201, Rule} = request(
         post,
@@ -734,18 +575,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     %% the remote broker is also the local one.
     emqx:publish(emqx_message:make(LocalTopic, Payload)),
     %% we should receive a message on the "remote" broker, with specified topic
-    ?assert(
-        receive
-            {deliver, RemoteTopic, #message{payload = Payload}} ->
-                ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
-                true;
-            Msg ->
-                ct:pal("Msg: ~p", [Msg]),
-                false
-        after 100 ->
-            false
-        end
-    ),
+    assert_mqtt_msg_received(RemoteTopic, Payload),
     emqx:unsubscribe(RemoteTopic),
 
     %% PUBLISH a message to the rule.
@@ -780,35 +610,12 @@ t_egress_mqtt_bridge_with_rules(_) ->
     ),
 
     %% we should receive a message on the "remote" broker, with specified topic
-    ?assert(
-        receive
-            {deliver, RemoteTopic2, #message{payload = Payload2}} ->
-                ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
-                true;
-            Msg ->
-                ct:pal("Msg: ~p", [Msg]),
-                false
-        after 100 ->
-            false
-        end
-    ),
+    assert_mqtt_msg_received(RemoteTopic2, Payload2),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
-    ?assertMatch(
-        #{
-            <<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0},
-            <<"node_metrics">> :=
-                [
-                    #{
-                        <<"node">> := _,
-                        <<"metrics">> := #{
-                            <<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0
-                        }
-                    }
-                ]
-        },
-        jsx:decode(BridgeMetricsStr)
+    ?assertMetrics(
+        #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0},
+        BridgeIDEgress
     ),
 
     {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
@@ -817,10 +624,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
 t_mqtt_conn_bridge_egress_reconnect(_) ->
     %% then we add a mqtt connector, using POST
     User1 = <<"user1">>,
-
-    {ok, 201, Bridge} = request(
-        post,
-        uri(["bridges"]),
+    BridgeIDEgress = create_bridge(
         ?SERVER_CONF(User1)#{
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_EGRESS,
@@ -837,17 +641,14 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
             }
         }
     ),
-    #{
-        <<"type">> := ?TYPE_MQTT,
-        <<"name">> := ?BRIDGE_NAME_EGRESS
-    } = jsx:decode(Bridge),
-    BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
+
     on_exit(fun() ->
         %% delete the bridge
         {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
         {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
         ok
     end),
+
     %% we now test if the bridge works as expected
     LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
     RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
@@ -862,20 +663,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     assert_mqtt_msg_received(RemoteTopic, Payload0),
 
     %% verify the metrics of the bridge
-    {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
-    ?assertMatch(
-        #{
-            <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
-            <<"node_metrics">> :=
-                [
-                    #{
-                        <<"node">> := _,
-                        <<"metrics">> :=
-                            #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
-                    }
-                ]
-        },
-        jsx:decode(BridgeMetricsStr)
+    ?assertMetrics(
+        #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
+        BridgeIDEgress
     ),
 
     %% stop the listener 1883 to make the bridge disconnected
@@ -906,63 +696,174 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     {ok, _} = snabbkaffe:receive_events(SRef),
 
     %% verify the metrics of the bridge, the message should be queued
-    {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
-    {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
-    Decoded1 = jsx:decode(BridgeStr1),
-    DecodedMetrics1 = jsx:decode(BridgeMetricsStr1),
     ?assertMatch(
-        Status when (Status == <<"connecting">> orelse Status == <<"disconnected">>),
-        maps:get(<<"status">>, Decoded1)
+        #{<<"status">> := Status} when Status == <<"connecting">>; Status == <<"disconnected">>,
+        request_bridge(BridgeIDEgress)
     ),
     %% matched >= 3 because of possible retries.
-    ?assertMatch(
+    ?assertMetrics(
         #{
             <<"matched">> := Matched,
             <<"success">> := 1,
             <<"failed">> := 0,
             <<"queuing">> := Queuing,
             <<"inflight">> := Inflight
-        } when Matched >= 3 andalso Inflight + Queuing == 2,
-        maps:get(<<"metrics">>, DecodedMetrics1)
+        },
+        Matched >= 3 andalso Inflight + Queuing == 2,
+        BridgeIDEgress
     ),
 
     %% start the listener 1883 to make the bridge reconnected
     ok = emqx_listeners:start_listener('tcp:default'),
     timer:sleep(1500),
     %% verify the metrics of the bridge, the 2 queued messages should have been sent
-    {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
-    {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
-    Decoded2 = jsx:decode(BridgeStr2),
-    ?assertEqual(<<"connected">>, maps:get(<<"status">>, Decoded2)),
+    ?assertMatch(#{<<"status">> := <<"connected">>}, request_bridge(BridgeIDEgress)),
     %% matched >= 3 because of possible retries.
-    ?assertMatch(
+    ?assertMetrics(
         #{
-            <<"metrics">> := #{
-                <<"matched">> := Matched,
-                <<"success">> := 3,
-                <<"failed">> := 0,
-                <<"queuing">> := 0,
-                <<"retried">> := _
-            }
-        } when Matched >= 3,
-        jsx:decode(BridgeMetricsStr2)
+            <<"matched">> := Matched,
+            <<"success">> := 3,
+            <<"failed">> := 0,
+            <<"queuing">> := 0,
+            <<"retried">> := _
+        },
+        Matched >= 3,
+        BridgeIDEgress
     ),
     %% also verify the 2 messages have been sent to the remote broker
     assert_mqtt_msg_received(RemoteTopic, Payload1),
     assert_mqtt_msg_received(RemoteTopic, Payload2),
     ok.
 
+t_mqtt_conn_bridge_egress_async_reconnect(_) ->
+    User1 = <<"user1">>,
+    BridgeIDEgress = create_bridge(
+        ?SERVER_CONF(User1)#{
+            <<"type">> => ?TYPE_MQTT,
+            <<"name">> => ?BRIDGE_NAME_EGRESS,
+            <<"egress">> => ?EGRESS_CONF,
+            <<"resource_opts">> => #{
+                <<"worker_pool_size">> => 2,
+                <<"query_mode">> => <<"async">>,
+                %% using a long time so we can test recovery
+                <<"request_timeout">> => <<"15s">>,
+                %% to make it check the healthy quickly
+                <<"health_check_interval">> => <<"0.5s">>,
+                %% to make it reconnect quickly
+                <<"auto_restart_interval">> => <<"1s">>
+            }
+        }
+    ),
+
+    on_exit(fun() ->
+        %% delete the bridge
+        {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
+        {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+        ok
+    end),
+
+    Self = self(),
+    LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
+    RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
+    emqx:subscribe(RemoteTopic),
+
+    Publisher = start_publisher(LocalTopic, 200, Self),
+    ct:sleep(1000),
+
+    %% stop the listener 1883 to make the bridge disconnected
+    ok = emqx_listeners:stop_listener('tcp:default'),
+    ct:sleep(1500),
+    ?assertMatch(
+        #{<<"status">> := Status} when Status == <<"connecting">>; Status == <<"disconnected">>,
+        request_bridge(BridgeIDEgress)
+    ),
+
+    %% start the listener 1883 to make the bridge reconnected
+    ok = emqx_listeners:start_listener('tcp:default'),
+    timer:sleep(1500),
+    ?assertMatch(
+        #{<<"status">> := <<"connected">>},
+        request_bridge(BridgeIDEgress)
+    ),
+
+    N = stop_publisher(Publisher),
+
+    %% all those messages should eventually be delivered
+    [
+        assert_mqtt_msg_received(RemoteTopic, Payload)
+     || I <- lists:seq(1, N),
+        Payload <- [integer_to_binary(I)]
+    ],
+
+    ok.
+
+start_publisher(Topic, Interval, CtrlPid) ->
+    spawn_link(fun() -> publisher(Topic, 1, Interval, CtrlPid) end).
+
+stop_publisher(Pid) ->
+    _ = Pid ! {self(), stop},
+    receive
+        {Pid, N} -> N
+    after 1_000 -> ct:fail("publisher ~p did not stop", [Pid])
+    end.
+
+publisher(Topic, N, Delay, CtrlPid) ->
+    _ = emqx:publish(emqx_message:make(Topic, integer_to_binary(N))),
+    receive
+        {CtrlPid, stop} ->
+            CtrlPid ! {self(), N}
+    after Delay ->
+        publisher(Topic, N + 1, Delay, CtrlPid)
+    end.
+
+%%
+
+assert_mqtt_msg_received(Topic) ->
+    assert_mqtt_msg_received(Topic, '_', 200).
+
 assert_mqtt_msg_received(Topic, Payload) ->
-    ct:pal("checking if ~p has been received on ~p", [Payload, Topic]),
+    assert_mqtt_msg_received(Topic, Payload, 200).
+
+assert_mqtt_msg_received(Topic, Payload, Timeout) ->
     receive
-        {deliver, Topic, #message{payload = Payload}} ->
-            ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
-            ok
-    after 300 ->
+        {deliver, Topic, Msg = #message{}} when Payload == '_' ->
+            ct:pal("received mqtt ~p on topic ~p", [Msg, Topic]),
+            Msg;
+        {deliver, Topic, Msg = #message{payload = Payload}} ->
+            ct:pal("received mqtt ~p on topic ~p", [Msg, Topic]),
+            Msg
+    after Timeout ->
         {messages, Messages} = process_info(self(), messages),
-        Msg = io_lib:format("timeout waiting for ~p on topic ~p", [Payload, Topic]),
-        error({Msg, #{messages => Messages}})
+        ct:fail("timeout waiting ~p ms for ~p on topic '~s', messages = ~0p", [
+            Timeout,
+            Payload,
+            Topic,
+            Messages
+        ])
     end.
 
+create_bridge(Config = #{<<"type">> := Type, <<"name">> := Name}) ->
+    {ok, 201, Bridge} = request(
+        post,
+        uri(["bridges"]),
+        Config
+    ),
+    ?assertMatch(
+        #{
+            <<"type">> := Type,
+            <<"name">> := Name
+        },
+        jsx:decode(Bridge)
+    ),
+    emqx_bridge_resource:bridge_id(Type, Name).
+
+request_bridge(BridgeID) ->
+    {ok, 200, Bridge} = request(get, uri(["bridges", BridgeID]), []),
+    jsx:decode(Bridge).
+
+request_bridge_metrics(BridgeID) ->
+    {ok, 200, BridgeMetrics} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
+    jsx:decode(BridgeMetrics).
+
 request(Method, Url, Body) ->
     request(<<"connector_admin">>, Method, Url, Body).

+ 3 - 2
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -198,8 +198,9 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) ->
     ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
     case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of
         ok ->
-            % TODO this is racy
-            {ok, emqx_connector_mqtt_worker:pid(InstanceId)};
+            ok;
+        {ok, Pid} ->
+            {ok, Pid};
         {error, Reason} ->
             classify_error(Reason)
     end.

+ 42 - 36
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl

@@ -67,8 +67,7 @@
 %% APIs
 -export([
     start_link/2,
-    stop/1,
-    pid/1
+    stop/1
 ]).
 
 %% management APIs
@@ -175,7 +174,7 @@ mk_client_event_handler(undefined, _Opts) ->
 
 connect(Name) ->
     #{subscriptions := Subscriptions} = get_config(Name),
-    case emqtt:connect(pid(Name)) of
+    case emqtt:connect(get_pid(Name)) of
         {ok, Properties} ->
             case subscribe_remote_topics(Name, Subscriptions) of
                 ok ->
@@ -206,37 +205,28 @@ subscribe_remote_topics(_Ref, undefined) ->
 stop(Ref) ->
     emqtt:stop(ref(Ref)).
 
-pid(Name) ->
-    gproc:lookup_pid(?NAME(Name)).
-
 status(Ref) ->
-    trycall(
-        fun() ->
-            Info = emqtt:info(ref(Ref)),
-            case proplists:get_value(socket, Info) of
-                Socket when Socket /= undefined ->
-                    connected;
-                undefined ->
-                    connecting
-            end
-        end,
-        #{noproc => disconnected}
-    ).
+    try
+        Info = emqtt:info(ref(Ref)),
+        case proplists:get_value(socket, Info) of
+            Socket when Socket /= undefined ->
+                connected;
+            undefined ->
+                connecting
+        end
+    catch
+        exit:{noproc, _} ->
+            disconnected
+    end.
 
 ping(Ref) ->
     emqtt:ping(ref(Ref)).
 
 send_to_remote(Name, MsgIn) ->
-    trycall(
-        fun() -> do_send(Name, export_msg(Name, MsgIn)) end,
-        #{
-            badarg => {error, disconnected},
-            noproc => {error, disconnected}
-        }
-    ).
+    trycall(fun() -> do_send(Name, export_msg(Name, MsgIn)) end).
 
 do_send(Name, {true, Msg}) ->
-    case emqtt:publish(pid(Name), Msg) of
+    case emqtt:publish(get_pid(Name), Msg) of
         ok ->
             ok;
         {ok, #{reason_code := RC}} when
@@ -263,13 +253,16 @@ do_send(_Name, false) ->
     ok.
 
 send_to_remote_async(Name, MsgIn, Callback) ->
-    trycall(
-        fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end,
-        #{badarg => {error, disconnected}}
-    ).
+    trycall(fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end).
 
 do_send_async(Name, {true, Msg}, Callback) ->
-    emqtt:publish_async(pid(Name), Msg, _Timeout = infinity, Callback);
+    Pid = get_pid(Name),
+    case emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback) of
+        ok ->
+            {ok, Pid};
+        {error, _} = Error ->
+            Error
+    end;
 do_send_async(_Name, false, _Callback) ->
     ok.
 
@@ -278,14 +271,14 @@ ref(Pid) when is_pid(Pid) ->
 ref(Term) ->
     ?REF(Term).
 
-trycall(Fun, Else) ->
+trycall(Fun) ->
     try
         Fun()
     catch
-        error:badarg ->
-            maps:get(badarg, Else);
+        throw:noproc ->
+            {error, disconnected};
         exit:{noproc, _} ->
-            maps:get(noproc, Else)
+            {error, disconnected}
     end.
 
 format_mountpoint(undefined) ->
@@ -325,8 +318,21 @@ pre_process_conf(Key, Conf) ->
             Conf#{Key => Val}
     end.
 
+get_pid(Name) ->
+    case gproc:where(?NAME(Name)) of
+        Pid when is_pid(Pid) ->
+            Pid;
+        undefined ->
+            throw(noproc)
+    end.
+
 get_config(Name) ->
-    gproc:lookup_value(?NAME(Name)).
+    try
+        gproc:lookup_value(?NAME(Name))
+    catch
+        error:badarg ->
+            throw(noproc)
+    end.
 
 export_msg(Name, Msg) ->
     case get_config(Name) of

+ 2 - 6
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -625,7 +625,7 @@ t_query_counter_async_inflight_batch(_) ->
             %% this will block the resource_worker as the inflight window is full now
             {ok, {ok, _}} =
                 ?wait_async_action(
-                    emqx_resource:query(?ID, {inc_counter, 2}),
+                    emqx_resource:query(?ID, {inc_counter, 2}, ReqOpts()),
                     #{?snk_kind := buffer_worker_flush_but_inflight_full},
                     5_000
                 ),
@@ -635,11 +635,7 @@ t_query_counter_async_inflight_batch(_) ->
         []
     ),
 
-    %% NOTE
-    %% The query above won't affect the size of the results table for some reason,
-    %% it's not clear if this is expected behaviour. Only the `async_reply_fun`
-    %% defined below will be called for the whole batch consisting of 2 increments.
-    Sent2 = Sent1 + 0,
+    Sent2 = Sent1 + 1,
 
     tap_metrics(?LINE),
     %% send query now will fail because the resource is blocked.