Forráskód Böngészése

Merge pull request #10498 from paulozulato/master

feat(oracle): Oracle Database integration
Paulo Zulato 2 éve
szülő
commit
5f835627f9
31 módosított fájl, 1494 hozzáadás és 13 törlés
  1. 11 0
      .ci/docker-compose-file/docker-compose-oracle.yaml
  2. 6 0
      .ci/docker-compose-file/toxiproxy.json
  3. 1 1
      apps/emqx_bridge/src/emqx_bridge.app.src
  4. 2 1
      apps/emqx_bridge/src/emqx_bridge.erl
  5. 94 0
      apps/emqx_bridge_oracle/BSL.txt
  6. 28 0
      apps/emqx_bridge_oracle/README.md
  7. 2 0
      apps/emqx_bridge_oracle/docker-ct
  8. 0 0
      apps/emqx_bridge_oracle/etc/emqx_bridge_oracle.conf
  9. 13 0
      apps/emqx_bridge_oracle/rebar.config
  10. 14 0
      apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src
  11. 109 0
      apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl
  12. 514 0
      apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl
  13. 94 0
      apps/emqx_oracle/BSL.txt
  14. 14 0
      apps/emqx_oracle/README.md
  15. 7 0
      apps/emqx_oracle/rebar.config
  16. 14 0
      apps/emqx_oracle/src/emqx_oracle.app.src
  17. 367 0
      apps/emqx_oracle/src/emqx_oracle.erl
  18. 33 0
      apps/emqx_oracle/src/emqx_oracle_schema.erl
  19. 8 3
      apps/emqx_plugin_libs/src/emqx_placeholder.erl
  20. 1 1
      apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src
  21. 1 2
      apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl
  22. 1 0
      changes/ee/feat-10498.en.md
  23. 1 1
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src
  24. 14 3
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
  25. 5 1
      mix.exs
  26. 4 0
      rebar.config.erl
  27. 52 0
      rel/i18n/emqx_bridge_oracle.hocon
  28. 15 0
      rel/i18n/emqx_oracle.hocon
  29. 51 0
      rel/i18n/zh/emqx_bridge_oracle.hocon
  30. 15 0
      rel/i18n/zh/emqx_oracle.hocon
  31. 3 0
      scripts/ct/run.sh

+ 11 - 0
.ci/docker-compose-file/docker-compose-oracle.yaml

@@ -0,0 +1,11 @@
+version: '3.9'
+
+services:
+  oracle_server:
+    container_name: oracle
+    image: oracleinanutshell/oracle-xe-11g:1.0.0
+    restart: always
+    environment:
+      ORACLE_DISABLE_ASYNCH_IO: true
+    networks:
+      - emqx_bridge

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -119,5 +119,11 @@
     "listen": "0.0.0.0:6653",
     "upstream": "pulsar:6653",
     "enabled": true
+  },
+  {
+    "name": "oracle",
+    "listen": "0.0.0.0:1521",
+    "upstream": "oracle:1521",
+    "enabled": true
   }
 ]

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.1.17"},
+    {vsn, "0.1.18"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 2 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -71,7 +71,8 @@
     T == rocketmq;
     T == cassandra;
     T == sqlserver;
-    T == pulsar_producer
+    T == pulsar_producer;
+    T == oracle
 ).
 
 load() ->

+ 94 - 0
apps/emqx_bridge_oracle/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 28 - 0
apps/emqx_bridge_oracle/README.md

@@ -0,0 +1,28 @@
+# EMQX Oracle Database Bridge
+
+This application houses the Oracle Database bridge for EMQX Enterprise Edition.
+It implements the data bridge APIs for interacting with an Oracle Database Bridge.
+
+
+# Documentation
+
+- Refer to [EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)
+  for the EMQX rules engine introduction.
+
+
+# HTTP APIs
+
+- Several APIs are provided for bridge management, which includes create bridge,
+  update bridge, get bridge, stop or restart bridge and list bridges etc.
+
+  Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) for more detailed information.
+
+
+## Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+
+## License
+
+See [BSL](./BSL.txt).

+ 2 - 0
apps/emqx_bridge_oracle/docker-ct

@@ -0,0 +1,2 @@
+toxiproxy
+oracle

+ 0 - 0
apps/emqx_bridge_oracle/etc/emqx_bridge_oracle.conf


+ 13 - 0
apps/emqx_bridge_oracle/rebar.config

