Bladeren bron

refactor(resource): forbid changing resource state from `on_get_status` return

Since we do health checks asynchronously, this opens up too much space for race conditions
where state is clobbered when a connector/resource health check is returned while the
state has been changed by other operations.
Thales Macedo Garitezi 1 jaar geleden
bovenliggende
commit
687e2a3fae
55 gewijzigde bestanden met toevoegingen van 162 en 257 verwijderingen
  1. 1 0
      apps/emqx_auth_jwt/rebar.config
  2. 1 1
      apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src
  3. 3 2
      apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl
  4. 1 1
      apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage.app.src
  5. 2 7
      apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl
  6. 1 1
      apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src
  7. 3 2
      apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl
  8. 1 1
      apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src
  9. 5 5
      apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl
  10. 1 1
      apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src
  11. 5 5
      apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl
  12. 1 1
      apps/emqx_bridge_es/src/emqx_bridge_es.app.src
  13. 2 1
      apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl
  14. 1 1
      apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src
  15. 1 1
      apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl
  16. 6 12
      apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
  17. 1 1
      apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src
  18. 3 2
      apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl
  19. 2 7
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl
  20. 4 4
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
  21. 1 1
      apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src
  22. 10 33
      apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl
  23. 1 1
      apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src
  24. 2 7
      apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl
  25. 1 1
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src
  26. 1 6
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl
  27. 1 1
      apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src
  28. 6 25
      apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl
  29. 1 1
      apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src
  30. 7 9
      apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl
  31. 1 1
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src
  32. 7 7
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
  33. 1 1
      apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src
  34. 3 3
      apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl
  35. 1 1
      apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src
  36. 3 2
      apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl
  37. 1 1
      apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src
  38. 6 11
      apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl
  39. 1 1
      apps/emqx_cluster_link/src/emqx_cluster_link.app.src
  40. 2 2
      apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl
  41. 1 1
      apps/emqx_connector/test/emqx_connector_api_SUITE.erl
  42. 1 1
      apps/emqx_ldap/src/emqx_ldap.app.src
  43. 4 3
      apps/emqx_ldap/src/emqx_ldap.erl
  44. 2 1
      apps/emqx_ldap/src/emqx_ldap_bind_worker.erl
  45. 1 1
      apps/emqx_mongodb/src/emqx_mongodb.app.src
  46. 4 3
      apps/emqx_mongodb/src/emqx_mongodb.erl
  47. 1 1
      apps/emqx_mysql/src/emqx_mysql.app.src
  48. 5 18
      apps/emqx_mysql/src/emqx_mysql.erl
  49. 1 1
      apps/emqx_postgresql/src/emqx_postgresql.app.src
  50. 16 29
      apps/emqx_postgresql/src/emqx_postgresql.erl
  51. 1 1
      apps/emqx_redis/src/emqx_redis.app.src
  52. 14 14
      apps/emqx_redis/src/emqx_redis.erl
  53. 1 2
      apps/emqx_resource/src/emqx_resource.erl
  54. 7 9
      apps/emqx_resource/src/emqx_resource_manager.erl
  55. 1 1
      apps/emqx_resource/test/emqx_connector_demo.erl

+ 1 - 0
apps/emqx_auth_jwt/rebar.config

@@ -3,5 +3,6 @@
 {deps, [
     {emqx, {path, "../emqx"}},
     {emqx_utils, {path, "../emqx_utils"}},
+    {emqx_resource, {path, "../emqx_resource"}},
     {emqx_auth, {path, "../emqx_auth"}}
 ]}.

