فهرست منبع

feat: refactor Oracle bridge to connector and action

Fixes:
https://emqx.atlassian.net/browse/EMQX-11464
Kjell Winblad 2 سال پیش
والد
کامیت
a4272c71dc

+ 1 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_kinesis_action_info,
         emqx_bridge_matrix_action_info,
         emqx_bridge_mongodb_action_info,
+        emqx_bridge_oracle_action_info,
         emqx_bridge_influxdb_action_info,
         emqx_bridge_cassandra_action_info,
         emqx_bridge_mysql_action_info,

+ 2 - 2
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_oracle, [
     {description, "EMQX Enterprise Oracle Database Bridge"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {applications, [
         kernel,
@@ -8,7 +8,7 @@
         emqx_resource,
         emqx_oracle
     ]},
-    {env, []},
+    {env, [{emqx_action_info_modules, [emqx_bridge_oracle_action_info]}]},
     {modules, []},
 
     {links, []}

+ 121 - 7
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl

@@ -9,7 +9,9 @@
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -export([
-    conn_bridge_examples/1
+    bridge_v2_examples/1,
+    conn_bridge_examples/1,
+    connector_examples/1
 ]).
 
 -export([
@@ -20,22 +22,25 @@
     config_validator/1
 ]).
 
+-define(CONNECTOR_TYPE, oracle).
+-define(ACTION_TYPE, ?CONNECTOR_TYPE).
+
 -define(DEFAULT_SQL, <<
     "insert into t_mqtt_msgs(msgid, topic, qos, payload) "
     "values (${id}, ${topic}, ${qos}, ${payload})"
 >>).
 
-conn_bridge_examples(Method) ->
+conn_bridge_examples(_Method) ->
     [
         #{
             <<"oracle">> => #{
                 summary => <<"Oracle Database Bridge">>,
-                value => values(Method)
+                value => conn_bridge_examples_values()
             }
         }
     ].
 
-values(_Method) ->
+conn_bridge_examples_values() ->
     #{
         enable => true,
         type => oracle,
@@ -58,6 +63,54 @@ values(_Method) ->
         }
     }.
 
+connector_examples(Method) ->
+    [
+        #{
+            <<"oracle">> =>
+                #{
+                    summary => <<"Oracle Connector">>,
+                    value => emqx_connector_schema:connector_values(
+                        Method, ?CONNECTOR_TYPE, connector_values()
+                    )
+                }
+        }
+    ].
+
+connector_values() ->
+    #{
+        <<"username">> => <<"system">>,
+        <<"password">> => <<"oracle">>,
+        <<"server">> => <<"127.0.0.1:1521">>,
+        <<"service_name">> => <<"XE">>,
+        <<"sid">> => <<"XE">>,
+        <<"pool_size">> => 8,
+        <<"resource_opts">> =>
+            #{
+                <<"health_check_interval">> => <<"15s">>,
+                <<"start_timeout">> => <<"5s">>
+            }
+    }.
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"oracle">> =>
+                #{
+                    summary => <<"Oracle Action">>,
+                    value => emqx_bridge_v2_schema:action_values(
+                        Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
+                    )
+                }
+        }
+    ].
+
+action_values() ->
+    #{
+        parameters => #{
+            <<"sql">> => ?DEFAULT_SQL
+        }
+    }.
+
 %% -------------------------------------------------------------------------------------------------
 %% Hocon Schema Definitions
 
@@ -65,6 +118,55 @@ namespace() -> "bridge_oracle".
 
 roots() -> [].
 
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    emqx_connector_schema:api_fields(
+        Field,
+        ?CONNECTOR_TYPE,
+        fields("config_connector")
+    );
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "post_bridge_v2";
+    Field == "put_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(oracle_action));
+fields(action) ->
+    {?ACTION_TYPE,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(?MODULE, oracle_action)),
+            #{
+                desc => <<"Oracle Action Config">>,
+                required => false
+            }
+        )};
+fields(oracle_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            hoconsc:ref(?MODULE, action_parameters),
+            #{
+                required => true,
+                desc => ?DESC("action_parameters")
+            }
+        )
+    );
+fields(action_parameters) ->
+    [
+        {sql,
+            hoconsc:mk(
+                binary(),
+                #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
+            )}
+    ];
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        fields(connector_fields) ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
 fields("config") ->
     [
         {enable,
@@ -83,8 +185,10 @@ fields("config") ->
                 #{desc => ?DESC("local_topic"), default => undefined}
             )}
     ] ++ emqx_resource_schema:fields("resource_opts") ++
