Parcourir la source

fix(bridge): stop http failed due to econnrefused

Shawn il y a 4 ans
Parent
commit
11e8e0db69

+ 28 - 10
apps/emqx_bridge/src/emqx_bridge.erl

@@ -203,7 +203,7 @@ create(Type, Name, Conf) ->
         {error, Reason} -> {error, Reason}
     end.
 
-update(Type, Name, {_OldConf, Conf}) ->
+update(Type, Name, {OldConf, Conf}) ->
     %% TODO: sometimes its not necessary to restart the bridge connection.
     %%
     %% - if the connection related configs like `servers` is updated, we should restart/start
@@ -212,15 +212,22 @@ update(Type, Name, {_OldConf, Conf}) ->
     %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated
     %% without restarting the bridge.
     %%
-    ?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
-        config => Conf}),
-    case recreate(Type, Name, Conf) of
-        {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
-        {error, not_found} ->
-            ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one"
-                            , type => Type, name => Name, config => Conf}),
-            create(Type, Name, Conf);
-        {error, _} = Err -> Err
+    case if_only_to_toggole_enable(OldConf, Conf) of
+        false ->
+            ?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
+                config => Conf}),
+            case recreate(Type, Name, Conf) of
+                {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
+                {error, not_found} ->
+                    ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one"
+                                    , type => Type, name => Name, config => Conf}),
+                    create(Type, Name, Conf);
+                {error, Reason} -> {update_bridge_failed, Reason}
+            end;
+        true ->
+            %% we don't need to recreate the bridge if this config change is only to
+            %% toggole the config 'bridge.{type}.{name}.enable'
+            ok
     end.
 
 recreate(Type, Name) ->
@@ -344,6 +351,17 @@ maybe_disable_bridge(Type, Name, Conf) ->
         true -> ok
     end.
 
+if_only_to_toggole_enable(OldConf, Conf) ->
+    #{added := Added, removed := Removed, changed := Updated} =
+        emqx_map_lib:diff_maps(OldConf, Conf),
+    case {Added, Removed, Updated} of
+        {Added, Removed, #{enable := _}= Updated}
+            when map_size(Added) =:= 0,
+                 map_size(Removed) =:= 0,
+                 map_size(Updated) =:= 1 -> true;
+        {_, _, _} -> false
+    end.
+
 bin(Bin) when is_binary(Bin) -> Bin;
 bin(Str) when is_list(Str) -> list_to_binary(Str);
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -334,7 +334,7 @@ lookup_from_local_node(BridgeType, BridgeName) ->
         invalid -> {404, error_msg('BAD_ARG', <<"invalid operation">>)};
         UpReq ->
             case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
-                    UpReq, #{override_to => cluster}) of
+                    {UpReq, BridgeType, BridgeName}, #{override_to => cluster}) of
                 {ok, _} -> {200};
                 {error, {pre_config_update, _, bridge_not_found}} ->
                     {404, error_msg('NOT_FOUND', <<"bridge not found">>)};

+ 12 - 3
apps/emqx_bridge/src/emqx_bridge_app.erl

@@ -40,10 +40,15 @@ stop(_State) ->
     ok.
 
 -define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart).
-pre_config_update(_, Oper, undefined) ?IS_OPER(Oper) ->
+pre_config_update(_, {Oper, _, _}, undefined) ?IS_OPER(Oper) ->
     {error, bridge_not_found};