@@ -0,0 +1,13 @@
+%% -*- mode: erlang; -*-
+
+{erl_opts, [debug_info]}.
+{deps, [ {emqx_oracle, {path, "../../apps/emqx_oracle"}}
+       , {emqx_connector, {path, "../../apps/emqx_connector"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+       ]}.
+
+{shell, [
+  % {config, "config/sys.config"},
+    {apps, [emqx_bridge_oracle]}
+]}.

+ 14 - 0
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.app.src

@@ -0,0 +1,14 @@
+{application, emqx_bridge_oracle, [
+    {description, "EMQX Enterprise Oracle Database Bridge"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        emqx_oracle
+    ]},
+    {env, []},
+    {modules, []},
+
+    {links, []}
+]}.

+ 109 - 0
apps/emqx_bridge_oracle/src/emqx_bridge_oracle.erl

@@ -0,0 +1,109 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_oracle).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-export([
+    conn_bridge_examples/1
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-define(DEFAULT_SQL, <<
+    "insert into t_mqtt_msg(msgid, topic, qos, payload)"
+    "values (${id}, ${topic}, ${qos}, ${payload})"
+>>).
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"oracle">> => #{
+                summary => <<"Oracle Database Bridge">>,
+                value => values(Method)
+            }
+        }
+    ].
+
+values(_Method) ->
+    #{
+        enable => true,
+        type => oracle,
+        name => <<"foo">>,
+        server => <<"127.0.0.1:1521">>,
+        pool_size => 8,
+        database => <<"ORCL">>,
+        sid => <<"ORCL">>,
+        username => <<"root">>,
+        password => <<"******">>,
+        sql => ?DEFAULT_SQL,
+        local_topic => <<"local/topic/#">>,
+        resource_opts => #{
+            worker_pool_size => 8,
+            health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+            auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
+            batch_size => ?DEFAULT_BATCH_SIZE,
+            batch_time => ?DEFAULT_BATCH_TIME,
+            query_mode => async,
+            max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
+        }
+    }.
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+
+namespace() -> "bridge_oracle".
+
+roots() -> [].
+
+fields("config") ->
+    [
+        {enable,
+            hoconsc:mk(
+                boolean(),
+                #{desc => ?DESC("config_enable"), default => true}
+            )},
+        {sql,
+            hoconsc:mk(
+                binary(),
+                #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
+            )},
+        {local_topic,
+            hoconsc:mk(
+                binary(),
+                #{desc => ?DESC("local_topic"), default => undefined}
+            )}
+    ] ++ emqx_resource_schema:fields("resource_opts") ++
+        (emqx_oracle_schema:fields(config) --
+            emqx_connector_schema_lib:prepare_statement_fields());
+fields("post") ->
+    fields("post", oracle);
+fields("put") ->
+    fields("config");
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("post").
+
+fields("post", Type) ->
+    [type_field(Type), name_field() | fields("config")].
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(_) ->
+    undefined.
+
+%% -------------------------------------------------------------------------------------------------
+
+type_field(Type) ->
+    {type, hoconsc:mk(hoconsc:enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+    {name, hoconsc:mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

+ 514 - 0
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -0,0 +1,514 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_oracle_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+
+-define(BRIDGE_TYPE_BIN, <<"oracle">>).
+-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_oracle, emqx_bridge_oracle]).
+-define(DATABASE, "XE").
+-define(RULE_TOPIC, "mqtt/rule").
+% -define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, plain}
+    ].
+
+groups() ->
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {plain, AllTCs}
+    ].
+
+only_once_tests() ->
+    [t_create_via_http].
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
+    _ = application:stop(emqx_connector),
+    ok.
+
+init_per_group(plain = Type, Config) ->
+    OracleHost = os:getenv("ORACLE_PLAIN_HOST", "toxiproxy.emqx.net"),
+    OraclePort = list_to_integer(os:getenv("ORACLE_PLAIN_PORT", "1521")),
+    ProxyName = "oracle",
+    case emqx_common_test_helpers:is_tcp_server_available(OracleHost, OraclePort) of
+        true ->
+            Config1 = common_init_per_group(),
+            [
+                {proxy_name, ProxyName},
+                {oracle_host, OracleHost},
+                {oracle_port, OraclePort},
+                {connection_type, Type}
+                | Config1 ++ Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_oracle);
+                _ ->
+                    {skip, no_oracle}
+            end
+    end;
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(Group, Config) when
+    Group =:= plain
+->
+    common_end_per_group(Config),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
+common_init_per_group() ->
+    ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
+    ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    application:load(emqx_bridge),
+    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:start_apps(?APPS),
+    {ok, _} = application:ensure_all_started(emqx_connector),
+    emqx_mgmt_api_test_util:init_suite(),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
+    [
+        {proxy_host, ProxyHost},
+        {proxy_port, ProxyPort},
+        {mqtt_topic, MQTTTopic}
+    ].
+
+common_end_per_group(Config) ->
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+    delete_all_bridges(),
+    ok.
+
+init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config).
+
+end_per_testcase(_Testcase, Config) ->
+    common_end_per_testcase(_Testcase, Config).
+
+common_init_per_testcase(TestCase, Config0) ->
+    ct:timetrap(timer:seconds(60)),
+    delete_all_bridges(),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    OracleTopic =
+        <<
+            (atom_to_binary(TestCase))/binary,
+            UniqueNum/binary
+        >>,
+    ConnectionType = ?config(connection_type, Config0),
+    Config = [{oracle_topic, OracleTopic} | Config0],
+    {Name, ConfigString, OracleConfig} = oracle_config(
+        TestCase, ConnectionType, Config
+    ),
+    ok = snabbkaffe:start_trace(),
+    [
+        {oracle_name, Name},
+        {oracle_config_string, ConfigString},
+        {oracle_config, OracleConfig}
+        | Config
+    ].
+
+common_end_per_testcase(_Testcase, Config) ->
+    case proplists:get_bool(skip_does_not_apply, Config) of
+        true ->
+            ok;
+        false ->
+            ProxyHost = ?config(proxy_host, Config),
+            ProxyPort = ?config(proxy_port, Config),
+            emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
+            delete_all_bridges(),
+            %% in CI, apparently this needs more time since the
+            %% machines struggle with all the containers running...
+            emqx_common_test_helpers:call_janitor(60_000),
+            ok = snabbkaffe:stop(),
+            ok
+    end.
+
+delete_all_bridges() ->
+    lists:foreach(
+        fun(#{name := Name, type := Type}) ->
+            emqx_bridge:remove(Type, Name)
+        end,
+        emqx_bridge:list()
+    ).
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+sql_insert_template_for_bridge() ->
+    "INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload}, ${retain})".
+
+sql_create_table() ->
+    "CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))".
+
+sql_drop_table() ->
+    "DROP TABLE mqtt_test".
+
+reset_table(Config) ->
+    ResourceId = resource_id(Config),
+    _ = emqx_resource:simple_sync_query(ResourceId, {sql, sql_drop_table()}),
+    {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query(
+        ResourceId, {sql, sql_create_table()}
+    ),
+    ok.
+
+drop_table(Config) ->
+    ResourceId = resource_id(Config),
+    emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
+    ok.
+
+oracle_config(TestCase, _ConnectionType, Config) ->
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    OracleHost = ?config(oracle_host, Config),
+    OraclePort = ?config(oracle_port, Config),
+    Name = <<
+        (atom_to_binary(TestCase))/binary, UniqueNum/binary
+    >>,
+    ServerURL = iolist_to_binary([
+        OracleHost,
+        ":",
+        integer_to_binary(OraclePort)
+    ]),
+    ConfigString =
+        io_lib:format(
+            "bridges.oracle.~s {\n"
+            "  enable = true\n"
+            "  database = \"~s\"\n"
+            "  sid = \"~s\"\n"
+            "  server = \"~s\"\n"
+            "  username = \"system\"\n"
+            "  password = \"oracle\"\n"
+            "  pool_size = 1\n"
+            "  sql = \"~s\"\n"
+            "  resource_opts = {\n"
+            "     auto_restart_interval = 5000\n"
+            "     request_timeout = 30000\n"
+            "     query_mode = \"async\"\n"
+            "     enable_batch = true\n"
+            "     batch_size = 3\n"
+            "     batch_time = \"3s\"\n"
+            "     worker_pool_size = 1\n"
+            "  }\n"
+            "}\n",
+            [
+                Name,
+                ?DATABASE,
+                ?DATABASE,
+                ServerURL,
+                sql_insert_template_for_bridge()
+            ]
+        ),
+    {Name, ConfigString, parse_and_check(ConfigString, Name)}.
+
+parse_and_check(ConfigString, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
+    Config.
+
+resource_id(Config) ->
+    Type = ?BRIDGE_TYPE_BIN,
+    Name = ?config(oracle_name, Config),
+    emqx_bridge_resource:resource_id(Type, Name).
+
+create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
+    Type = ?BRIDGE_TYPE_BIN,
+    Name = ?config(oracle_name, Config),
+    OracleConfig0 = ?config(oracle_config, Config),
+    OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides),
+    emqx_bridge:create(Type, Name, OracleConfig).
+
+create_bridge_api(Config) ->
+    create_bridge_api(Config, _Overrides = #{}).
+
+create_bridge_api(Config, Overrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(oracle_name, Config),
+    OracleConfig0 = ?config(oracle_config, Config),
+    OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides),
+    Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("creating bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {Status, Headers, Body0}} ->
+                {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
+            Error ->
+                Error
+        end,
+    ct:pal("bridge create result: ~p", [Res]),
+    Res.
+
+update_bridge_api(Config) ->
+    update_bridge_api(Config, _Overrides = #{}).
+
+update_bridge_api(Config, Overrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(oracle_name, Config),
+    OracleConfig0 = ?config(oracle_config, Config),
+    OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides),
+    BridgeId = emqx_bridge_resource:bridge_id(TypeBin, Name),
+    Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("updating bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of
+            {ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])};
+            Error -> Error
+        end,
+    ct:pal("bridge update result: ~p", [Res]),
+    Res.
+
+probe_bridge_api(Config) ->
+    probe_bridge_api(Config, _Overrides = #{}).
+
+probe_bridge_api(Config, _Overrides) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    Name = ?config(oracle_name, Config),
+    OracleConfig = ?config(oracle_config, Config),
+    Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("probing bridge (via http): ~p", [Params]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
+            {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
+            Error -> Error
+        end,
+    ct:pal("bridge probe result: ~p", [Res]),
+    Res.
+
+create_rule_and_action_http(Config) ->
+    OracleName = ?config(oracle_name, Config),
+    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, OracleName),
+    Params = #{
+        enable => true,
+        sql => <<"SELECT * FROM \"", ?RULE_TOPIC, "\"">>,
+        actions => [BridgeId]
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ct:pal("rule action params: ~p", [Params]),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_sync_query(Config) ->
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            ),
+            reset_table(Config),
+            MsgId = erlang:unique_integer(),
+            Params = #{
+                topic => ?config(mqtt_topic, Config),
+                id => MsgId,
+                payload => ?config(oracle_name, Config),
+                retain => true
+            },
+            Message = {send_message, Params},
+            ?assertEqual(
+                {ok, [{affected_rows, 1}]}, emqx_resource:simple_sync_query(ResourceId, Message)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_batch_sync_query(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 30,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            ),
+            reset_table(Config),
+            MsgId = erlang:unique_integer(),
+            Params = #{
+                topic => ?config(mqtt_topic, Config),
+                id => MsgId,
+                payload => ?config(oracle_name, Config),
+                retain => false
+            },
+            % Send 3 async messages while resource is down. When it comes back, these messages
+            % will be delivered in sync way. If we try to send sync messages directly, it will
+            % be sent async as callback_mode is set to async_if_possible.
+            Message = {send_message, Params},
+            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+                ct:sleep(1000),
+                emqx_resource:query(ResourceId, Message),
+                emqx_resource:query(ResourceId, Message),
+                emqx_resource:query(ResourceId, Message)
+            end),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 30,
+                ?assertMatch(
+                    {ok, [{result_set, _, _, [[{3}]]}]},
+                    emqx_resource:simple_sync_query(
+                        ResourceId, {query, "SELECT COUNT(*) FROM mqtt_test"}
+                    )
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_create_via_http(Config) ->
+    ?check_trace(
+        begin
+            ?assertMatch({ok, _}, create_bridge_api(Config)),
+
+            %% lightweight matrix testing some configs
+            ?assertMatch(
+                {ok, _},
+                update_bridge_api(
+                    Config,
+                    #{
+                        <<"resource_opts">> =>
+                            #{<<"batch_size">> => 4}
+                    }
+                )
+            ),
+            ?assertMatch(
+                {ok, _},
+                update_bridge_api(
+                    Config,
+                    #{
+                        <<"resource_opts">> =>
+                            #{<<"batch_time">> => <<"4s">>}
+                    }
+                )
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
+t_start_stop(Config) ->
+    OracleName = ?config(oracle_name, Config),
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            ?assertMatch({ok, _}, create_bridge(Config)),
+            %% Since the connection process is async, we give it some time to
+            %% stabilize and avoid flakiness.
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            ),
+
+            %% Check that the bridge probe API doesn't leak atoms.
+            ProbeRes0 = probe_bridge_api(
+                Config,
+                #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
+            ),
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
+            AtomsBefore = erlang:system_info(atom_count),
+            %% Probe again; shouldn't have created more atoms.
+            ProbeRes1 = probe_bridge_api(
+                Config,
+                #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
+            ),
+
+            ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
+            AtomsAfter = erlang:system_info(atom_count),
+            ?assertEqual(AtomsBefore, AtomsAfter),
+
+            %% Now stop the bridge.
+            ?assertMatch(
+                {{ok, _}, {ok, _}},
+                ?wait_async_action(
+                    emqx_bridge:disable_enable(disable, ?BRIDGE_TYPE_BIN, OracleName),
+                    #{?snk_kind := oracle_bridge_stopped},
+                    5_000
+                )
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% one for each probe, one for real
+            ?assertMatch([_, _, _], ?of_kind(oracle_bridge_stopped, Trace)),
+            ok
+        end
+    ),
+    ok.
+
+t_on_get_status(Config) ->
+    ProxyPort = ?config(proxy_port, Config),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyName = ?config(proxy_name, Config),
+    ResourceId = resource_id(Config),
+    ?assertMatch({ok, _}, create_bridge(Config)),
+    %% Since the connection process is async, we give it some time to
+    %% stabilize and avoid flakiness.
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 20,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+        ct:sleep(500),
+        ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
+    end),
+    %% Check that it recovers itself.
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 20,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    ok.

+ 94 - 0
apps/emqx_oracle/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 14 - 0
apps/emqx_oracle/README.md

@@ -0,0 +1,14 @@
+# Oracle Database Connector
+
+This application houses the Oracle Database connector for EMQX Enterprise Edition.
+It provides the APIs to connect to Oracle Database.
+
+So far it is only used to insert messages as data bridge.
+
+## Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+## License
+
+See [BSL](./BSL.txt).

+ 7 - 0
apps/emqx_oracle/rebar.config

@@ -0,0 +1,7 @@
+%% -*- mode: erlang; -*-
+
+{erl_opts, [debug_info]}.
+{deps, [ {jamdb_oracle, {git, "https://github.com/emqx/jamdb_oracle", {tag, "0.4.9.4"}}}
+       , {emqx_connector, {path, "../../apps/emqx_connector"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       ]}.

+ 14 - 0
apps/emqx_oracle/src/emqx_oracle.app.src

@@ -0,0 +1,14 @@
+{application, emqx_oracle, [
+    {description, "EMQX Enterprise Oracle Database Connector"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        jamdb_oracle
+    ]},
+    {env, []},
+    {modules, []},
+
+    {links, []}
+]}.

+ 367 - 0
apps/emqx_oracle/src/emqx_oracle.erl

@@ -0,0 +1,367 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_oracle).
+
+-behaviour(emqx_resource).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(ORACLE_DEFAULT_PORT, 1521).
+
+%%====================================================================
+%% Exports
+%%====================================================================
+
+%% callbacks for behaviour emqx_resource
+-export([
+    callback_mode/0,
+    is_buffer_supported/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_batch_query/3,
+    on_get_status/2
+]).
+
+%% callbacks for ecpool
+-export([connect/1, prepare_sql_to_conn/2]).
+
+%% Internal exports used to execute code with ecpool worker
+-export([
+    query/3,
+    execute_batch/3,
+    do_get_status/1
+]).
+
+-export([
+    oracle_host_options/0
+]).
+
+-define(ACTION_SEND_MESSAGE, send_message).
+
+-define(SYNC_QUERY_MODE, no_handover).
+
+-define(ORACLE_HOST_OPTIONS, #{
+    default_port => ?ORACLE_DEFAULT_PORT
+}).
+
+-define(MAX_CURSORS, 10).
+-define(DEFAULT_POOL_SIZE, 8).
+-define(OPT_TIMEOUT, 30000).
+
+-type prepares() :: #{atom() => binary()}.
+-type params_tokens() :: #{atom() => list()}.
+
+-type state() ::
+    #{
+        pool_name := binary(),
+        prepare_sql := prepares(),
+        params_tokens := params_tokens(),
+        batch_params_tokens := params_tokens()
+    }.
+
+% As ecpool is not monitoring the worker's PID when doing a handover_async, the
+% request can be lost if worker crashes. Thus, it's better to force requests to
+% be sync for now.
+callback_mode() -> always_sync.
+
+is_buffer_supported() -> false.
+
+-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
+on_start(
+    InstId,
+    #{
+        server := Server,
+        database := DB,
+        sid := Sid,
+        username := User
+    } = Config
+) ->
+    ?SLOG(info, #{
+        msg => "starting_oracle_connector",
+        connector => InstId,
+        config => emqx_utils:redact(Config)
+    }),
+    ?tp(oracle_bridge_started, #{instance_id => InstId, config => Config}),
+    {ok, _} = application:ensure_all_started(ecpool),
+    {ok, _} = application:ensure_all_started(jamdb_oracle),
+    jamdb_oracle_conn:set_max_cursors_number(?MAX_CURSORS),
+
+    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, oracle_host_options()),
+    ServiceName = maps:get(<<"service_name">>, Config, Sid),
+    Options = [
+        {host, Host},
+        {port, Port},
+        {user, emqx_plugin_libs_rule:str(User)},
+        {password, emqx_secret:wrap(maps:get(password, Config, ""))},
+        {sid, emqx_plugin_libs_rule:str(Sid)},
+        {service_name, emqx_plugin_libs_rule:str(ServiceName)},
+        {database, DB},
+        {pool_size, maps:get(<<"pool_size">>, Config, ?DEFAULT_POOL_SIZE)},
+        {timeout, ?OPT_TIMEOUT},
+        {app_name, "EMQX Data To Oracle Database Action"}
+    ],
+    PoolName = InstId,
+    Prepares = parse_prepare_sql(Config),
+    InitState = #{pool_name => PoolName, prepare_statement => #{}},
+    State = maps:merge(InitState, Prepares),
+    case emqx_resource_pool:start(InstId, ?MODULE, Options) of
+        ok ->
+            {ok, init_prepare(State)};
+        {error, Reason} ->
+            ?tp(
+                oracle_connector_start_failed,
+                #{error => Reason}
+            ),
+            {error, Reason}
+    end.
+
+on_stop(InstId, #{pool_name := PoolName}) ->
+    ?SLOG(info, #{
+        msg => "stopping_oracle_connector",
+        connector => InstId
+    }),
+    ?tp(oracle_bridge_stopped, #{instance_id => InstId}),
+    emqx_resource_pool:stop(PoolName).
+
+on_query(InstId, {TypeOrKey, NameOrSQL}, #{pool_name := _PoolName} = State) ->
+    on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);
+on_query(
+    InstId,
+    {TypeOrKey, NameOrSQL, Params},
+    #{pool_name := PoolName} = State
+) ->
+    ?SLOG(debug, #{
+        msg => "oracle database connector received sql query",
+        connector => InstId,
+        type => TypeOrKey,
+        sql => NameOrSQL,
+        state => State
+    }),
+    Type = query,
+    {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
+    Res = on_sql_query(InstId, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data),
+    handle_result(Res).
+
+on_batch_query(
+    InstId,
+    BatchReq,
+    #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
+) ->
+    case BatchReq of
+        [{Key, _} = Request | _] ->
+            BinKey = to_bin(Key),
+            case maps:get(BinKey, Tokens, undefined) of
+                undefined ->
+                    Log = #{
+                        connector => InstId,
+                        first_request => Request,
+                        state => State,
+                        msg => "batch prepare not implemented"
+                    },
+                    ?SLOG(error, Log),
+                    {error, {unrecoverable_error, batch_prepare_not_implemented}};
+                TokenList ->
+                    {_, Datas} = lists:unzip(BatchReq),
+                    Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
+                    St = maps:get(BinKey, Sts),
+                    case
+                        on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2)
+                    of
+                        {ok, Results} ->
+                            handle_batch_result(Results, 0);
+                        Result ->
+                            Result
+                    end
+            end;
+        _ ->
+            Log = #{
+                connector => InstId,
+                request => BatchReq,
+                state => State,
+                msg => "invalid request"
+            },
+            ?SLOG(error, Log),
+            {error, {unrecoverable_error, invalid_request}}
+    end.
+
+proc_sql_params(query, SQLOrKey, Params, _State) ->
+    {SQLOrKey, Params};
+proc_sql_params(TypeOrKey, SQLOrData, Params, #{
+    params_tokens := ParamsTokens, prepare_sql := PrepareSql
+}) ->
+    Key = to_bin(TypeOrKey),
+    case maps:get(Key, ParamsTokens, undefined) of
+        undefined ->
+            {SQLOrData, Params};
+        Tokens ->
+            case maps:get(Key, PrepareSql, undefined) of
+                undefined ->
+                    {SQLOrData, Params};
+                Sql ->
+                    {Sql, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
+            end
+    end.
+
+on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
+    case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of
+        {error, Reason} = Result ->
+            ?tp(
+                oracle_connector_query_return,
+                #{error => Reason}
+            ),
+            ?SLOG(error, #{
+                msg => "oracle database connector do sql query failed",
+                connector => InstId,
+                type => Type,
+                sql => NameOrSQL,
+                reason => Reason
+            }),
+            Result;
+        Result ->
+            ?tp(
+                oracle_connector_query_return,
+                #{result => Result}
+            ),
+            Result
+    end.
+
+on_get_status(_InstId, #{pool_name := Pool} = State) ->
+    case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of
+        true ->
+            case do_check_prepares(State) of
+                ok ->
+                    connected;
+                {ok, NState} ->
+                    %% return new state with prepared statements
+                    {connected, NState}
+            end;
+        false ->
+            disconnected
+    end.
+
+do_get_status(Conn) ->
+    ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")).
+
+do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
+    ok;
+do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
+    {ok, Sts} = prepare_sql(Prepares, PoolName),
+    {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}.
+
+%% ===================================================================
+
+oracle_host_options() ->
+    ?ORACLE_HOST_OPTIONS.
+
+connect(Opts) ->
+    Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
+    NewOpts = lists:keyreplace(password, 1, Opts, {password, Password}),
+    jamdb_oracle:start_link(NewOpts).
+
+sql_query_to_str(SqlQuery) ->
+    emqx_plugin_libs_rule:str(SqlQuery).
+
+sql_params_to_str(Params) when is_list(Params) ->
+    lists:map(
+        fun
+            (false) -> "0";
+            (true) -> "1";
+            (Value) -> emqx_plugin_libs_rule:str(Value)
+        end,
+        Params
+    ).
+
+query(Conn, SQL, Params) ->
+    Ret = jamdb_oracle:sql_query(Conn, {sql_query_to_str(SQL), sql_params_to_str(Params)}),
+    ?tp(oracle_query, #{conn => Conn, sql => SQL, params => Params, result => Ret}),
+    handle_result(Ret).
+
+execute_batch(Conn, SQL, ParamsList) ->
+    ParamsListStr = lists:map(fun sql_params_to_str/1, ParamsList),
+    Ret = jamdb_oracle:sql_query(Conn, {batch, sql_query_to_str(SQL), ParamsListStr}),
+    ?tp(oracle_batch_query, #{conn => Conn, sql => SQL, params => ParamsList, result => Ret}),
+    handle_result(Ret).
+
+parse_prepare_sql(Config) ->
+    SQL =
+        case maps:get(prepare_statement, Config, undefined) of
+            undefined ->
+                case maps:get(sql, Config, undefined) of
+                    undefined -> #{};
+                    Template -> #{<<"send_message">> => Template}
+                end;
+            Any ->
+                Any
+        end,
+    parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
+
+parse_prepare_sql([{Key, H} | T], Prepares, Tokens) ->
+    {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H, ':n'),
+    parse_prepare_sql(
+        T, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}
+    );
+parse_prepare_sql([], Prepares, Tokens) ->
+    #{
+        prepare_sql => Prepares,
+        params_tokens => Tokens
+    }.
+
+init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
+    {ok, Sts} = prepare_sql(Prepares, PoolName),
+    State#{prepare_statement := Sts}.
+
+prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
+    prepare_sql(maps:to_list(Prepares), PoolName);
+prepare_sql(Prepares, PoolName) ->
+    Data = do_prepare_sql(Prepares, PoolName),
+    {ok, _Sts} = Data,
+    ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
+    Data.
+
+do_prepare_sql(Prepares, PoolName) ->
+    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
+
+do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
+    {ok, Conn} = ecpool_worker:client(Worker),
+    {ok, Sts} = prepare_sql_to_conn(Conn, Prepares),
+    do_prepare_sql(T, Prepares, PoolName, Sts);
+do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
+    {ok, LastSts}.
+
+prepare_sql_to_conn(Conn, Prepares) ->
+    prepare_sql_to_conn(Conn, Prepares, #{}).
+
+prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
+prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
+    LogMeta = #{msg => "Oracle Database Prepare Statement", name => Key, prepare_sql => SQL},
+    ?SLOG(info, LogMeta),
+    prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => SQL}).
+
+to_bin(Bin) when is_binary(Bin) ->
+    Bin;
+to_bin(Atom) when is_atom(Atom) ->
+    erlang:atom_to_binary(Atom).
+
+handle_result({error, disconnected}) ->
+    {error, {recoverable_error, disconnected}};
+handle_result({error, Error}) ->
+    {error, {unrecoverable_error, Error}};
+handle_result({error, socket, closed} = Error) ->
+    {error, {recoverable_error, Error}};
+handle_result({error, Type, Reason}) ->
+    {error, {unrecoverable_error, {Type, Reason}}};
+handle_result(Res) ->
+    Res.
+
+handle_batch_result([{affected_rows, RowCount} | Rest], Acc) ->
+    handle_batch_result(Rest, Acc + RowCount);
+handle_batch_result([{proc_result, RetCode, _Rows} | Rest], Acc) when RetCode =:= 0 ->
+    handle_batch_result(Rest, Acc);
+handle_batch_result([{proc_result, RetCode, Reason} | _Rest], _Acc) ->
+    {error, {unrecoverable_error, {RetCode, Reason}}};
+handle_batch_result([], Acc) ->
+    {ok, Acc}.