-        (emqx_oracle_schema:fields(config) --
-            emqx_connector_schema_lib:prepare_statement_fields());
+        fields(connector_fields);
+fields(connector_fields) ->
+    (emqx_oracle_schema:fields(config) --
+        emqx_connector_schema_lib:prepare_statement_fields());
 fields("post") ->
     fields("post", oracle);
 fields("put") ->
@@ -97,6 +201,16 @@ fields("post", Type) ->
 
 desc("config") ->
     ?DESC("desc_config");
+desc("creation_opts") ->
+    ?DESC(emqx_resource_schema, "creation_opts");
+desc("config_connector") ->
+    ?DESC("config_connector");
+desc(oracle_action) ->
+    ?DESC("oracle_action");
+desc(action_parameters) ->
+    ?DESC("action_parameters");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(_) ->
     undefined.
 
@@ -116,5 +230,5 @@ config_validator(#{<<"server">> := Server} = Config) when
         not is_map_key(<<"service_name">>, Config)
 ->
     {error, "neither SID nor Service Name was set"};
-config_validator(_) ->
+config_validator(_Config) ->
     ok.

+ 22 - 0
apps/emqx_bridge_oracle/src/emqx_bridge_oracle_action_info.erl

@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_oracle_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+bridge_v1_type_name() -> oracle.
+
+action_type_name() -> oracle.
+
+connector_type_name() -> oracle.
+
+schema_module() -> emqx_bridge_oracle.

+ 86 - 10
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -267,7 +267,12 @@ parse_and_check(ConfigString, Name) ->
 resource_id(Config) ->
     Type = ?BRIDGE_TYPE_BIN,
     Name = ?config(oracle_name, Config),
-    emqx_bridge_resource:resource_id(Type, Name).
+    <<"connector:", Type/binary, ":", Name/binary>>.
+
+action_id(Config) ->
+    Type = ?BRIDGE_TYPE_BIN,
+    Name = ?config(oracle_name, Config),
+    emqx_bridge_v2:id(Type, Name).
 
 bridge_id(Config) ->
     Type = ?BRIDGE_TYPE_BIN,
@@ -378,6 +383,7 @@ create_rule_and_action_http(Config) ->
 
 t_sync_query(Config) ->
     ResourceId = resource_id(Config),
+    Name = ?config(oracle_name, Config),
     ?check_trace(
         begin
             reset_table(Config),
@@ -387,6 +393,18 @@ t_sync_query(Config) ->
                 _Attempts = 20,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
+            ?retry(
+                _Sleep1 = 1_000,
+                _Attempts1 = 30,
+                ?assertMatch(
+                    #{status := connected},
+                    emqx_bridge_v2:health_check(
+                        ?BRIDGE_TYPE_BIN,
+                        Name
+                    )
+                )
+            ),
+            ActionId = action_id(Config),
             MsgId = erlang:unique_integer(),
             Params = #{
                 topic => ?config(mqtt_topic, Config),
@@ -394,7 +412,7 @@ t_sync_query(Config) ->
                 payload => ?config(oracle_name, Config),
                 retain => true
             },
-            Message = {send_message, Params},
+            Message = {ActionId, Params},
             ?assertEqual(
                 {ok, [{affected_rows, 1}]}, emqx_resource:simple_sync_query(ResourceId, Message)
             ),
@@ -409,7 +427,7 @@ t_batch_sync_query(Config) ->
     ProxyHost = ?config(proxy_host, Config),
     ProxyName = ?config(proxy_name, Config),
     ResourceId = resource_id(Config),
-    BridgeId = bridge_id(Config),
+    Name = ?config(oracle_name, Config),
     ?check_trace(
         begin
             reset_table(Config),
@@ -419,6 +437,17 @@ t_batch_sync_query(Config) ->
                 _Attempts = 30,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 30,
+                ?assertMatch(
+                    #{status := connected},
+                    emqx_bridge_v2:health_check(
+                        ?BRIDGE_TYPE_BIN,
+                        Name
+                    )
+                )
+            ),
             MsgId = erlang:unique_integer(),
             Params = #{
                 topic => ?config(mqtt_topic, Config),
@@ -431,9 +460,9 @@ t_batch_sync_query(Config) ->
             % be sent async as callback_mode is set to async_if_possible.
             emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
                 ct:sleep(1000),
-                emqx_bridge:send_message(BridgeId, Params),
-                emqx_bridge:send_message(BridgeId, Params),
-                emqx_bridge:send_message(BridgeId, Params),
+                emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, #{}),
+                emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, #{}),
+                emqx_bridge_v2:send_message(?BRIDGE_TYPE_BIN, Name, Params, #{}),
                 ok
             end),
             % Wait for reconnection.
@@ -442,6 +471,17 @@ t_batch_sync_query(Config) ->
                 _Attempts = 30,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 30,
+                ?assertMatch(
+                    #{status := connected},
+                    emqx_bridge_v2:health_check(
+                        ?BRIDGE_TYPE_BIN,
+                        Name
+                    )
+                )
+            ),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 30,
@@ -506,6 +546,17 @@ t_start_stop(Config) ->
                 _Attempts = 20,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    #{status := connected},
