Przeglądaj źródła

Merge pull request #7673 from terry-xiaoyu/mqtt_bridge_issues

fix: improve the error logs if update bridge failed
Xinyu Liu 3 lat temu
rodzic
commit
f8d046b259

+ 1 - 1
apps/emqx/src/emqx_config_handler.erl

@@ -205,7 +205,7 @@ handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
                 key_path => ConfKeyPath,
                 key_path => ConfKeyPath,
                 stacktrace => ST
                 stacktrace => ST
             }),
             }),
-            {error, config_update_crashed}
+            {error, {config_update_crashed, Reason}}
     end.
     end.
 
 
 do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
 do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->

+ 10 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -150,7 +150,7 @@ start_apps(Apps, Handler) when is_function(Handler) ->
     %% Load all application code to beam vm first
     %% Load all application code to beam vm first
     %% Because, minirest, ekka etc.. application will scan these modules
     %% Because, minirest, ekka etc.. application will scan these modules
     lists:foreach(fun load/1, [emqx | Apps]),
     lists:foreach(fun load/1, [emqx | Apps]),
-    ekka:start(),
+    ok = start_ekka(),
     ok = emqx_ratelimiter_SUITE:base_conf(),
     ok = emqx_ratelimiter_SUITE:base_conf(),
     lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
     lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
 
 
@@ -484,3 +484,12 @@ is_tcp_server_available(Host, Port, Timeout) ->
         {error, _} ->
         {error, _} ->
             false
             false
     end.
     end.
+
+start_ekka() ->
+    try mnesia_hook:module_info() of
+        _ -> ekka:start()
+    catch _:_ ->
+        %% Falling back to using Mnesia DB backend.
+        application:set_env(mria, db_backend, mnesia),
+        ekka:start()
+    end.

+ 1 - 1
apps/emqx/test/emqx_config_handler_SUITE.erl

@@ -223,7 +223,7 @@ t_callback_crash(_Config) ->
     Opts = #{rawconf_with_defaults => true},
     Opts = #{rawconf_with_defaults => true},
     ok = emqx_config_handler:add_handler(CrashPath, ?MODULE),
     ok = emqx_config_handler:add_handler(CrashPath, ?MODULE),
     Old = emqx:get_raw_config(CrashPath),
     Old = emqx:get_raw_config(CrashPath),
-    ?assertEqual({error, config_update_crashed}, emqx:update_config(CrashPath, <<"89%">>, Opts)),
+    ?assertMatch({error, {config_update_crashed, _}}, emqx:update_config(CrashPath, <<"89%">>, Opts)),
     New = emqx:get_raw_config(CrashPath),
     New = emqx:get_raw_config(CrashPath),
     ?assertEqual(Old, New),
     ?assertEqual(Old, New),
     ok = emqx_config_handler:remove_handler(CrashPath),
     ok = emqx_config_handler:remove_handler(CrashPath),