+ 33 - 0
apps/emqx_oracle/src/emqx_oracle_schema.erl

@@ -0,0 +1,33 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_oracle_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-define(REF_MODULE, emqx_oracle).
+
+%% Hocon config schema exports
+-export([
+    roots/0,
+    fields/1
+]).
+
+roots() ->
+    [{config, #{type => hoconsc:ref(?REF_MODULE, config)}}].
+
+fields(config) ->
+    [{server, server()}, {sid, fun sid/1}] ++
+        emqx_connector_schema_lib:relational_db_fields() ++
+        emqx_connector_schema_lib:prepare_statement_fields().
+
+server() ->
+    Meta = #{desc => ?DESC(?REF_MODULE, "server")},
+    emqx_schema:servers_sc(Meta, (?REF_MODULE):oracle_host_options()).
+
+sid(type) -> binary();
+sid(desc) -> ?DESC(?REF_MODULE, "sid");
+sid(required) -> true;
+sid(_) -> undefined.

+ 8 - 3
apps/emqx_plugin_libs/src/emqx_placeholder.erl

@@ -69,7 +69,7 @@
 
 -type preproc_sql_opts() :: #{
     placeholders => list(binary()),
-    replace_with => '?' | '$n',
+    replace_with => '?' | '$n' | ':n',
     strip_double_quote => boolean()
 }.
 
@@ -149,7 +149,7 @@ proc_cmd(Tokens, Data, Opts) ->
 preproc_sql(Sql) ->
     preproc_sql(Sql, '?').
 
--spec preproc_sql(binary(), '?' | '$n' | preproc_sql_opts()) ->
+-spec preproc_sql(binary(), '?' | '$n' | ':n' | preproc_sql_opts()) ->
     {prepare_statement_key(), tmpl_token()}.
 preproc_sql(Sql, ReplaceWith) when is_atom(ReplaceWith) ->
     preproc_sql(Sql, #{replace_with => ReplaceWith});
@@ -316,13 +316,17 @@ preproc_tmpl_deep_map_key(Key, _) ->
 replace_with(Tmpl, RE, '?') ->
     re:replace(Tmpl, RE, "?", [{return, binary}, global]);
 replace_with(Tmpl, RE, '$n') ->
+    replace_with(Tmpl, RE, <<"$">>);
+replace_with(Tmpl, RE, ':n') ->
+    replace_with(Tmpl, RE, <<":">>);
+replace_with(Tmpl, RE, String) when is_binary(String) ->
     Parts = re:split(Tmpl, RE, [{return, binary}, trim, group]),
     {Res, _} =
         lists:foldl(
             fun
                 ([Tkn, _Phld], {Acc, Seq}) ->
                     Seq1 = erlang:integer_to_binary(Seq),
-                    {<<Acc/binary, Tkn/binary, "$", Seq1/binary>>, Seq + 1};
+                    {<<Acc/binary, Tkn/binary, String/binary, Seq1/binary>>, Seq + 1};
                 ([Tkn], {Acc, Seq}) ->
                     {<<Acc/binary, Tkn/binary>>, Seq}
             end,
@@ -330,6 +334,7 @@ replace_with(Tmpl, RE, '$n') ->
             Parts
         ),
     Res.
+
 parse_nested(<<".", R/binary>>) ->
     %% ignore the root .
     parse_nested(R);

+ 1 - 1
apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugin_libs, [
     {description, "EMQX Plugin utility libs"},
-    {vsn, "4.3.9"},
+    {vsn, "4.3.10"},
     {modules, []},
     {applications, [kernel, stdlib]},
     {env, []}

+ 1 - 2
apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl

@@ -105,9 +105,8 @@ proc_cmd(Tokens, Data, Opts) ->
 preproc_sql(Sql) ->
     emqx_placeholder:preproc_sql(Sql).
 
--spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n') ->
+-spec preproc_sql(Sql :: binary(), ReplaceWith :: '?' | '$n' | ':n') ->
     {prepare_statement_key(), tmpl_token()}.
-
 preproc_sql(Sql, ReplaceWith) ->
     emqx_placeholder:preproc_sql(Sql, ReplaceWith).
 

+ 1 - 0
changes/ee/feat-10498.en.md

@@ -0,0 +1 @@
+Implement Oracle Database Bridge, which supports publishing messages to Oracle Database from MQTT topics.

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ee_bridge, [
     {description, "EMQX Enterprise data bridges"},
-    {vsn, "0.1.11"},
+    {vsn, "0.1.12"},
     {registered, []},
     {applications, [
         kernel,

+ 14 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -37,7 +37,8 @@ api_schemas(Method) ->
         ref(emqx_ee_bridge_rocketmq, Method),
         ref(emqx_ee_bridge_sqlserver, Method),
         ref(emqx_bridge_opents, Method),
-        ref(emqx_bridge_pulsar, Method ++ "_producer")
+        ref(emqx_bridge_pulsar, Method ++ "_producer"),
+        ref(emqx_bridge_oracle, Method)
     ].
 
 schema_modules() ->
@@ -59,7 +60,8 @@ schema_modules() ->
         emqx_ee_bridge_rocketmq,
         emqx_ee_bridge_sqlserver,
         emqx_bridge_opents,
-        emqx_bridge_pulsar
+        emqx_bridge_pulsar,
+        emqx_bridge_oracle
     ].
 
 examples(Method) ->
@@ -100,7 +102,8 @@ resource_type(dynamo) -> emqx_ee_connector_dynamo;
 resource_type(rocketmq) -> emqx_ee_connector_rocketmq;
 resource_type(sqlserver) -> emqx_ee_connector_sqlserver;
 resource_type(opents) -> emqx_bridge_opents_connector;
-resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer.
+resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer;
+resource_type(oracle) -> emqx_oracle.
 
 fields(bridges) ->
     [
@@ -167,6 +170,14 @@ fields(bridges) ->
                     desc => <<"OpenTSDB Bridge Config">>,
                     required => false
                 }
+            )},
+        {oracle,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_oracle, "config")),
+                #{
+                    desc => <<"Oracle Bridge Config">>,
+                    required => false
+                }
             )}
     ] ++ kafka_structs() ++ pulsar_structs() ++ mongodb_structs() ++ influxdb_structs() ++
         redis_structs() ++