+                    emqx_bridge_v2:health_check(
+                        ?BRIDGE_TYPE_BIN,
+                        OracleName
+                    )
+                )
+            ),
 
             %% Check that the bridge probe API doesn't leak atoms.
             ProbeRes0 = probe_bridge_api(
@@ -554,6 +605,7 @@ t_probe_with_nested_tokens(Config) ->
 t_message_with_nested_tokens(Config) ->
     BridgeId = bridge_id(Config),
     ResourceId = resource_id(Config),
+    Name = ?config(oracle_name, Config),
     reset_table(Config),
     ?assertMatch(
         {ok, _},
@@ -568,6 +620,17 @@ t_message_with_nested_tokens(Config) ->
         _Attempts = 20,
         ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
     ),
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 20,
+        ?assertMatch(
+            #{status := connected},
+            emqx_bridge_v2:health_check(
+                ?BRIDGE_TYPE_BIN,
+                Name
+            )
+        )
+    ),
     MsgId = erlang:unique_integer(),
     Data = binary_to_list(?config(oracle_name, Config)),
     Params = #{
@@ -600,6 +663,7 @@ t_on_get_status(Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     ProxyName = ?config(proxy_name, Config),
+    Name = ?config(oracle_name, Config),
     ResourceId = resource_id(Config),
     reset_table(Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
@@ -612,13 +676,23 @@ t_on_get_status(Config) ->
     ),
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         ct:sleep(500),
-        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
+        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
+        ?assertMatch(
+            #{status := disconnected},
+            emqx_bridge_v2:health_check(?BRIDGE_TYPE_BIN, Name)
+        )
     end),
     %% Check that it recovers itself.
     ?retry(
         _Sleep = 1_000,
         _Attempts = 20,
-        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+        begin
+            ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
+            ?assertMatch(
+                #{status := connected},
+                emqx_bridge_v2:health_check(?BRIDGE_TYPE_BIN, Name)
+            )
+        end
     ),
     ok.
 
@@ -664,6 +738,7 @@ t_missing_table(Config) ->
         begin
             drop_table_if_exists(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ActionId = emqx_bridge_v2:id(?BRIDGE_TYPE_BIN, ?config(oracle_name, Config)),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 20,
@@ -679,7 +754,7 @@ t_missing_table(Config) ->
                 payload => ?config(oracle_name, Config),
                 retain => true
             },