+ 1 - 0
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -207,6 +207,7 @@ basic_config(#{
         replayq => ReplayQ,
         replayq => ReplayQ,
         %% connection opts
         %% connection opts
         server => Server,
         server => Server,
+        connect_timeout => 30, %% 30s
         reconnect_interval => ReconnIntv,
         reconnect_interval => ReconnIntv,
         proto_ver => ProtoVer,
         proto_ver => ProtoVer,
         bridge_mode => true,
         bridge_mode => true,

+ 9 - 5
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl

@@ -52,7 +52,8 @@ start(Config) ->
     Mountpoint = maps:get(receive_mountpoint, Config, undefined),
     Mountpoint = maps:get(receive_mountpoint, Config, undefined),
     Subscriptions = maps:get(subscriptions, Config, undefined),
     Subscriptions = maps:get(subscriptions, Config, undefined),
     Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
     Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
-    Handlers = make_hdlr(Parent, Vars, #{server => ip_port_to_server(Host, Port)}),
+    ServerStr = ip_port_to_server_str(Host, Port),
+    Handlers = make_hdlr(Parent, Vars, #{server => ServerStr}),
     Config1 = Config#{
     Config1 = Config#{
         msg_handler => Handlers,
         msg_handler => Handlers,
         host => Host,
         host => Host,
@@ -70,16 +71,19 @@ start(Config) ->
                     catch
                     catch
                         throw : Reason ->
                         throw : Reason ->
                             ok = stop(#{client_pid => Pid}),
                             ok = stop(#{client_pid => Pid}),
-                            {error, Reason}
+                            {error, error_reason(Reason, ServerStr)}
                     end;
                     end;
                 {error, Reason} ->
                 {error, Reason} ->
                     ok = stop(#{client_pid => Pid}),
                     ok = stop(#{client_pid => Pid}),
-                    {error, Reason}
+                    {error, error_reason(Reason, ServerStr)}
             end;
             end;
         {error, Reason} ->
         {error, Reason} ->
-            {error, Reason}
+            {error, error_reason(Reason, ServerStr)}
     end.
     end.
 
 
+error_reason(Reason, ServerStr) ->
+    #{reason => Reason, server => ServerStr}.
+
 stop(#{client_pid := Pid}) ->
 stop(#{client_pid := Pid}) ->
     safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
     safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
     ok.
     ok.
@@ -238,7 +242,7 @@ printable_maps(Headers) ->
             (K, V0, AccIn) -> AccIn#{K => V0}
             (K, V0, AccIn) -> AccIn#{K => V0}
         end, #{}, Headers).
         end, #{}, Headers).
 
 
-ip_port_to_server(Host, Port) ->
+ip_port_to_server_str(Host, Port) ->
     HostStr = case inet:ntoa(Host) of
     HostStr = case inet:ntoa(Host) of
         {error, einval} -> Host;
         {error, einval} -> Host;
         IPStr -> IPStr
         IPStr -> IPStr

+ 4 - 3
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -366,9 +366,10 @@ t_mqtt_conn_update(_) ->
     BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
     BridgeIDEgress = emqx_bridge:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
     wait_for_resource_ready(BridgeIDEgress, 5),
     wait_for_resource_ready(BridgeIDEgress, 5),
 
 
-    %% then we try to update 'server' of the connector, to an unavailable IP address
-    %% the update should fail because of 'unreachable' or 'connrefused'
-    {ok, 500, _ErrorMsg} = request(put, uri(["connectors", ConnctorID]),
+    %% Then we try to update 'server' of the connector, to an unavailable IP address
+    %% The update OK, we recreate the resource even if the resource is current connected,
+    %% and the target resource we're going to update is unavailable.
+    {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
                                  ?MQTT_CONNECTOR2(<<"127.0.0.1:2603">>)),
                                  ?MQTT_CONNECTOR2(<<"127.0.0.1:2603">>)),
     %% we fix the 'server' parameter to a normal one, it should work
     %% we fix the 'server' parameter to a normal one, it should work
     {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
     {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),

+ 2 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -166,7 +166,8 @@ create_dry_run(ResourceType, Config) ->
 -spec create_dry_run_local(resource_type(), resource_config()) ->
 -spec create_dry_run_local(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
     ok | {error, Reason :: term()}.
 create_dry_run_local(ResourceType, Config) ->
 create_dry_run_local(ResourceType, Config) ->
-    call_instance(<<?TEST_ID_PREFIX>>, {create_dry_run, ResourceType, Config}).
+    RandId = iolist_to_binary(emqx_misc:gen_id(16)),
+    call_instance(RandId, {create_dry_run, ResourceType, Config}).
 
 
 -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
 -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
     {ok, resource_data()} | {error, Reason :: term()}.

+ 3 - 10
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -166,16 +166,9 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 do_recreate(InstId, ResourceType, NewConfig, Opts) ->
 do_recreate(InstId, ResourceType, NewConfig, Opts) ->
     case lookup(InstId) of
     case lookup(InstId) of
-        {ok, Group, #{mod := ResourceType, status := connected} = Data} ->
-            %% If this resource is in use (status='connected'), we should make sure
-            %% the new config is OK before removing the old one.
-            case do_create_dry_run(ResourceType, NewConfig) of
-                ok ->
-                    do_remove(Group, Data, false),
-                    do_create(InstId, Group, ResourceType, NewConfig, Opts);
-                Error ->
-                    Error
-            end;
+        %% We recreate the resource no matter if it is connected and in use!
+        %% As we can not know if the resource is "really disconnected" or we mark the status
+        %% to "disconnected" because the emqx_resource_instance process is not responding.
         {ok, Group, #{mod := ResourceType, status := _} = Data} ->
         {ok, Group, #{mod := ResourceType, status := _} = Data} ->
             do_remove(Group, Data, false),
             do_remove(Group, Data, false),
             do_create(InstId, Group, ResourceType, NewConfig, Opts);
             do_create(InstId, Group, ResourceType, NewConfig, Opts);