+ 5 - 1
mix.exs

@@ -170,7 +170,9 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_rocketmq,
       :emqx_bridge_tdengine,
       :emqx_bridge_timescale,
-      :emqx_bridge_pulsar
+      :emqx_bridge_pulsar,
+      :emqx_oracle,
+      :emqx_bridge_oracle
     ])
   end
 
@@ -377,6 +379,8 @@ defmodule EMQXUmbrella.MixProject do
           emqx_bridge_rocketmq: :permanent,
           emqx_bridge_tdengine: :permanent,
           emqx_bridge_timescale: :permanent,
+          emqx_oracle: :permanent,
+          emqx_bridge_oracle: :permanent,
           emqx_ee_schema_registry: :permanent
         ],
         else: []

+ 4 - 0
rebar.config.erl

@@ -94,6 +94,8 @@ is_community_umbrella_app("apps/emqx_bridge_redis") -> false;
 is_community_umbrella_app("apps/emqx_bridge_rocketmq") -> false;
 is_community_umbrella_app("apps/emqx_bridge_tdengine") -> false;
 is_community_umbrella_app("apps/emqx_bridge_timescale") -> false;
+is_community_umbrella_app("apps/emqx_bridge_oracle") -> false;
+is_community_umbrella_app("apps/emqx_oracle") -> false;
 is_community_umbrella_app(_) -> true.
 
 is_jq_supported() ->