-            Message = {send_message, Params},
+            Message = {ActionId, Params},
             ?assertMatch(
                 {error, {resource_error, #{reason := not_connected}}},
                 emqx_resource:simple_sync_query(ResourceId, Message)
@@ -698,6 +773,7 @@ t_table_removed(Config) ->
         begin
             reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ActionId = emqx_bridge_v2:id(?BRIDGE_TYPE_BIN, ?config(oracle_name, Config)),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 20,
@@ -711,7 +787,7 @@ t_table_removed(Config) ->
                 payload => ?config(oracle_name, Config),
                 retain => true
             },
-            Message = {send_message, Params},
+            Message = {ActionId, Params},
             ?assertEqual(
                 {error, {unrecoverable_error, {942, "ORA-00942: table or view does not exist\n"}}},
                 emqx_resource:simple_sync_query(ResourceId, Message)

+ 13 - 0
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -36,6 +36,8 @@ resource_type(matrix) ->
     emqx_postgresql;
 resource_type(mongodb) ->
     emqx_bridge_mongodb_connector;
+resource_type(oracle) ->
+    emqx_oracle;
 resource_type(influxdb) ->
     emqx_bridge_influxdb_connector;
 resource_type(cassandra) ->
@@ -140,6 +142,15 @@ connector_structs() ->
                     required => false
                 }
             )},
+        {oracle,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_oracle, "config_connector")),
+                #{
+                    desc => <<"Oracle Connector Config">>,
+                    required => false,
+                    validator => fun emqx_bridge_oracle:config_validator/1
+                }
+            )},
         {influxdb,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_influxdb, "config_connector")),
@@ -247,6 +258,7 @@ schema_modules() ->
         emqx_bridge_kinesis,
         emqx_bridge_matrix,
         emqx_bridge_mongodb,
+        emqx_bridge_oracle,
         emqx_bridge_influxdb,
         emqx_bridge_cassandra,
         emqx_bridge_mysql,
@@ -280,6 +292,7 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"),
         api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
         api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_oracle, <<"oracle">>, Method ++ "_connector"),
         api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),
         api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"),
         api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),

