zhongwencool 1 год назад
Родитель
Сommit
e148d903e8

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 %% -*- mode: erlang -*-
 {application, emqx_bridge_mqtt, [
 {application, emqx_bridge_mqtt, [
     {description, "EMQX MQTT Broker Bridge"},
     {description, "EMQX MQTT Broker Bridge"},
-    {vsn, "0.2.2"},
+    {vsn, "0.2.3"},
     {registered, []},
     {registered, []},
     {applications, [
     {applications, [
         kernel,
         kernel,

+ 10 - 7
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl

@@ -207,7 +207,7 @@ start_mqtt_clients(ResourceId, Conf) ->
     start_mqtt_clients(ResourceId, Conf, ClientOpts).
     start_mqtt_clients(ResourceId, Conf, ClientOpts).
 
 
 start_mqtt_clients(ResourceId, StartConf, ClientOpts) ->
 start_mqtt_clients(ResourceId, StartConf, ClientOpts) ->
-    PoolName = <<ResourceId/binary>>,
+    PoolName = ResourceId,
     #{
     #{
         pool_size := PoolSize
         pool_size := PoolSize
     } = StartConf,
     } = StartConf,
@@ -227,7 +227,7 @@ start_mqtt_clients(ResourceId, StartConf, ClientOpts) ->
 on_stop(ResourceId, State) ->
 on_stop(ResourceId, State) ->
     ?SLOG(info, #{
     ?SLOG(info, #{
         msg => "stopping_mqtt_connector",
         msg => "stopping_mqtt_connector",
-        connector => ResourceId
+        resource_id => ResourceId
     }),
     }),
     %% on_stop can be called with State = undefined
     %% on_stop can be called with State = undefined
     StateMap =
     StateMap =
@@ -271,7 +271,7 @@ on_query(
 on_query(ResourceId, {_ChannelId, Msg}, #{}) ->
 on_query(ResourceId, {_ChannelId, Msg}, #{}) ->
     ?SLOG(error, #{
     ?SLOG(error, #{
         msg => "forwarding_unavailable",
         msg => "forwarding_unavailable",
-        connector => ResourceId,
+        resource_id => ResourceId,
         message => Msg,
         message => Msg,
         reason => "Egress is not configured"
         reason => "Egress is not configured"
     }).
     }).
@@ -298,7 +298,7 @@ on_query_async(
 on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) ->
 on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) ->
     ?SLOG(error, #{
     ?SLOG(error, #{
         msg => "forwarding_unavailable",
         msg => "forwarding_unavailable",
-        connector => ResourceId,
+        resource_id => ResourceId,
         message => Msg,
         message => Msg,
         reason => "Egress is not configured"
         reason => "Egress is not configured"
     }).
     }).
@@ -463,8 +463,10 @@ connect(Options) ->
         {ok, Pid} ->
         {ok, Pid} ->
             connect(Pid, Name);
             connect(Pid, Name);
         {error, Reason} = Error ->
         {error, Reason} = Error ->
-            ?SLOG(error, #{
+            IsDryRun = emqx_resource:is_dry_run(Name),
+            ?SLOG(?LOG_LEVEL(IsDryRun), #{
                 msg => "client_start_failed",
                 msg => "client_start_failed",
+                resource_id => Name,
                 config => emqx_utils:redact(ClientOpts),
                 config => emqx_utils:redact(ClientOpts),
                 reason => Reason
                 reason => Reason
             }),
             }),
@@ -508,10 +510,11 @@ connect(Pid, Name) ->
         {ok, _Props} ->
         {ok, _Props} ->
             {ok, Pid};
             {ok, Pid};
         {error, Reason} = Error ->
         {error, Reason} = Error ->
-            ?SLOG(warning, #{
+            IsDryRun = emqx_resource:is_dry_run(Name),
+            ?SLOG(?LOG_LEVEL(IsDryRun), #{
                 msg => "ingress_client_connect_failed",
                 msg => "ingress_client_connect_failed",
                 reason => Reason,
                 reason => Reason,
-                name => Name
+                resource_id => Name
             }),
             }),
             _ = catch emqtt:stop(Pid),
             _ = catch emqtt:stop(Pid),
             Error
             Error

+ 1 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -159,5 +159,6 @@
 %% See `hocon_tconf`
 %% See `hocon_tconf`
 -define(TEST_ID_PREFIX, "t_probe_").
 -define(TEST_ID_PREFIX, "t_probe_").
 -define(RES_METRICS, resource_metrics).
 -define(RES_METRICS, resource_metrics).
+-define(LOG_LEVEL(_L_), case _L_ of true -> info; false -> warning end).
 
 
 -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations).
 -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations).

+ 1 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -340,7 +340,7 @@ remove_local(ResId) ->
         Error ->
         Error ->
             %% Only log, the ResId worker is always removed in manager's remove action.
             %% Only log, the ResId worker is always removed in manager's remove action.
             ?SLOG(warning, #{
             ?SLOG(warning, #{
-                msg => "remove_local_resource_failed",
+                msg => "remove_resource_failed",
                 error => Error,
                 error => Error,
                 resource_id => ResId
                 resource_id => ResId
             }),
             }),

+ 7 - 4
apps/emqx_resource/src/emqx_resource_pool.erl

@@ -26,6 +26,7 @@
 ]).
 ]).
 
 
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include("emqx_resource.hrl").
 
 
 -ifndef(TEST).
 -ifndef(TEST).
 -define(HEALTH_CHECK_TIMEOUT, 15000).
 -define(HEALTH_CHECK_TIMEOUT, 15000).
@@ -44,9 +45,10 @@ start(Name, Mod, Options) ->
             start(Name, Mod, Options);
             start(Name, Mod, Options);
         {error, Reason} ->
         {error, Reason} ->
             NReason = parse_reason(Reason),
             NReason = parse_reason(Reason),
-            ?SLOG(error, #{
+            IsDryRun = emqx_resource:is_dry_run(Name),
+            ?SLOG(?LOG_LEVEL(IsDryRun), #{
                 msg => "start_ecpool_error",
                 msg => "start_ecpool_error",
-                pool_name => Name,
+                resource_id => Name,
                 reason => NReason
                 reason => NReason
             }),
             }),
             {error, {start_pool_failed, Name, NReason}}
             {error, {start_pool_failed, Name, NReason}}
@@ -59,9 +61,10 @@ stop(Name) ->
         {error, not_found} ->
         {error, not_found} ->
             ok;
             ok;
         {error, Reason} ->
         {error, Reason} ->
-            ?SLOG(error, #{
+            IsDryRun = emqx_resource:is_dry_run(Name),
+            ?SLOG(?LOG_LEVEL(IsDryRun), #{
                 msg => "stop_ecpool_failed",
                 msg => "stop_ecpool_failed",
-                pool_name => Name,
+                resource_id => Name,
                 reason => Reason
                 reason => Reason
             }),
             }),
             error({stop_pool_failed, Name, Reason})
             error({stop_pool_failed, Name, Reason})