@@ -470,6 +472,8 @@ relx_apps_per_edition(ee) ->
         emqx_bridge_rocketmq,
         emqx_bridge_tdengine,
         emqx_bridge_timescale,
+        emqx_oracle,
+        emqx_bridge_oracle,
         emqx_ee_schema_registry
     ];
 relx_apps_per_edition(ce) ->

+ 52 - 0
rel/i18n/emqx_bridge_oracle.hocon

@@ -0,0 +1,52 @@
+emqx_bridge_oracle {
+
+  local_topic {
+    desc = "The MQTT topic filter to be forwarded to Oracle Database. All MQTT 'PUBLISH' messages with the topic"
+           " matching the local_topic will be forwarded.</br>"
+           "NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is"
+           " configured, then both the data got from the rule and the MQTT messages that match local_topic"
+           " will be forwarded."
+    label = "Local Topic"
+  }
+
+  sql_template {
+    desc = "SQL Template. The template string can contain placeholders"
+           " for message metadata and payload field. The placeholders are inserted"
+           " without any checking and special formatting, so it is important to"
+           " ensure that the inserted values are formatted and escaped correctly."
+    label = "SQL Template"
+  }
+
+  server {
+    desc = "The IPv4 or IPv6 address or the hostname to connect to.<br/>"
+           "A host entry has the following form: `Host[:Port]`.<br/>"
+           "The Oracle Database default port 1521 is used if `[:Port]` is not specified."
+    label = "Server Host"
+  }
+
+  sid {
+    desc = "Sid for Oracle Database"
+    label = "Oracle Database Sid."
+  }
+
+  config_enable {
+    desc = "Enable or disable this bridge"
+    label = "Enable Or Disable Bridge"
+  }
+
+  desc_config {
+    desc = "Configuration for an Oracle Database bridge."
+    label = "Oracle Database Bridge Configuration"
+  }
+
+  desc_type {
+    desc = "The Bridge Type"
+    label = "Bridge Type"
+  }
+
+  desc_name {
+    desc = "Bridge name."
+    label = "Bridge Name"
+  }
+
+}