+ 2 - 0
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -137,6 +137,8 @@ connector_type_to_bridge_types(matrix) ->
     [matrix];
 connector_type_to_bridge_types(mongodb) ->
     [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
+connector_type_to_bridge_types(oracle) ->
+    [oracle];
 connector_type_to_bridge_types(influxdb) ->
     [influxdb, influxdb_api_v1, influxdb_api_v2];
 connector_type_to_bridge_types(cassandra) ->

+ 1 - 1
apps/emqx_oracle/src/emqx_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_oracle, [
     {description, "EMQX Enterprise Oracle Database Connector"},
-    {vsn, "0.1.8"},
+    {vsn, "0.1.9"},
     {registered, []},
     {applications, [
         kernel,

+ 147 - 45
apps/emqx_oracle/src/emqx_oracle.erl

@@ -6,6 +6,7 @@
 
 -behaviour(emqx_resource).
 
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
@@ -24,7 +25,11 @@
     on_stop/2,
     on_query/3,
     on_batch_query/3,
-    on_get_status/2
+    on_get_status/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
 ]).
 
 %% callbacks for ecpool
@@ -103,12 +108,13 @@ on_start(
         {app_name, "EMQX Data To Oracle Database Action"}
     ],
     PoolName = InstId,
-    Prepares = parse_prepare_sql(Config),
-    InitState = #{pool_name => PoolName},
-    State = maps:merge(InitState, Prepares),
+    State = #{
+        pool_name => PoolName,
+        installed_channels => #{}
+    },
     case emqx_resource_pool:start(InstId, ?MODULE, Options) of
         ok ->
-            {ok, init_prepare(State)};
+            {ok, State};
         {error, Reason} ->
             ?tp(
                 oracle_connector_start_failed,
@@ -125,13 +131,105 @@ on_stop(InstId, #{pool_name := PoolName}) ->
     ?tp(oracle_bridge_stopped, #{instance_id => InstId}),
     emqx_resource_pool:stop(PoolName).
 
+on_add_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels,
+        pool_name := PoolName
+    } = OldState,
+    ChannelId,
+    ChannelConfig
+) ->
+    {ok, ChannelState} = create_channel_state(ChannelId, PoolName, ChannelConfig),
+    NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
+    %% Update state
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+create_channel_state(
+    ChannelId,
+    PoolName,
+    #{parameters := Conf} = _ChannelConfig
+) ->
+    State0 = parse_prepare_sql(ChannelId, Conf),
+    State1 = init_prepare(PoolName, State0),
+    {ok, State1}.
+
+on_remove_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels
+    } = OldState,
+    ChannelId
+) ->
+    NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
+    %% Update state
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+on_get_channel_status(
+    _ResId,
+    ChannelId,
+    #{
+        pool_name := PoolName,
+        installed_channels := Channels
+    } = _State
+) ->
+    State = maps:get(ChannelId, Channels),
+    case do_check_prepares(ChannelId, PoolName, State) of
+        ok ->
+            ?status_connected;
+        {error, undefined_table} ->
+            %% return new state indicating that we are connected but the target table is not created
+            {?status_disconnected, {unhealthy_target, ?UNHEALTHY_TARGET_MSG}};
+        {error, _Reason} ->
+            %% do not log error, it is logged in prepare_sql_to_conn
+            connecting
+    end.
+% #{stream_name := StreamName} = maps:get(ChannelId, Channels),
+% case
+%     emqx_resource_pool:health_check_workers(
+%         PoolName,
+%         {emqx_bridge_kinesis_connector_client, connection_status, [StreamName]},
+%         ?HEALTH_CHECK_TIMEOUT,
+%         #{return_values => true}
+%     )
+% of
+%     {ok, Values} ->
+%         AllOk = lists:all(fun(S) -> S =:= {ok, ?status_connected} end, Values),
+%         case AllOk of
+%             true ->
+%                 ?status_connected;
+%             false ->
+%                 Unhealthy = lists:any(fun(S) -> S =:= {error, unhealthy_target} end, Values),
+%                 case Unhealthy of
+%                     true -> {?status_disconnected, {unhealthy_target, ?TOPIC_MESSAGE}};
+%                     false -> ?status_disconnected
+%                 end
+%         end;
+%     {error, Reason} ->
+%         ?SLOG(error, #{
+%             msg => "kinesis_producer_get_status_failed",
+%             state => State,
+%             reason => Reason
+%         }),
+%         ?status_disconnected
+% end.
+
+on_get_channels(ResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ResId).
+
 on_query(InstId, {TypeOrKey, NameOrSQL}, #{pool_name := _PoolName} = State) ->
     on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
 on_query(
     InstId,
     {TypeOrKey, NameOrSQL, Params},
-    #{pool_name := PoolName} = State
+    #{
+        pool_name := PoolName,
+        installed_channels := Channels
+    } = _ConnectorState
 ) ->
