Explorar el Código

fix(test): wait until the bridge ready

Shawn hace 4 años
padre
commit
e95445728c

+ 5 - 0
apps/emqx_bridge/src/emqx_bridge.erl

@@ -35,6 +35,7 @@
         ]).
 
 -export([ load/0
+        , lookup/1
         , lookup/2
         , lookup/3
         , list/0
@@ -191,6 +192,10 @@ list_bridges_by_connector(ConnectorId) ->
     [B || B = #{raw_config := #{<<"connector">> := Id}} <- list(),
          ConnectorId =:= Id].
 
+lookup(Id) ->
+    {Type, Name} = parse_bridge_id(Id),
+    lookup(Type, Name).
+
 lookup(Type, Name) ->
     RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
     lookup(Type, Name, RawConf).

+ 13 - 0
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -160,6 +160,7 @@ t_http_crud_apis(_) ->
      } = jsx:decode(Bridge),
 
     %% send an message to emqx and the message should be forwarded to the HTTP server
+    wait_for_resource_ready(BridgeID, 5),
     Body = <<"my msg">>,
     emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
     ?assert(
@@ -212,6 +213,7 @@ t_http_crud_apis(_) ->
                   }, jsx:decode(Bridge3Str)),
 
     %% send an message to emqx again, check the path has been changed
+    wait_for_resource_ready(BridgeID, 5),
     emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
     ?assert(
         receive
@@ -320,3 +322,14 @@ auth_header_() ->
 
 operation_path(Oper, BridgeID) ->
     uri(["bridges", BridgeID, "operation", Oper]).
+
+wait_for_resource_ready(InstId, 0) ->
+    ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
+    ct:fail(wait_resource_timeout);
+wait_for_resource_ready(InstId, Retry) ->
+    case emqx_bridge:lookup(InstId) of
+        {ok, #{resource_data := #{status := started}}} -> ok;
+        _ ->
+            timer:sleep(100),
+            wait_for_resource_ready(InstId, Retry-1)
+    end.

+ 22 - 1
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -241,6 +241,7 @@ t_mqtt_conn_bridge_ingress(_) ->
     emqx:subscribe(LocalTopic),
     %% PUBLISH a message to the 'remote' broker, as we have only one broker,
     %% the remote broker is also the local one.
+    wait_for_resource_ready(BridgeIDIngress, 5),
     emqx:publish(emqx_message:make(RemoteTopic, Payload)),
     %% we should receive a message on the local broker, with specified topic
     ?assert(
@@ -309,6 +310,7 @@ t_mqtt_conn_bridge_egress(_) ->
     emqx:subscribe(RemoteTopic),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
+    wait_for_resource_ready(BridgeIDEgress, 5),
     emqx:publish(emqx_message:make(LocalTopic, Payload)),
 
     %% we should receive a message on the "remote" broker, with specified topic
@@ -370,6 +372,7 @@ t_mqtt_conn_update(_) ->
      , <<"status">> := <<"connected">>
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
+    wait_for_resource_ready(BridgeIDEgress, 2),
 
     %% then we try to update 'server' of the connector, to an unavailable IP address
     %% the update should fail because of 'unreachable' or 'connrefused'
@@ -412,6 +415,11 @@ t_mqtt_conn_update2(_) ->
      , <<"status">> := <<"disconnected">>
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
+    %% We try to fix the 'server' parameter, to another unavailable server..
+    %% The update should success: we don't check the connectivity of the new config
+    %% if the resource is now disconnected.
+    {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
+                                 ?MQTT_CONNECOTR2(<<"127.0.0.1:2604">>)),
     %% we fix the 'server' parameter to a normal one, it should work
     {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
                                  ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
@@ -444,9 +452,9 @@ t_mqtt_conn_update3(_) ->
             <<"name">> => ?BRIDGE_NAME_EGRESS
         }),
     #{ <<"id">> := BridgeIDEgress
-     , <<"status">> := <<"connected">>
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
+    wait_for_resource_ready(BridgeIDEgress, 2),
 
     %% delete the connector should fail because it is in use by a bridge
     {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
@@ -499,6 +507,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
     emqx:subscribe(LocalTopic),
     %% PUBLISH a message to the 'remote' broker, as we have only one broker,
     %% the remote broker is also the local one.
+    wait_for_resource_ready(BridgeIDIngress, 5),
     emqx:publish(emqx_message:make(RemoteTopic, Payload)),
     %% we should receive a message on the local broker, with specified topic
     ?assert(
@@ -563,6 +572,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     emqx:subscribe(RemoteTopic),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
+    wait_for_resource_ready(BridgeIDEgress, 5),
     emqx:publish(emqx_message:make(LocalTopic, Payload)),
     %% we should receive a message on the "remote" broker, with specified topic
     ?assert(
@@ -583,6 +593,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     RuleTopic = <<"t/1">>,
     RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
     emqx:subscribe(RemoteTopic2),
+    wait_for_resource_ready(BridgeIDEgress, 5),
     emqx:publish(emqx_message:make(RuleTopic, Payload2)),
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
     #{ <<"id">> := RuleId
@@ -646,3 +657,13 @@ auth_header_() ->
     {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
     {"Authorization", "Bearer " ++ binary_to_list(Token)}.
 
+wait_for_resource_ready(InstId, 0) ->
+    ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
+    ct:fail(wait_resource_timeout);
+wait_for_resource_ready(InstId, Retry) ->
+    case emqx_bridge:lookup(InstId) of
+        {ok, #{resource_data := #{status := started}}} -> ok;
+        _ ->
+            timer:sleep(100),
+            wait_for_resource_ready(InstId, Retry-1)
+    end.