-pre_config_update(_, Oper, OldConfig) ?IS_OPER(Oper) ->
-    {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
+pre_config_update(_, {Oper, Type, Name}, OldConfig) ?IS_OPER(Oper) ->
+    case perform_operation(Oper, Type, Name) of
+        ok ->
+            %% we also need to save the 'enable' to the config files
+            {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
+        {error, _} = Err -> Err
+    end;
 pre_config_update(_, Conf, _OldConfig) ->
     {ok, Conf}.
 
@@ -51,3 +56,7 @@ pre_config_update(_, Conf, _OldConfig) ->
 operation_to_enable(start) -> true;
 operation_to_enable(stop) -> false;
 operation_to_enable(restart) -> true.
+
+perform_operation(start, Type, Name) -> emqx_bridge:restart(Type, Name);
+perform_operation(restart, Type, Name) -> emqx_bridge:restart(Type, Name);
+perform_operation(stop, Type, Name) -> emqx_bridge:stop(Type, Name).

+ 21 - 11
apps/emqx_connector/src/emqx_connector_http.erl

@@ -169,14 +169,20 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
         pool_name => PoolName,
         host => Host,
         port => Port,
+        connect_timeout => ConnectTimeout,
         base_path => BasePath,
         request => preprocess_request(maps:get(request, Config, undefined))
     },
-    case ehttpc_sup:start_pool(PoolName, PoolOpts) of
-        {ok, _} -> {ok, State};
-        {error, {already_started, _}} -> {ok, State};
+    case do_health_check(Host, Port, ConnectTimeout) of
+        ok ->
+            case ehttpc_sup:start_pool(PoolName, PoolOpts) of
+                {ok, _} -> {ok, State};
+                {error, {already_started, _}} -> {ok, State};
+                {error, Reason} ->
+                    {error, Reason}
+            end;
         {error, Reason} ->
-            {error, Reason}
+            {error, {http_start_failed, Reason}}
     end.
 
 on_stop(InstId, #{pool_name := PoolName}) ->
@@ -216,13 +222,17 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
     end,
     Result.
 
-on_health_check(_InstId, #{host := Host, port := Port} = State) ->
-    case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), 3000) of
-        {ok, Sock} ->
-            gen_tcp:close(Sock),
-            {ok, State};
-        {error, _Reason} ->
-            {error, test_query_failed, State}
+on_health_check(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) ->
+    case do_health_check(Host, Port, Timeout) of
+        ok -> {ok, State};
+        {error, Reason} ->
+            {error, {http_health_check_failed, Reason}, State}
+    end.
+
+do_health_check(Host, Port, Timeout) ->
+    case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
+        {ok, Sock} -> gen_tcp:close(Sock), ok;
+        {error, Reason} -> {error, Reason}
     end.
 
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -188,7 +188,7 @@ query(InstId, Request) ->
 query(InstId, Request, AfterQuery) ->
     case get_instance(InstId) of
         {ok, #{status := stopped}} ->
-            error({InstId, stopped});
+            error({resource_stopped, InstId});
         {ok, #{mod := Mod, state := ResourceState, status := started}} ->
             %% the resource state is readonly to Module:on_query/4
             %% and the `after_query()` functions should be thread safe

+ 23 - 20
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -173,18 +173,19 @@ do_create(InstId, ResourceType, Config) ->
     case lookup(InstId) of
         {ok, _} -> {ok, already_created};
         _ ->
+            Res0 = #{id => InstId, mod => ResourceType, config => Config,
+                     status => stopped, state => undefined},
             case emqx_resource:call_start(InstId, ResourceType, Config) of
                 {ok, ResourceState} ->
-                    ets:insert(emqx_resource_instance, {InstId,
-                        #{mod => ResourceType, config => Config,
-                          state => ResourceState, status => stopped}}),
-                    _ = do_health_check(InstId),
                     ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
+                    %% this is the first time we do health check, this will update the
+                    %% status and then do ets:insert/2
+                    _ = do_health_check(Res0#{state => ResourceState}),
                     {ok, force_lookup(InstId)};
                 {error, Reason} ->
                     logger:error("start ~ts resource ~ts failed: ~p",
                                  [ResourceType, InstId, Reason]),
-                    {error, Reason}
+                    {ok, Res0}
             end
     end.
 
@@ -243,22 +244,24 @@ do_stop(InstId) ->
             Error
     end.
 
-do_health_check(InstId) ->
+do_health_check(InstId) when is_binary(InstId) ->
     case lookup(InstId) of
-        {ok, #{mod := Mod, state := ResourceState0} = Data} ->
-            case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
-                {ok, ResourceState1} ->
-                    ets:insert(emqx_resource_instance,
-                        {InstId, Data#{status => started, state => ResourceState1}}),
-                    ok;
-                {error, Reason, ResourceState1} ->
-                    logger:error("health check for ~p failed: ~p", [InstId, Reason]),
-                    ets:insert(emqx_resource_instance,
-                        {InstId, Data#{status => stopped, state => ResourceState1}}),
-                    {error, Reason}
-            end;
-        Error ->
-            Error
+        {ok, Data} -> do_health_check(Data);
+        Error -> Error
+    end;
+do_health_check(#{state := undefined}) ->
+    {error, resource_not_initialized};
+do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
+    case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
+        {ok, ResourceState1} ->
+            ets:insert(emqx_resource_instance,
+                {InstId, Data#{status => started, state => ResourceState1}}),
+            ok;
+        {error, Reason, ResourceState1} ->
+            logger:error("health check for ~p failed: ~p", [InstId, Reason]),
+            ets:insert(emqx_resource_instance,
+                {InstId, Data#{status => stopped, state => ResourceState1}}),
+            {error, Reason}
     end.
 
 %%------------------------------------------------------------------------------