+    State = maps:get(TypeOrKey, Channels, #{}),
     ?SLOG(debug, #{
         msg => "oracle_connector_received_sql_query",
         connector => InstId,
@@ -147,11 +245,19 @@ on_query(
 on_batch_query(
     InstId,
     BatchReq,
-    #{pool_name := PoolName, params_tokens := Tokens, prepare_sql := Sts} = State
+    #{
+        pool_name := PoolName,
+        installed_channels := Channels
+    } = ConnectorState
 ) ->
     case BatchReq of
         [{Key, _} = Request | _] ->
             BinKey = to_bin(Key),
+            State = maps:get(BinKey, Channels),
+            #{
+                params_tokens := Tokens,
+                prepare_sql := Sts
+            } = State,
             case maps:get(BinKey, Tokens, undefined) of
                 undefined ->
                     Log = #{
@@ -179,7 +285,7 @@ on_batch_query(
             Log = #{
                 connector => InstId,
                 request => BatchReq,
-                state => State,
+                state => ConnectorState,
                 msg => "invalid_request"
             },
             ?SLOG(error, Log),
@@ -232,36 +338,35 @@ on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
             Result
     end.
 
-on_get_status(_InstId, #{pool_name := Pool} = State) ->
+on_get_status(_InstId, #{pool_name := Pool} = _State) ->
     case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of
         true ->
-            case do_check_prepares(State) of
-                ok ->
-                    connected;
-                {ok, NState} ->
-                    %% return new state with prepared statements
-                    {connected, NState};
-                {error, {undefined_table, NState}} ->
-                    %% return new state indicating that we are connected but the target table is not created
-                    {disconnected, NState, {unhealthy_target, ?UNHEALTHY_TARGET_MSG}};
-                {error, _Reason} ->
-                    %% do not log error, it is logged in prepare_sql_to_conn
-                    connecting
-            end;
+            ?status_connected;
         false ->
-            disconnected
+            ?status_disconnected
     end.
 
 do_get_status(Conn) ->
     ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")).
 
 do_check_prepares(
+    _ChannelId,
+    _PoolName,
     #{
-        pool_name := PoolName,
-        prepare_sql := #{<<"send_message">> := SQL},
-        params_tokens := #{<<"send_message">> := Tokens}
-    } = State
+        prepare_sql := {error, _Prepares}
+    } = _State
+) ->
+    {error, undefined_table};
+do_check_prepares(
+    ChannelId,
+    PoolName,
+    State
 ) ->
+    #{
+        prepare_sql := #{ChannelId := SQL},
+        params_tokens := #{ChannelId := Tokens}
+    } = State,
+
     % it's already connected. Verify if target table still exists
     Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
     lists:foldl(
@@ -270,7 +375,7 @@ do_check_prepares(
                 case ecpool_worker:client(WorkerPid) of
                     {ok, Conn} ->
                         case check_if_table_exists(Conn, SQL, Tokens) of
-                            {error, undefined_table} -> {error, {undefined_table, State}};
+                            {error, undefined_table} -> {error, undefined_table};
                             _ -> ok
                         end;
                     _ ->
@@ -281,20 +386,17 @@ do_check_prepares(
         end,
         ok,
         Workers
-    );
-do_check_prepares(
-    State = #{pool_name := PoolName, prepare_sql := {error, Prepares}, params_tokens := TokensMap}
-) ->
-    case prepare_sql(Prepares, PoolName, TokensMap) of
-        %% remove the error
-        {ok, Sts} ->
-            {ok, State#{prepare_sql => Sts}};
-        {error, undefined_table} ->
-            %% indicate the error
-            {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}};
-        {error, _Reason} = Error ->
-            Error
-    end.
+    ).
+% case prepare_sql(Prepares, PoolName, TokensMap) of
+%     %% remove the error
+%     {ok, Sts} ->
+%         {ok, State#{prepare_sql => Sts}};
+%     {error, undefined_table} ->
+%         %% indicate the error
+%         {error, {undefined_table, State#{prepare_sql => {error, Prepares}}}};
+%     {error, _Reason} = Error ->
+%         Error
+% end.
 
 %% ===================================================================
 
@@ -328,13 +430,13 @@ execute_batch(Conn, SQL, ParamsList) ->
     ?tp(oracle_batch_query, #{conn => Conn, sql => SQL, params => ParamsList, result => Ret}),
     handle_result(Ret).
 
-parse_prepare_sql(Config) ->
+parse_prepare_sql(ChannelId, Config) ->
     SQL =
         case maps:get(prepare_statement, Config, undefined) of
             undefined ->
                 case maps:get(sql, Config, undefined) of
                     undefined -> #{};
-                    Template -> #{<<"send_message">> => Template}
+                    Template -> #{ChannelId => Template}
                 end;
             Any ->
                 Any
@@ -352,7 +454,7 @@ parse_prepare_sql([], Prepares, Tokens) ->
         params_tokens => Tokens
     }.
 
-init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName, params_tokens := TokensMap}) ->
+init_prepare(PoolName, State = #{prepare_sql := Prepares, params_tokens := TokensMap}) ->
     case prepare_sql(Prepares, PoolName, TokensMap) of
         {ok, Sts} ->
             State#{prepare_sql := Sts};

+ 15 - 0
rel/i18n/emqx_bridge_oracle.hocon

@@ -54,4 +54,19 @@ emqx_bridge_oracle {
     label = "Bridge Name"
   }
 
+  action_parameters {
+    desc = "Action specific configuration."
+    label = "Action"
+  }
+
+  oracle_action {
+    desc = "Configuration for Oracle Action"
+    label = "Oracle Action Configuration"
+  }
+
+  config_connector {
+    desc = "Configuration for an Oracle Client."
+    label = "Oracle Client Configuration"
+  }
+
 }