+ 15 - 0
rel/i18n/emqx_oracle.hocon

@@ -0,0 +1,15 @@
+emqx_oracle {
+
+  server {
+    desc = "The IPv4 or IPv6 address or the hostname to connect to.<br/>"
+           "A host entry has the following form: `Host[:Port]`.<br/>"
+           "The Oracle Database default port 1521 is used if `[:Port]` is not specified."
+    label = "Server Host"
+  }
+
+  sid {
+    desc = "Sid for Oracle Database."
+    label = "Oracle Database Sid"
+  }
+
+}

+ 51 - 0
rel/i18n/zh/emqx_bridge_oracle.hocon

@@ -0,0 +1,51 @@
+emqx_bridge_oracle {
+
+  local_topic {
+    desc = "发送到 'local_topic' 的消息都会转发到 Oracle Database。 </br>"
+           "注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。"
+    label = "本地 Topic"
+  }
+
+  sql_template {
+    desc = "SQL模板。模板字符串可以包含消息元数据和有效载荷字段的占位符。占位符"
+           "的插入不需要任何检查和特殊格式化,因此必须确保插入的数值格式化和转义正确。模板字符串可以包含占位符"
+           "模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入"
+           "所以必须确保插入的值的格式正确。因此,确保插入的值格式化和转义正确是非常重要的。模板字符串可以包含占位符"
+           "模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入"
+           "所以必须确保插入的值的格式正确。确保插入的值被正确地格式化和转义。"
+    label = "SQL 模板"
+  }
+
+  server {
+    desc = "将要连接的 IPv4 或 IPv6 地址,或者主机名。<br/>"
+           "主机名具有以下形式:`Host[:Port]`。<br/>"
+           "如果未指定 `[:Port]`,则使用 Oracle Database 默认端口 1521。"
+    label = "服务器地址"
+  }
+
+  sid {
+    desc = "Oracle Database Sid 名称"
+    label = "Oracle Database Sid"
+  }
+
+  config_enable {
+    desc = "启用/禁用桥接"
+    label = "启用/禁用桥接"
+  }
+
+  desc_config {
+    desc = "Oracle Database 桥接配置"
+    label = "Oracle Database 桥接配置"
+  }
+
+  desc_type {
+    desc = "Bridge 类型"
+    label = "桥接类型"
+  }
+
+  desc_name {
+    desc = "桥接名字"
+    label = "桥接名字"
+  }
+
+}

+ 15 - 0
rel/i18n/zh/emqx_oracle.hocon

@@ -0,0 +1,15 @@
+emqx_oracle {
+
+  server {
+    desc = "将要连接的 IPv4 或 IPv6 地址,或者主机名。<br/>"
+           "主机名具有以下形式:`Host[:Port]`。<br/>"
+           "如果未指定 `[:Port]`,则使用 Oracle Database 默认端口 1521。"
+    label = "服务器地址"
+  }
+
+  sid {
+    desc = "Oracle Database Sid 名称"
+    label = "Oracle Database Sid"
+  }
+
+}

+ 3 - 0
scripts/ct/run.sh

@@ -193,6 +193,9 @@ for dep in ${CT_DEPS}; do
             ;;
         pulsar)
             FILES+=( '.ci/docker-compose-file/docker-compose-pulsar.yaml' )
+            ;; 
+        oracle)
+            FILES+=( '.ci/docker-compose-file/docker-compose-oracle.yaml' )
             ;;
         *)
             echo "unknown_ct_dependency $dep"