+ 1 - 1
apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_jwt, [
     {description, "EMQX JWT Authentication and Authorization"},
-    {vsn, "0.3.4"},
+    {vsn, "0.3.5"},
     {registered, []},
     {mod, {emqx_auth_jwt_app, []}},
     {applications, [

+ 3 - 2
apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl

@@ -19,6 +19,7 @@
 -behaviour(emqx_resource).
 
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 %% callbacks of behaviour emqx_resource
 -export([
@@ -75,8 +76,8 @@ on_query(_InstId, {update, Opts}, #{pool_name := PoolName}) ->
 
 on_get_status(_InstId, #{pool_name := PoolName}) ->
     case emqx_resource_pool:health_check_workers(PoolName, fun health_check/1) of
-        true -> connected;
-        false -> disconnected
+        true -> ?status_connected;
+        false -> ?status_disconnected
     end.
 
 health_check(Conn) ->

+ 1 - 1
apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_azure_blob_storage, [
     {description, "EMQX Enterprise Azure Blob Storage Bridge"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, [emqx_bridge_azure_blob_storage_sup]},
     {applications, [
         kernel,

+ 2 - 7
apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl

@@ -184,13 +184,8 @@ on_stop(_ConnResId, _ConnState) ->
 
 -spec on_get_status(connector_resource_id(), connector_state()) ->
     ?status_connected | ?status_disconnected | {?status_disconnected, connector_state(), term()}.
-on_get_status(_ConnResId, ConnState = #{driver_state := DriverState}) ->
-    case health_check(DriverState) of
-        {Status, Message} ->
-            {Status, ConnState, Message};
-        Status when is_atom(Status) ->
-            Status
-    end.
+on_get_status(_ConnResId, #{driver_state := DriverState}) ->
+    health_check(DriverState).
 
 -spec on_add_channel(
     connector_resource_id(),

+ 1 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_cassandra, [
     {description, "EMQX Enterprise Cassandra Bridge"},
-    {vsn, "0.3.3"},
+    {vsn, "0.3.4"},
     {registered, []},
     {applications, [
         kernel,

+ 3 - 2
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -6,6 +6,7 @@
 
 -behaviour(emqx_resource).
 
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include("emqx_bridge_cassandra.hrl").
 -include_lib("typerefl/include/types.hrl").
@@ -356,8 +357,8 @@ exec(PoolName, Query) ->
 
 on_get_status(_InstId, #{pool_name := PoolName}) ->
     case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
-        true -> connected;
-        false -> connecting
+        true -> ?status_connected;
+        false -> ?status_connecting
     end.
 
 do_get_status(Conn) ->

+ 1 - 1
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_clickhouse, [
     {description, "EMQX Enterprise ClickHouse Bridge"},
-    {vsn, "0.4.4"},
+    {vsn, "0.4.5"},
     {registered, []},
     {applications, [
         kernel,

+ 5 - 5
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl

@@ -306,8 +306,8 @@ on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) ->
 
 on_get_channel_status(InstanceId, _ChannId, State) ->
     case on_get_status(InstanceId, State) of
-        {connected, _} -> connected;
-        {disconnected, _, _} -> disconnected
+        ?status_connected -> ?status_connected;
+        {?status_disconnected, _} -> ?status_disconnected
     end.
 
 on_get_channels(InstanceId) ->
@@ -319,13 +319,13 @@ on_get_channels(InstanceId) ->
 
 on_get_status(
     _InstanceID,
-    #{pool_name := PoolName, connect_timeout := Timeout} = State
+    #{pool_name := PoolName, connect_timeout := Timeout}
 ) ->
     case do_get_status(PoolName, Timeout) of
         ok ->
-            {connected, State};
+            ?status_connected;
         {error, Reason} ->
-            {disconnected, State, Reason}
+            {?status_disconnected, Reason}
     end.
 
 do_get_status(PoolName, Timeout) ->

+ 1 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_dynamo, [
     {description, "EMQX Enterprise Dynamo Bridge"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {applications, [
         kernel,

+ 5 - 5
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl

@@ -196,7 +196,7 @@ on_format_query_result(Result) ->
 health_check_timeout() ->
     2500.
 
-on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
+on_get_status(_InstanceId, #{pool_name := Pool}) ->
     Health = emqx_resource_pool:health_check_workers(
         Pool,
         {emqx_bridge_dynamo_connector_client, is_connected, [
@@ -207,19 +207,19 @@ on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
     ),
     case Health of
         {error, timeout} ->
-            {?status_connecting, State, <<"timeout_while_checking_connection">>};
+            {?status_connecting, <<"timeout_while_checking_connection">>};
         {ok, Results} ->
-            status_result(Results, State)
+            status_result(Results)
     end.
 
-status_result(Results, State) ->
+status_result(Results) ->
     case lists:filter(fun(Res) -> Res =/= true end, Results) of
         [] when Results =:= [] ->
             ?status_connecting;
         [] ->
             ?status_connected;
         [{false, Error} | _] ->
-            {?status_connecting, State, Error}
+            {?status_connecting, Error}
     end.
 
 %%========================================================================================

+ 1 - 1
apps/emqx_bridge_es/src/emqx_bridge_es.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_es, [
     {description, "EMQX Enterprise Elastic Search Bridge"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {modules, [
         emqx_bridge_es,
         emqx_bridge_es_connector

+ 2 - 1
apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl

@@ -8,6 +8,7 @@
 -behaviour(emqx_resource).
 
 -include("emqx_bridge_es.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -244,7 +245,7 @@ on_stop(InstanceId, State) ->
     Res.
 
 -spec on_get_status(manager_id(), state()) ->
-    {connected, state()} | {disconnected, state(), term()}.
+    ?status_connected | {?status_disconnected, term()}.
 on_get_status(InstanceId, State) ->
     emqx_bridge_http_connector:on_get_status(InstanceId, State).
 

+ 1 - 1
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_hstreamdb, [
     {description, "EMQX Enterprise HStreamDB Bridge"},
-    {vsn, "0.2.2"},
+    {vsn, "0.2.3"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl

@@ -172,7 +172,7 @@ on_get_status(_InstId, State) ->
         Error ->
             %% We set it to ?status_connecting so that the channels are not deleted.
             %% The producers in the channels contains buffers so we don't want to delete them.
-            {?status_connecting, State, Error}
+            {?status_connecting, Error}
     end.
 
 %% -------------------------------------------------------------------------------------------------

+ 6 - 12
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -16,6 +16,7 @@
 
 -module(emqx_bridge_http_connector).
 
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
@@ -590,14 +591,14 @@ on_get_channels(ResId) ->
 on_get_status(InstId, State) ->
     on_get_status(InstId, State, fun default_health_checker/2).
 
-on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout} = State, DoPerWorker) ->
+on_get_status(InstId, #{pool_name := InstId, connect_timeout := Timeout}, DoPerWorker) ->
     case do_get_status(InstId, Timeout, DoPerWorker) of
         ok ->
-            connected;
+            ?status_connected;
         {error, still_connecting} ->
-            connecting;
+            ?status_connecting;
         {error, Reason} ->
-            {disconnected, State, Reason}
+            {?status_disconnected, Reason}
     end.
 
 do_get_status(PoolName, Timeout) ->
@@ -643,14 +644,7 @@ on_get_channel_status(
     _ChannelId,
     State
 ) ->
-    %% N.B.: `on_get_channel_status' expects a different return value than
-    %% `on_get_status'.
-    case on_get_status(InstId, State, fun default_health_checker/2) of
-        {Status, _State, Reason} ->
-            {Status, Reason};
-        Res ->
-            Res
-    end.
+    on_get_status(InstId, State, fun default_health_checker/2).
 
 on_format_query_result({ok, Status, Headers, Body}) ->
     #{

+ 1 - 1
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_influxdb, [
     {description, "EMQX Enterprise InfluxDB Bridge"},
-    {vsn, "0.2.5"},
+    {vsn, "0.2.6"},
     {registered, []},
     {applications, [
         kernel,

+ 3 - 2
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl

@@ -3,6 +3,7 @@
 %%--------------------------------------------------------------------
 -module(emqx_bridge_influxdb_connector).
 
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 
 -include_lib("hocon/include/hoconsc.hrl").
@@ -221,9 +222,9 @@ on_format_query_result(Result) ->
 on_get_status(_InstId, #{client := Client}) ->
     case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
         true ->
-            connected;
+            ?status_connected;
         false ->
-            disconnected
+            ?status_disconnected
     end.
 
 transform_bridge_v1_config_to_connector_config(BridgeV1Config) ->

+ 2 - 7
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -224,15 +224,10 @@ on_stop(ConnectorResId, State) ->
 
 -spec on_get_status(connector_resource_id(), connector_state()) ->
     ?status_connected | ?status_disconnected.
-on_get_status(_ConnectorResId, State = #{kafka_client_id := ClientID}) ->
+on_get_status(_ConnectorResId, #{kafka_client_id := ClientID}) ->
     case whereis(ClientID) of
         Pid when is_pid(Pid) ->
-            case check_client_connectivity(Pid) of
-                {Status, Reason} ->
-                    {Status, State, Reason};
-                Status ->
-                    Status
-            end;
+            check_client_connectivity(Pid);
         _ ->
             ?status_disconnected
     end;

+ 4 - 4
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -605,7 +605,7 @@ on_get_status(
         {error, {find_client, _Error}} ->
             ?status_connecting;
         {error, {connectivity, Error}} ->
-            {?status_connecting, State, Error}
+            {?status_connecting, Error}
     end.
 
 on_get_channel_status(
@@ -688,11 +688,11 @@ maybe_check_health_check_topic(ConnResId, #{health_check_topic := Topic} = Conne
             ?status_connected
     catch
         throw:{unhealthy_target, Msg} ->
-            {?status_disconnected, ConnectorState, Msg};
+            {?status_disconnected, Msg};
         throw:#{reason := {connection_down, _} = Reason} ->
-            {?status_disconnected, ConnectorState, Reason};
+            {?status_disconnected, Reason};
         throw:#{reason := Reason} ->
-            {?status_connecting, ConnectorState, Reason}
+            {?status_connecting, Reason}
     end;
 maybe_check_health_check_topic(_ConnResId, _ConnState) ->
     %% Cannot infer further information.  Maybe upgraded from older version.

+ 1 - 1
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_kinesis, [
     {description, "EMQX Enterprise Amazon Kinesis Bridge"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {applications, [
         kernel,

+ 10 - 33
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl

@@ -94,11 +94,15 @@ on_stop(InstanceId, _State) ->
     ?status_connected
     | ?status_disconnected
     | {?status_disconnected, state(), {unhealthy_target, string()}}.
-on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
+on_get_status(_InstanceId, #{pool_name := _Pool} = State) ->
+    do_get_status(State, []).
+
+-spec do_get_status(state(), nil() | [_Stream]) -> _.
+do_get_status(#{pool_name := Pool}, StreamArgs) ->
     case
         emqx_resource_pool:health_check_workers(
             Pool,
-            {emqx_bridge_kinesis_connector_client, connection_status, []},
+            {emqx_bridge_kinesis_connector_client, connection_status, StreamArgs},
             ?HEALTH_CHECK_TIMEOUT,
             #{return_values => true}
         )
@@ -111,7 +115,7 @@ on_get_status(_InstanceId, #{pool_name := Pool} = State) ->
                 false ->
                     Unhealthy = lists:any(fun(S) -> S =:= {error, unhealthy_target} end, Values),
                     case Unhealthy of
-                        true -> {?status_disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}};
+                        true -> {?status_disconnected, {unhealthy_target, ?TOPIC_MESSAGE}};
                         false -> ?status_disconnected
                     end
             end;
@@ -166,39 +170,12 @@ on_get_channel_status(
     _ResId,
     ChannelId,
     #{
-        pool_name := PoolName,
+        pool_name := _PoolName,
         installed_channels := Channels
-    }
+    } = State
 ) ->
     #{stream_name := StreamName} = maps:get(ChannelId, Channels),
-    case
-        emqx_resource_pool:health_check_workers(
-            PoolName,
-            {emqx_bridge_kinesis_connector_client, connection_status, [StreamName]},
-            ?HEALTH_CHECK_TIMEOUT,
-            #{return_values => true}
-        )
-    of
-        {ok, Values} ->
-            AllOk = lists:all(fun(S) -> S =:= {ok, ?status_connected} end, Values),
-            case AllOk of
-                true ->
-                    ?status_connected;
-                false ->
-                    Unhealthy = lists:any(fun(S) -> S =:= {error, unhealthy_target} end, Values),
-                    case Unhealthy of
-                        true -> {?status_disconnected, {unhealthy_target, ?TOPIC_MESSAGE}};
-                        false -> ?status_disconnected
-                    end
-            end;
-        {error, Reason} ->
-            ?SLOG(error, #{
-                msg => "kinesis_producer_get_status_failed",
-                reason => Reason,
-                stream_name => StreamName
-            }),
-            ?status_disconnected
-    end.
+    do_get_status(State, [StreamName]).
 
 on_get_channels(ResId) ->
     emqx_bridge_v2:get_channels_for_connector(ResId).

+ 1 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_mongodb, [
     {description, "EMQX Enterprise MongoDB Bridge"},
-    {vsn, "0.3.4"},
+    {vsn, "0.3.5"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 7
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl

@@ -61,13 +61,8 @@ on_get_channel_status(InstanceId, _ChannelId, State) ->
 on_get_channels(InstanceId) ->
     emqx_bridge_v2:get_channels_for_connector(InstanceId).
 
-on_get_status(InstanceId, ConnectorState = #{connector_state := DriverState0}) ->
-    case emqx_mongodb:on_get_status(InstanceId, DriverState0) of
-        {Status, DriverState, Reason} ->
-            {Status, ConnectorState#{connector_state := DriverState}, Reason};
-        Status when is_atom(Status) ->
-            Status
-    end.
+on_get_status(InstanceId, #{connector_state := DriverState0}) ->
+    emqx_mongodb:on_get_status(InstanceId, DriverState0).
 
 on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_state := ConnectorState}) ->
     #{

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_mqtt, [
     {description, "EMQX MQTT Broker Bridge"},
-    {vsn, "0.2.6"},
+    {vsn, "0.2.7"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 6
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl

@@ -358,12 +358,7 @@ on_get_status(_ResourceId, State) ->
     Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)],
     try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of
         Statuses ->
-            case combine_status(Statuses) of
-                {Status, Msg} ->
-                    {Status, State, Msg};
-                Status ->
-                    Status
-            end
+            combine_status(Statuses)
     catch
         exit:timeout ->
             ?status_connecting

+ 1 - 1
apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_mysql, [
     {description, "EMQX Enterprise MySQL Bridge"},
-    {vsn, "0.1.9"},
+    {vsn, "0.1.10"},
     {registered, []},
     {applications, [
         kernel,

+ 6 - 25
apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl

@@ -5,6 +5,7 @@
 
 -behaviour(emqx_resource).
 
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %% `emqx_resource' API
@@ -51,7 +52,7 @@ on_add_channel(
                     {error, {prepare_statement, Context}};
                 {error, undefined_table} ->
                     {error, {unhealthy_target, <<"Undefined table">>}};
-                _ ->
+                ok ->
                     State = State0#{
                         channels => maps:put(ChannelId, ChannelConfig, Channels),
                         connector_state => ConnectorState
@@ -65,36 +66,16 @@ on_add_channel(
 on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
     case maps:get(ChannelId, Channels) of
         #{prepares := ok} ->
-            connected;
+            ?status_connected;
         #{prepares := {error, _}} ->
-            connecting
+            ?status_connecting
     end.
 
 on_get_channels(InstanceId) ->
     emqx_bridge_v2:get_channels_for_connector(InstanceId).
 
-on_get_status(InstanceId, #{channels := Channels0, connector_state := ConnectorState} = State0) ->
-    case emqx_mysql:on_get_status(InstanceId, ConnectorState) of
-        WithState when is_tuple(WithState) ->
-            NewConnectorState = element(2, WithState),
-            State = State0#{connector_state => NewConnectorState},
-            setelement(2, WithState, State);
-        connected ->
-            Channels =
-                maps:map(
-                    fun
-                        (_ChannelId, #{prepares := ok} = ChannelConfig) ->
-                            ChannelConfig;
-                        (_ChannelId, #{prepares := {error, _}} = ChannelConfig) ->
-                            set_prepares(ChannelConfig, ConnectorState)
-                    end,
-                    Channels0
-                ),
-            State = State0#{channels => Channels},
-            {connected, State};
-        Other ->
-            Other
-    end.
+on_get_status(InstanceId, #{connector_state := ConnectorState}) ->
+    emqx_mysql:on_get_status(InstanceId, ConnectorState).
 
 on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
     on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);

+ 1 - 1
apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_opents, [
     {description, "EMQX Enterprise OpenTSDB Bridge"},
-    {vsn, "0.2.2"},
+    {vsn, "0.2.3"},
     {registered, []},
     {applications, [
         kernel,

+ 7 - 9
apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl

@@ -185,15 +185,13 @@ on_format_query_result(Result) ->
     Result.
 
 on_get_status(_InstanceId, #{server := Server}) ->
-    Result =
-        case opentsdb_connectivity(Server) of
-            ok ->
-                connected;
-            {error, Reason} ->
-                ?SLOG(error, #{msg => "opents_lost_connection", reason => Reason}),
-                connecting
-        end,
-    Result.
+    case opentsdb_connectivity(Server) of
+        ok ->
+            ?status_connected;
+        {error, Reason} ->
+            ?SLOG(error, #{msg => "opents_lost_connection", reason => Reason}),
+            ?status_connecting
+    end.
 
 on_add_channel(
     _InstanceId,

+ 1 - 1
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_rabbitmq, [
     {description, "EMQX Enterprise RabbitMQ Bridge"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {mod, {emqx_bridge_rabbitmq_app, []}},
     {applications, [

+ 7 - 7
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl

@@ -178,8 +178,8 @@ connect(Options) ->
     end.
 
 -spec on_get_status(resource_id(), term()) ->
-    {connected, resource_state()} | {disconnected, resource_state(), binary()}.
-on_get_status(PoolName, #{channels := Channels} = State) ->
+    ?status_connected | {?status_disconnected, binary()}.
+on_get_status(PoolName, #{channels := Channels}) ->
     ChannelNum = maps:size(Channels),
     Conns = get_rabbitmq_connections(PoolName),
     Check =
@@ -191,19 +191,19 @@ on_get_status(PoolName, #{channels := Channels} = State) ->
             Conns
         ),
     case Check andalso Conns =/= [] of
-        true -> {connected, State};
-        false -> {disconnected, State, <<"not_connected">>}
+        true -> ?status_connected;
+        false -> {?status_disconnected, <<"not_connected">>}
     end.
 
 on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
     case emqx_utils_maps:deep_find([ChannelId, rabbitmq], Channels) of
         {ok, RabbitMQ} ->
             case lists:all(fun is_process_alive/1, maps:values(RabbitMQ)) of
-                true -> connected;
-                false -> {error, not_connected}
+                true -> ?status_connected;
+                false -> {?status_disconnected, not_connected}
             end;
         _ ->
-            {error, not_exists}
+            ?status_disconnected
     end.
 
 on_query(ResourceID, {ChannelId, Data} = MsgReq, State) ->

+ 1 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_s3, [
     {description, "EMQX Enterprise S3 Bridge"},
-    {vsn, "0.1.8"},
+    {vsn, "0.1.9"},
     {registered, []},
     {applications, [
         kernel,

+ 3 - 3
apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl

@@ -144,17 +144,17 @@ on_stop(InstId, _State = #{pool_name := PoolName}) ->
 
 -spec on_get_status(_InstanceId :: resource_id(), state()) ->
     health_check_status().
-on_get_status(_InstId, State = #{client_config := Config}) ->
+on_get_status(_InstId, #{client_config := Config}) ->
     case emqx_s3_client:aws_config(Config) of
         {error, Reason} ->
-            {?status_disconnected, State, map_error_details(Reason)};
+            {?status_disconnected, map_error_details(Reason)};
         AWSConfig ->
             try erlcloud_s3:list_buckets(AWSConfig) of
                 Props when is_list(Props) ->
                     ?status_connected
             catch
                 error:Error ->
-                    {?status_disconnected, State, map_error_details(Error)}
+                    {?status_disconnected, map_error_details(Error)}
             end
     end.
 

+ 1 - 1
apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_syskeeper, [
     {description, "EMQX Enterprise Data bridge for Syskeeper"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [
         kernel,

+ 3 - 2
apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl

@@ -7,6 +7,7 @@
 -behaviour(gen_statem).
 
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -elvis([{elvis_style, invalid_dynamic_call, disable}]).
 
@@ -105,10 +106,10 @@ on_stop(InstanceId, _State) ->
 on_get_status(_InstanceId, #{listen_on := ListenOn}) ->
     try
         _ = esockd:listener({?MODULE, ListenOn}),
-        connected
+        ?status_connected
     catch
         _:_ ->
-            disconnected
+            ?status_disconnected
     end.
 
 %% -------------------------------------------------------------------------------------------------

+ 1 - 1
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_tdengine, [
     {description, "EMQX Enterprise TDEngine Bridge"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {applications, [
         kernel,

+ 6 - 11
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl

@@ -224,7 +224,7 @@ on_format_query_result({ok, ResultMap}) ->
 on_format_query_result(Result) ->
     Result.
 
-on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
+on_get_status(_InstanceId, #{pool_name := PoolName}) ->
     case
         emqx_resource_pool:health_check_workers(
             PoolName,
@@ -234,16 +234,16 @@ on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
         )
     of
         {ok, []} ->
-            {?status_connecting, State, undefined};
+            {?status_connecting, undefined};
         {ok, Values} ->
             case lists:keyfind(error, 1, Values) of
                 false ->
                     ?status_connected;
                 {error, Reason} ->
-                    {?status_connecting, State, enhance_reason(Reason)}
+                    {?status_connecting, enhance_reason(Reason)}
             end;
         {error, Reason} ->
-            {?status_connecting, State, enhance_reason(Reason)}
+            {?status_connecting, enhance_reason(Reason)}
     end.
 
 do_get_status(Conn) ->
@@ -301,14 +301,9 @@ on_get_channels(InstanceId) ->
 on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) ->
     case maps:is_key(ChannelId, Channels) of
         true ->
-            case on_get_status(InstanceId, State) of
-                {Status, _State, Reason} ->
-                    {Status, Reason};
-                Status ->
-                    Status
-            end;
+            on_get_status(InstanceId, State);
         _ ->
-            {error, not_exists}
+            ?status_disconnected
     end.
 
 %%========================================================================================

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link.app.src

@@ -2,7 +2,7 @@
 {application, emqx_cluster_link, [
     {description, "EMQX Cluster Linking"},
     % strict semver, bump manually!
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {modules, []},
     {registered, []},
     {applications, [

+ 2 - 2
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -281,7 +281,7 @@ on_get_status(_ResourceId, #{pool_name := PoolName} = _State) ->
             combine_status(Statuses)
     catch
         exit:timeout ->
-            connecting
+            ?status_connecting
     end.
 
 get_status(Worker) ->
@@ -312,7 +312,7 @@ combine_status(Statuses) ->
         [Status | _] ->
             Status;
         [] ->
-            disconnected
+            ?status_disconnected
     end.
 
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -249,7 +249,7 @@ init_mocks(_TestCase) ->
             (_, bad_connector_state) ->
                 connecting;
             (_, worst_connector_state) ->
-                {?status_disconnected, worst_connector_state, [
+                {?status_disconnected, [
                     #{
                         host => <<"nope:9092">>,
                         reason => unresolvable_hostname

+ 1 - 1
apps/emqx_ldap/src/emqx_ldap.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ldap, [
     {description, "EMQX LDAP Connector"},
-    {vsn, "0.1.11"},
+    {vsn, "0.1.12"},
     {registered, []},
     {applications, [
         kernel,

+ 4 - 3
apps/emqx_ldap/src/emqx_ldap.erl

@@ -16,6 +16,7 @@
 
 -module(emqx_ldap).
 
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
@@ -204,18 +205,18 @@ on_get_status(InstId, #{pool_name := PoolName} = State) ->
         connected ->
             emqx_ldap_bind_worker:on_get_status(InstId, State);
         disconnected ->
-            disconnected
+            ?status_disconnected
     end.
 
 get_status_with_poolname(PoolName) ->
     case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
         true ->
-            connected;
+            ?status_connected;
         false ->
             %% Note: here can only return `disconnected` not `connecting`
             %% because the LDAP socket/connection can't be reused
             %% searching on a died socket will never return until timeout
-            disconnected
+            ?status_disconnected
     end.
 
 do_get_status(Conn) ->

+ 2 - 1
apps/emqx_ldap/src/emqx_ldap_bind_worker.erl

@@ -16,6 +16,7 @@
 
 -module(emqx_ldap_bind_worker).
 
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -107,7 +108,7 @@ on_query(
 on_get_status(_InstId, #{bind_pool_name := PoolName}) ->
     emqx_ldap:get_status_with_poolname(PoolName);
 on_get_status(_InstId, _) ->
-    connected.
+    ?status_connected.
 
 %% ===================================================================
 

+ 1 - 1
apps/emqx_mongodb/src/emqx_mongodb.app.src

@@ -1,6 +1,6 @@
 {application, emqx_mongodb, [
     {description, "EMQX MongoDB Connector"},
-    {vsn, "0.1.8"},
+    {vsn, "0.1.9"},
     {registered, []},
     {applications, [
         kernel,

+ 4 - 3
apps/emqx_mongodb/src/emqx_mongodb.erl

@@ -15,6 +15,7 @@
 %%--------------------------------------------------------------------
 -module(emqx_mongodb).
 
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
@@ -299,21 +300,21 @@ on_query(
             {ok, Result}
     end.
 
-on_get_status(InstId, State = #{pool_name := PoolName}) ->
+on_get_status(InstId, #{pool_name := PoolName}) ->
     case health_check(PoolName) of
         ok ->
             ?tp(debug, emqx_connector_mongo_health_check, #{
                 instance_id => InstId,
                 status => ok
             }),
-            connected;
+            ?status_connected;
         {error, Reason} ->
             ?tp(warning, emqx_connector_mongo_health_check, #{
                 instance_id => InstId,
                 reason => Reason,
                 status => failed
             }),
-            {disconnected, State, Reason}
+            {?status_disconnected, Reason}
     end.
 
 health_check(PoolName) ->

+ 1 - 1
apps/emqx_mysql/src/emqx_mysql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_mysql, [
     {description, "EMQX MySQL Database Connector"},
-    {vsn, "0.2.1"},
+    {vsn, "0.2.2"},
     {registered, []},
     {applications, [
         kernel,

+ 5 - 18
apps/emqx_mysql/src/emqx_mysql.erl

@@ -15,6 +15,7 @@
 %%--------------------------------------------------------------------
 -module(emqx_mysql).
 
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
@@ -240,18 +241,15 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
         true ->
             case do_check_prepares(State) of
                 ok ->
-                    connected;
-                {ok, NState} ->
-                    %% return new state with prepared statements
-                    {connected, NState};
+                    ?status_connected;
                 {error, undefined_table} ->
-                    {disconnected, State, unhealthy_target};
+                    {?status_disconnected, unhealthy_target};
                 {error, _Reason} ->
                     %% do not log error, it is logged in prepare_sql_to_conn
-                    connecting
+                    ?status_connecting
             end;
         false ->
-            connecting
+            ?status_connecting
     end.
 
 do_get_status(Conn) ->
@@ -287,17 +285,6 @@ do_check_prepares(
         ok,
         Workers
     );
-do_check_prepares(#{prepares := ok}) ->
-    ok;
-do_check_prepares(#{prepares := {error, _}, query_templates := _} = State) ->
-    %% retry to prepare
-    case prepare_sql(State) of
-        ok ->
-            %% remove the error
-            {ok, State#{prepares => ok}};
-        {error, Reason} ->
-            {error, Reason}
-    end;
 do_check_prepares(_NoTemplates) ->
     ok.
 

+ 1 - 1
apps/emqx_postgresql/src/emqx_postgresql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_postgresql, [
     {description, "EMQX PostgreSQL Database Connector"},
-    {vsn, "0.2.5"},
+    {vsn, "0.2.6"},
     {registered, []},
     {applications, [
         kernel,

+ 16 - 29
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -16,6 +16,7 @@
 -module(emqx_postgresql).
 
 -include("emqx_postgresql.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
@@ -60,9 +61,6 @@
     default_port => ?PGSQL_DEFAULT_PORT
 }).
 
--type connector_resource_id() :: binary().
--type action_resource_id() :: binary().
-
 -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}.
 -type state() ::
     #{
@@ -175,7 +173,12 @@ on_start(
                     true -> disabled;
                     false -> #{}
                 end,
-            {ok, init_prepare(State2#{pool_name => InstId, prepares => Prepares})};
+            case init_prepare(State2#{pool_name => InstId, prepares => Prepares}) of
+                #{prepares := {error, _} = Error} ->
+                    Error;
+                State ->
+                    {ok, State}
+            end;
         {error, Reason} ->
             ?tp(
                 pgsql_connector_start_failed,
@@ -314,9 +317,9 @@ on_get_channel_status(
         )
     of
         ok ->
-            connected;
+            ?status_connected;
         {error, undefined_table} ->
-            {error, {unhealthy_target, <<"Table does not exist">>}}
+            {?status_disconnected, {unhealthy_target, <<"Table does not exist">>}}
     end.
 
 do_check_channel_sql(
@@ -521,19 +524,14 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
         true ->
             case do_check_prepares(State) of
                 ok ->
-                    connected;
-                {ok, NState} ->
-                    %% return new state with prepared statements
-                    {connected, NState};
+                    ?status_connected;
                 {error, undefined_table} ->
-                    %% return new state indicating that we are connected but the target table is not created
-                    {disconnected, State, unhealthy_target};
-                {error, _Reason} ->
-                    %% do not log error, it is logged in prepare_sql_to_conn
-                    connecting
+                    %% return error indicating that we are connected but the target table
+                    %% is not created
+                    {?status_disconnected, unhealthy_target}
             end;
         false ->
-            connecting
+            ?status_connecting
     end.
 
 do_get_status(Conn) ->
@@ -552,19 +550,8 @@ do_check_prepares(
         {error, Reason} ->
             {error, Reason}
     end;
-do_check_prepares(#{prepares := disabled}) ->
-    ok;
-do_check_prepares(#{prepares := Prepares}) when is_map(Prepares) ->
-    ok;
-do_check_prepares(#{prepares := {error, _}} = State) ->
-    %% retry to prepare
-    case prepare_sql(State) of
-        {ok, PrepStatements} ->
-            %% remove the error
-            {ok, State#{prepares := PrepStatements}};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+do_check_prepares(_) ->
+    ok.
 
 -spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
 validate_table_existence([WorkerPid | Rest], SQL) ->

+ 1 - 1
apps/emqx_redis/src/emqx_redis.app.src

@@ -1,6 +1,6 @@
 {application, emqx_redis, [
     {description, "EMQX Redis Database Connector"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [
         kernel,

+ 14 - 14
apps/emqx_redis/src/emqx_redis.erl

@@ -236,7 +236,7 @@ is_unrecoverable_error({error, invalid_cluster_command}) ->
 is_unrecoverable_error(_) ->
     false.
 
-on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) ->
+on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
     case eredis_cluster:pool_exists(PoolName) of
         true ->
             %% eredis_cluster has null slot even pool_exists when emqx start before redis cluster.
@@ -247,12 +247,12 @@ on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) ->
                 [] ->
                     ?status_disconnected;
                 [_ | _] ->
-                    do_cluster_status_check(PoolName, State)
+                    do_cluster_status_check(PoolName)
             end;
         false ->
             ?status_disconnected
     end;
-on_get_status(_InstId, #{pool_name := PoolName} = State) ->
+on_get_status(_InstId, #{pool_name := PoolName}) ->
     HealthCheckResoults = emqx_resource_pool:health_check_workers(
         PoolName,
         fun ?MODULE:do_get_status/1,
@@ -261,27 +261,27 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
     ),
     case HealthCheckResoults of
         {ok, Results} ->
-            sum_worker_results(Results, State);
+            sum_worker_results(Results);
         Error ->
-            {?status_disconnected, State, Error}
+            {?status_disconnected, Error}
     end.
 
-do_cluster_status_check(Pool, State) ->
+do_cluster_status_check(Pool) ->
     Pongs = eredis_cluster:qa(Pool, [<<"PING">>]),
-    sum_worker_results(Pongs, State).
+    sum_worker_results(Pongs).
 
 do_get_status(Conn) ->
     eredis:q(Conn, ["PING"]).
 
-sum_worker_results([], _State) ->
+sum_worker_results([]) ->
     ?status_connected;
-sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) ->
+sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest]) ->
     ?tp(emqx_redis_auth_required_error, #{}),
     %% This requires user action to fix so we set the status to disconnected
-    {?status_disconnected, State, {unhealthy_target, Error}};
-sum_worker_results([{ok, _} | Rest], State) ->
-    sum_worker_results(Rest, State);
-sum_worker_results([Error | _Rest], State) ->
+    {?status_disconnected, {unhealthy_target, Error}};
+sum_worker_results([{ok, _} | Rest]) ->
+    sum_worker_results(Rest);
+sum_worker_results([Error | _Rest]) ->
     ?SLOG(
         warning,
         #{
@@ -289,7 +289,7 @@ sum_worker_results([Error | _Rest], State) ->
             error => Error
         }
     ),
-    {?status_connecting, State, Error}.
+    {?status_connecting, Error}.
 
 do_cmd(PoolName, cluster, {cmd, Command}) ->
     eredis_cluster:q(PoolName, Command);

+ 1 - 2
apps/emqx_resource/src/emqx_resource.erl

@@ -208,8 +208,7 @@
 %% when calling emqx_resource:health_check/2
 -callback on_get_status(resource_id(), resource_state()) ->
     health_check_status()
-    | {health_check_status(), resource_state()}
-    | {health_check_status(), resource_state(), term()}.
+    | {health_check_status(), Reason :: term()}.
 
 -callback on_get_channel_status(resource_id(), channel_id(), resource_state()) ->
     channel_status()

+ 7 - 9
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -1197,12 +1197,12 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) ->
         id = ResId,
         error = PrevError
     } = Data0,
-    {NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0),
+    {NewStatus, Err} = parse_health_check_result(HCRes, Data0),
     IsDryRun = emqx_resource:is_dry_run(ResId),
     _ = maybe_alarm(NewStatus, IsDryRun, ResId, Err, PrevError),
     ok = maybe_resume_resource_workers(ResId, NewStatus),
     Data1 = Data0#data{
-        state = NewState, status = NewStatus, error = Err
+        status = NewStatus, error = Err
     },
     Data = update_state(Data1),
     case CurrentState of
@@ -1733,12 +1733,10 @@ maybe_clear_alarm(true, _ResId) ->
 maybe_clear_alarm(false, ResId) ->
     emqx_alarm:safe_deactivate(ResId).
 
-parse_health_check_result(Status, Data) when ?IS_STATUS(Status) ->
-    {Status, Data#data.state, status_to_error(Status)};
-parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) ->
-    {Status, NewState, status_to_error(Status)};
-parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) ->
-    {Status, NewState, {error, Error}};
+parse_health_check_result(Status, _Data) when ?IS_STATUS(Status) ->
+    {Status, status_to_error(Status)};
+parse_health_check_result({Status, Error}, _Data) when ?IS_STATUS(Status) ->
+    {Status, {error, Error}};
 parse_health_check_result({error, Error}, Data) ->
     ?tp("health_check_exception", #{resource_id => Data#data.id, reason => Error}),
     ?SLOG(
@@ -1750,7 +1748,7 @@ parse_health_check_result({error, Error}, Data) ->
         },
         #{tag => tag(Data#data.group, Data#data.type)}
     ),
-    {?status_disconnected, Data#data.state, {error, Error}}.
+    {?status_disconnected, {error, Error}}.
 
 status_to_error(?status_connected) ->
     undefined;

+ 1 - 1
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -310,7 +310,7 @@ on_get_status(_InstId, #{health_check_error := true}) ->
     ?status_disconnected;
 on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) ->
     ?tp(connector_demo_health_check_error, #{}),
-    {?status_disconnected, State, Message};
+    {?status_disconnected, Message};
 on_get_status(_InstId, #{pid := Pid, health_check_error := {delay, Delay}}) ->
     ?tp(connector_demo_health_check_delay, #{}),
     timer:sleep(Delay),