Przeglądaj źródła

feat: support elasticsearch bridge

zhongwencool 2 lat temu
rodzic
commit
e49d3ca50c

+ 2 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -87,7 +87,8 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_syskeeper_action_info,
         emqx_bridge_timescale_action_info,
         emqx_bridge_redis_action_info,
-        emqx_bridge_iotdb_action_info
+        emqx_bridge_iotdb_action_info,
+        emqx_bridge_es_action_info
     ].
 -else.
 hard_coded_action_info_modules_ee() ->

+ 19 - 0
apps/emqx_bridge_es/.gitignore

@@ -0,0 +1,19 @@
+.rebar3
+ _*
+ .eunit
+ *.o
+ *.beam
+ *.plt
+ *.swp
+ *.swo
+ .erlang.cookie
+ ebin
+ log
+ erl_crash.dump
+ .rebar
+ logs
+ _build
+ .idea
+ *.iml
+ rebar3.crashdump
+ *~

+ 94 - 0
apps/emqx_bridge_es/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 23 - 0
apps/emqx_bridge_es/README.md

@@ -0,0 +1,23 @@
+# Apache ElasticSearch Data Integration Bridge
+
+This application houses the ElasticSearch data integration bridge for EMQX Enterprise
+ Edition. It provides the means to connect to ElasticSearch and publish messages to it.
+
+It implements the connection management and interaction without need for a
+ separate connector app, since it's not used by authentication and authorization
+ applications.
+
+<!---
+# Configurations
+
+Please see [our official
+ documentation](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-elasticsearch.html
+ for more detailed info.
+--->
+
+# Contributing
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+See [BSL](./BSL.txt).

+ 1 - 0
apps/emqx_bridge_es/docker-ct

@@ -0,0 +1 @@
+toxiproxy

+ 0 - 0
apps/emqx_bridge_es/etc/emqx_bridge_es.conf


+ 8 - 0
apps/emqx_bridge_es/include/emqx_bridge_es.hrl

@@ -0,0 +1,8 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_BRIDGE_ES_HRL).
+-define(EMQX_BRIDGE_ES_HRL, true).
+
+-endif.

+ 15 - 0
apps/emqx_bridge_es/rebar.config

@@ -0,0 +1,15 @@
+%% -*- mode: erlang -*-
+
+{erl_opts, [
+    debug_info
+]}.
+
+{deps, [
+    {emqx, {path, "../../apps/emqx"}},
+    {emqx_connector, {path, "../../apps/emqx_connector"}},
+    {emqx_resource, {path, "../../apps/emqx_resource"}},
+    {emqx_bridge, {path, "../../apps/emqx_bridge"}},
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
+]}.
+{plugins, [rebar3_path_deps]}.
+{project_plugins, [erlfmt]}.

+ 23 - 0
apps/emqx_bridge_es/src/emqx_bridge_es.app.src

@@ -0,0 +1,23 @@
+%% -*- mode: erlang -*-
+{application, emqx_bridge_es, [
+    {description, "EMQX Enterprise Elastic Search Bridge"},
+    {vsn, "0.1.0"},
+    {modules, [
+        emqx_bridge_es,
+        emqx_bridge_es_connector
+    ]},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        emqx_resource,
+        emqx_connector
+    ]},
+    {env, []},
+    {licenses, ["Business Source License 1.1"]},
+    {maintainers, ["EMQX Team <contact@emqx.io>"]},
+    {links, [
+        {"Homepage", "https://emqx.io/"},
+        {"Github", "https://github.com/emqx/emqx"}
+    ]}
+]}.

+ 312 - 0
apps/emqx_bridge_es/src/emqx_bridge_es.erl

@@ -0,0 +1,312 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_es).
+
+-include("emqx_bridge_es.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-export([bridge_v2_examples/1]).
+
+%% hocon_schema API
+-export([namespace/0, roots/0, fields/1, desc/1]).
+
+-define(CONNECTOR_TYPE, elasticsearch).
+-define(ACTION_TYPE, ?CONNECTOR_TYPE).
+
+namespace() -> "bridge_elasticsearch".
+
+roots() -> [].
+
+fields(action) ->
+    {elasticsearch,
+        ?HOCON(
+            ?MAP(action_name, ?R_REF(action_config)),
+            #{
+                desc => <<"ElasticSearch Action Config">>,
+                required => false
+            }
+        )};
+fields(action_config) ->
+    emqx_resource_schema:override(
+        emqx_bridge_v2_schema:make_producer_action_schema(
+            ?HOCON(
+                ?R_REF(action_parameters),
+                #{
+                    required => true, desc => ?DESC("action_parameters")
+                }
+            )
+        ),
+        [
+            {resource_opts,
+                ?HOCON(?R_REF(action_resource_opts), #{
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, "resource_opts")
+                })}
+        ]
+    );
+fields(action_resource_opts) ->
+    lists:filter(
+        fun({K, _V}) ->
+            not lists:member(K, unsupported_opts())
+        end,
+        emqx_bridge_v2_schema:resource_opts_fields()
+    );
+fields(action_parameters) ->
+    [
+        {target,
+            ?HOCON(
+                binary(),
+                #{
+                    desc => ?DESC("config_target"),
+                    required => false
+                }
+            )},
+        {require_alias,
+            ?HOCON(
+                boolean(),
+                #{
+                    required => false,
+                    default => false,
+                    desc => ?DESC("config_require_alias")
+                }
+            )},
+        {routing,
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_routing")
+                }
+            )},
+        {wait_for_active_shards,
+            ?HOCON(
+                ?UNION([pos_integer(), all]),
+                #{
+                    required => false,
+                    desc => ?DESC("config_wait_for_active_shards")
+                }
+            )},
+        {data,
+            ?HOCON(
+                ?ARRAY(
+                    ?UNION(
+                        [
+                            ?R_REF(create),
+                            ?R_REF(delete),
+                            ?R_REF(index),
+                            ?R_REF(update)
+                        ]
+                    )
+                ),
+                #{
+                    desc => ?DESC("action_parameters_data")
+                }
+            )}
+    ] ++
+        lists:filter(
+            fun({K, _}) ->
+                not lists:member(K, [path, method, body, headers, request_timeout])
+            end,
+            emqx_bridge_http_schema:fields("parameters_opts")
+        );
+fields(Action) when Action =:= create; Action =:= index ->
+    [
+        {action,
+            ?HOCON(
+                Action,
+                #{
+                    desc => atom_to_binary(Action),
+                    required => true
+                }
+            )},
+        {'_index',
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_parameters_index")
+                }
+            )},
+        {'_id',
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_parameters_id")
+                }
+            )},
+        {require_alias,
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_parameters_require_alias")
+                }
+            )},
+        {fields,
+            ?HOCON(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_fields")
+                }
+            )}
+    ];
+fields(delete) ->
+    [
+        {action,
+            ?HOCON(
+                delete,
+                #{
+                    desc => <<"Delete">>,
+                    required => true
+                }
+            )},
+        {'_index',
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_parameters_index")
+                }
+            )},
+        {'_id',
+            ?HOCON(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_id")
+                }
+            )},
+        {require_alias,
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_parameters_require_alias")
+                }
+            )}
+    ];
+fields(update) ->
+    [
+        {action,
+            ?HOCON(
+                update,
+                #{
+                    desc => <<"Update">>,
+                    required => true
+                }
+            )},
+        {doc_as_upsert,
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_parameters_doc_as_upsert")
+                }
+            )},
+        {upsert,
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_parameters_upsert")
+                }
+            )},
+        {'_index',
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_parameters_index")
+                }
+            )},
+        {'_id',
+            ?HOCON(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_id")
+                }
+            )},
+        {require_alias,
+            ?HOCON(
+                binary(),
+                #{
+                    required => false,
+                    desc => ?DESC("config_parameters_require_alias")
+                }
+            )},
+        {fields,
+            ?HOCON(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("config_parameters_fields")
+                }
+            )}
+    ];
+fields("post_bridge_v2") ->
+    emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config);
+fields("put_bridge_v2") ->
+    fields(action_config);
+fields("get_bridge_v2") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2").
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"elasticsearch">> =>
+                #{
+                    summary => <<"Elastic Search Bridge">>,
+                    value => emqx_bridge_v2_schema:action_values(
+                        Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
+                    )
+                }
+        }
+    ].
+
+action_values() ->
+    #{
+        parameters => #{
+            target => <<"${target_index}">>,
+            data => [
+                #{
+                    action => index,
+                    '_index' => <<"${index}">>,
+                    fields => <<"${fields}">>,
+                    require_alias => <<"${require_alias}">>
+                },
+                #{
+                    action => create,
+                    '_index' => <<"${index}">>,
+                    fields => <<"${fields}">>
+                },
+                #{
+                    action => delete,
+                    '_index' => <<"${index}">>,
+                    '_id' => <<"${id}">>
+                },
+                #{
+                    action => update,
+                    '_index' => <<"${index}">>,
+                    '_id' => <<"${id}">>,
+                    fields => <<"${fields}">>,
+                    require_alias => false,
+                    doc_as_upsert => <<"${doc_as_upsert}">>,
+                    upsert => <<"${upsert}">>
+                }
+            ]
+        }
+    }.
+
+unsupported_opts() ->
+    [
+        batch_size,
+        batch_time
+    ].
+
+desc(_) -> undefined.

+ 22 - 0
apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl

@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_es_action_info).
+
+-behaviour(emqx_action_info).
+
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+
+%% behaviour callbacks
+-export([
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-define(ACTION_TYPE, elasticsearch).
+
+action_type_name() -> ?ACTION_TYPE.
+connector_type_name() -> ?ACTION_TYPE.
+
+schema_module() -> emqx_bridge_es.

+ 498 - 0
apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl

@@ -0,0 +1,498 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_es_connector).
+
+-behaviour(emqx_resource).
+
+-include("emqx_bridge_es.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_get_status/2,
+    on_query/3,
+    on_query_async/4,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1,
+    connector_examples/1,
+    connector_example_values/0
+]).
+
+%% emqx_connector_resource behaviour callbacks
+-export([connector_config/2]).
+
+-type config() ::
+    #{
+        base_url := #{
+            scheme := http | https,
+            host := iolist(),
+            port := inet:port_number(),
+            path := _
+        },
+        connect_timeout := pos_integer(),
+        pool_type := random | hash,
+        pool_size := pos_integer(),
+        request => undefined | map(),
+        atom() => _
+    }.
+
+-type state() ::
+    #{
+        base_path := _,
+        connect_timeout := pos_integer(),
+        pool_type := random | hash,
+        channels := map(),
+        request => undefined | map(),
+        atom() => _
+    }.
+
+-type manager_id() :: binary().
+
+-define(CONNECTOR_TYPE, elasticsearch).
+
+%%-------------------------------------------------------------------------------------
+%% connector examples
+%%-------------------------------------------------------------------------------------
+connector_examples(Method) ->
+    [
+        #{
+            <<"elasticsearch">> =>
+                #{
+                    summary => <<"Elastic Search Connector">>,
+                    value => emqx_connector_schema:connector_values(
+                        Method, ?CONNECTOR_TYPE, connector_example_values()
+                    )
+                }
+        }
+    ].
+
+connector_example_values() ->
+    #{
+        name => <<"elasticsearch_connector">>,
+        type => elasticsearch,
+        enable => true,
+        authentication => #{
+            <<"username">> => <<"root">>,
+            <<"password">> => <<"******">>
+        },
+        base_url => <<"http://127.0.0.1:9200/">>,
+        connect_timeout => <<"15s">>,
+        pool_type => <<"random">>,
+        pool_size => 8,
+        enable_pipelining => 100,
+        ssl => #{enable => false}
+    }.
+
+%%-------------------------------------------------------------------------------------
+%% schema
+%%-------------------------------------------------------------------------------------
+namespace() -> "elasticsearch".
+
+roots() ->
+    [{config, #{type => ?R_REF(config)}}].
+
+fields(config) ->
+    lists:filter(
+        fun({K, _}) -> not lists:member(K, [url, request, retry_interval, headers]) end,
+        emqx_bridge_http_schema:fields("config_connector")
+    ) ++
+        fields("connection_fields");
+fields("connection_fields") ->
+    [
+        {base_url,
+            ?HOCON(
+                emqx_schema:url(),
+                #{
+                    required => true,
+                    desc => ?DESC(emqx_bridge_es, "config_base_url")
+                }
+            )},
+        {authentication,
+            ?HOCON(
+                ?UNION([?R_REF(auth_basic)]),
+                #{
+                    desc => ?DESC("config_authentication")
+                }
+            )}
+    ];
+fields(auth_basic) ->
+    [
+        {username,
+            ?HOCON(binary(), #{
+                required => true,
+                desc => ?DESC("config_auth_basic_username")
+            })},
+        {password,
+            emqx_schema_secret:mk(#{
+                required => true,
+                desc => ?DESC("config_auth_basic_password")
+            })}
+    ];
+fields("post") ->
+    emqx_connector_schema:type_and_name_fields(elasticsearch) ++ fields(config);
+fields("put") ->
+    fields(config);
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("post").
+
+desc(config) ->
+    ?DESC("desc_config");
+desc(auth_basic) ->
+    "Basic Authentication";
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for Elastic Search using `", string:to_upper(Method), "` method."];
+desc(_) ->
+    undefined.
+
+connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
+    #{
+        base_url := BaseUrl,
+        authentication :=
+            #{
+                username := Username,
+                password := Password0
+            }
+    } = Conf,
+
+    Password = emqx_secret:unwrap(Password0),
+    Base64 = base64:encode(<<Username/binary, ":", Password/binary>>),
+    BasicToken = <<"Basic ", Base64/binary>>,
+
+    WebhookConfig =
+        Conf#{
+            method => <<"post">>,
+            url => BaseUrl,
+            headers => [
+                {<<"Content-type">>, <<"application/json">>},
+                {<<"Authorization">>, BasicToken}
+            ]
+        },
+    ParseConfs(
+        <<"http">>,
+        Name,
+        WebhookConfig
+    ).
+
+%%-------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------
+callback_mode() -> async_if_possible.
+
+-spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
+on_start(InstanceId, Config) ->
+    case emqx_bridge_http_connector:on_start(InstanceId, Config) of
+        {ok, State} ->
+            ?SLOG(info, #{
+                msg => "elasticsearch_bridge_started",
+                instance_id => InstanceId,
+                request => maps:get(request, State, <<>>)
+            }),
+            ?tp(elasticsearch_bridge_started, #{instance_id => InstanceId}),
+            {ok, State#{channels => #{}}};
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_elasticsearch_bridge",
+                instance_id => InstanceId,
+                request => maps:get(request, Config, <<>>),
+                reason => Reason
+            }),
+            throw(failed_to_start_elasticsearch_bridge)
+    end.
+
+-spec on_stop(manager_id(), state()) -> ok | {error, term()}.
+on_stop(InstanceId, State) ->
+    ?SLOG(info, #{
+        msg => "stopping_elasticsearch_bridge",
+        connector => InstanceId
+    }),
+    Res = emqx_bridge_http_connector:on_stop(InstanceId, State),
+    ?tp(elasticsearch_bridge_stopped, #{instance_id => InstanceId}),
+    Res.
+
+-spec on_get_status(manager_id(), state()) ->
+    {connected, state()} | {disconnected, state(), term()}.
+on_get_status(InstanceId, State) ->
+    emqx_bridge_http_connector:on_get_status(InstanceId, State).
+
+-spec on_query(manager_id(), tuple(), state()) ->
+    {ok, pos_integer(), [term()], term()}
+    | {ok, pos_integer(), [term()]}
+    | {error, term()}.
+on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) ->
+    ?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}),
+    ?SLOG(debug, #{
+        msg => "elasticsearch_bridge_on_query_called",
+        instance_id => InstanceId,
+        send_message => Req,
+        state => emqx_utils:redact(State)
+    }),
+    case try_render_message(Req, Channels) of
+        {ok, Body} ->
+            handle_response(
+                emqx_bridge_http_connector:on_query(
+                    InstanceId, {ChannelId, {Msg, Body}}, State
+                )
+            );
+        Error ->
+            Error
+    end.
+
+-spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) ->
+    {ok, pid()} | {error, empty_request}.
+on_query_async(
+    InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, #{channels := Channels} = State
+) ->
+    ?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}),
+    ?SLOG(debug, #{
+        msg => "elasticsearch_bridge_on_query_async_called",
+        instance_id => InstanceId,
+        send_message => Req,
+        state => emqx_utils:redact(State)
+    }),
+    case try_render_message(Req, Channels) of
+        {ok, Payload} ->
+            ReplyFunAndArgs =
+                {
+                    fun(Result) ->
+                        Response = handle_response(Result),
+                        emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
+                    end,
+                    []
+                },
+            emqx_bridge_http_connector:on_query_async(
+                InstanceId, {ChannelId, {Msg, Payload}}, ReplyFunAndArgs, State
+            );
+        Error ->
+            Error
+    end.
+
+on_add_channel(
+    InstanceId,
+    #{channels := Channels} = State0,
+    ChannelId,
+    #{parameters := Parameter}
+) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            {error, already_exists};
+        _ ->
+            #{data := Data} = Parameter,
+            Parameter1 = Parameter#{path => path(Parameter), method => <<"post">>},
+            {ok, State} = emqx_bridge_http_connector:on_add_channel(
+                InstanceId, State0, ChannelId, #{parameters => Parameter1}
+            ),
+            case preproc_data_template(Data) of
+                [] ->
+                    {error, invalid_data};
+                DataTemplate ->
+                    Channel = Parameter1#{data => DataTemplate},
+                    Channels2 = Channels#{ChannelId => Channel},
+                    {ok, State#{channels => Channels2}}
+            end
+    end.
+
+on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
+    {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId),
+    Channels2 = maps:remove(ChannelId, Channels),
+    {ok, OldState#{channels => Channels2}}.
+
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
+on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            connected;
+        _ ->
+            {error, not_exists}
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal Functions
+%%--------------------------------------------------------------------
+path(Param) ->
+    Target = maps:get(target, Param, undefined),
+    QString0 = maps:fold(
+        fun(K, V, Acc) ->
+            [[atom_to_list(K), "=", to_str(V)] | Acc]
+        end,
+        [["_source=false"], ["filter_path=items.*.error"]],
+        maps:with([require_alias, routing, wait_for_active_shards], Param)
+    ),
+    QString = "?" ++ lists:join("&", QString0),
+    target(Target) ++ QString.
+
+target(undefined) -> "/_bulk";
+target(Str) -> "/" ++ binary_to_list(Str) ++ "/_bulk".
+
+to_str(List) when is_list(List) -> List;
+to_str(false) -> "false";
+to_str(true) -> "true";
+to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom).
+
+proc_data(DataList, Msg) when is_list(DataList) ->
+    [
+        begin
+            proc_data(Data, Msg)
+        end
+     || Data <- DataList
+    ];
+proc_data(
+    #{
+        action := Action,
+        '_index' := IndexT,
+        '_id' := IdT,
+        require_alias := RequiredAliasT,
+        fields := FieldsT
+    },
+    Msg
+) when Action =:= create; Action =:= index ->
+    [
+        emqx_utils_json:encode(
+            #{
+                Action => filter([
+                    {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)},
+                    {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)},
+                    {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}
+                ])
+            }
+        ),
+        "\n",
+        emqx_placeholder:proc_tmpl(FieldsT, Msg),
+        "\n"
+    ];
+proc_data(
+    #{
+        action := delete,
+        '_index' := IndexT,
+        '_id' := IdT,
+        require_alias := RequiredAliasT
+    },
+    Msg
+) ->
+    [
+        emqx_utils_json:encode(
+            #{
+                delete => filter([
+                    {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)},
+                    {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)},
+                    {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}
+                ])
+            }
+        ),
+        "\n"
+    ];
+proc_data(
+    #{
+        action := update,
+        '_index' := IndexT,
+        '_id' := IdT,
+        require_alias := RequiredAliasT,
+        doc_as_upsert := DocAsUpsert,
+        upsert := Upsert,
+        fields := FieldsT
+    },
+    Msg
+) ->
+    [
+        emqx_utils_json:encode(
+            #{
+                update => filter([
+                    {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)},
+                    {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)},
+                    {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)},
+                    {doc_as_upsert, emqx_placeholder:proc_tmpl(DocAsUpsert, Msg)},
+                    {upsert, emqx_placeholder:proc_tmpl(Upsert, Msg)}
+                ])
+            }
+        ),
+        "\n{\"doc\":",
+        emqx_placeholder:proc_tmpl(FieldsT, Msg),
+        "}\n"
+    ].
+
+filter(List) ->
+    Fun = fun
+        ({_K, V}) when V =:= undefined; V =:= <<"undefined">>; V =:= "undefined" ->
+            false;
+        ({_K, V}) when V =:= ""; V =:= <<>> ->
+            false;
+        ({_K, V}) when V =:= "false" -> {true, false};
+        ({_K, V}) when V =:= "true" -> {true, true};
+        ({_K, _V}) ->
+            true
+    end,
+    maps:from_list(lists:filtermap(Fun, List)).
+
+handle_response({ok, 200, _Headers, Body} = Resp) ->
+    eval_response_body(Body, Resp);
+handle_response({ok, 200, Body} = Resp) ->
+    eval_response_body(Body, Resp);
+handle_response({ok, Code, _Headers, Body}) ->
+    {error, #{code => Code, body => Body}};
+handle_response({ok, Code, Body}) ->
+    {error, #{code => Code, body => Body}};
+handle_response({error, _} = Error) ->
+    Error.
+
+eval_response_body(<<"{}">>, Resp) -> Resp;
+eval_response_body(Body, _Resp) -> {error, emqx_utils_json:decode(Body)}.
+
+preproc_data_template(DataList) when is_list(DataList) ->
+    [
+        begin
+            preproc_data_template(Data)
+        end
+     || Data <- DataList
+    ];
+preproc_data_template(#{action := create} = Data) ->
+    Index = maps:get('_index', Data, ""),
+    Id = maps:get('_id', Data, ""),
+    RequiredAlias = maps:get(require_alias, Data, ""),
+    Fields = maps:get(fields, Data, ""),
+    #{
+        action => create,
+        '_index' => emqx_placeholder:preproc_tmpl(Index),
+        '_id' => emqx_placeholder:preproc_tmpl(Id),
+        require_alias => emqx_placeholder:preproc_tmpl(RequiredAlias),
+        fields => emqx_placeholder:preproc_tmpl(Fields)
+    };
+preproc_data_template(#{action := index} = Data) ->
+    Data1 = preproc_data_template(Data#{action => create}),
+    Data1#{action => index};
+preproc_data_template(#{action := delete} = Data) ->
+    Data1 = preproc_data_template(Data#{action => create}),
+    Data2 = Data1#{action => delete},
+    maps:remove(fields, Data2);
+preproc_data_template(#{action := update} = Data) ->
+    Data1 = preproc_data_template(Data#{action => index}),
+    DocAsUpsert = maps:get(doc_as_upsert, Data, ""),
+    Upsert = maps:get(upsert, Data, ""),
+    Data1#{
+        action => update,
+        doc_as_upsert => emqx_placeholder:preproc_tmpl(DocAsUpsert),
+        upsert => emqx_placeholder:preproc_tmpl(Upsert)
+    }.
+
+try_render_message({ChannelId, Msg}, Channels) ->
+    case maps:find(ChannelId, Channels) of
+        {ok, #{data := Data}} ->
+            {ok, proc_data(Data, Msg)};
+        _ ->
+            {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
+    end.

+ 16 - 12
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -317,7 +317,7 @@ on_query(InstId, {send_message, Msg}, State) ->
 %% BridgeV2 entrypoint
 on_query(
     InstId,
-    {ActionId, Msg},
+    {ActionId, MsgAndBody},
     State = #{installed_actions := InstalledActions}
 ) when is_binary(ActionId) ->
     case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
@@ -334,10 +334,10 @@ on_query(
                 body := Body,
                 headers := Headers,
                 request_timeout := Timeout
-            } = process_request_and_action(Request, ActionState, Msg),
+            } = process_request_and_action(Request, ActionState, MsgAndBody),
             %% bridge buffer worker has retry, do not let ehttpc retry
             Retry = 2,
-            ClientId = maps:get(clientid, Msg, undefined),
+            ClientId = clientid(MsgAndBody),
             on_query(
                 InstId,
                 {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
@@ -430,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
 %% BridgeV2 entrypoint
 on_query_async(
     InstId,
-    {ActionId, Msg},
+    {ActionId, MsgAndBody},
     ReplyFunAndArgs,
     State = #{installed_actions := InstalledActions}
 ) when is_binary(ActionId) ->
@@ -448,8 +448,8 @@ on_query_async(
                 body := Body,
                 headers := Headers,
                 request_timeout := Timeout
-            } = process_request_and_action(Request, ActionState, Msg),
-            ClientId = maps:get(clientid, Msg, undefined),
+            } = process_request_and_action(Request, ActionState, MsgAndBody),
+            ClientId = clientid(MsgAndBody),
             on_query_async(
                 InstId,
                 {ClientId, Method, {Path, Headers, Body}, Timeout},
@@ -629,12 +629,9 @@ maybe_parse_template(Key, Conf) ->
 parse_template(String) ->
     emqx_template:parse(String).
 
-process_request_and_action(Request, ActionState, Msg) ->
+process_request_and_action(Request, ActionState, {Msg, Body}) ->
     MethodTemplate = maps:get(method, ActionState),
     Method = make_method(render_template_string(MethodTemplate, Msg)),
-    BodyTemplate = maps:get(body, ActionState),
-    Body = render_request_body(BodyTemplate, Msg),
-
     PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
     PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)),
 
@@ -656,7 +653,11 @@ process_request_and_action(Request, ActionState, Msg) ->
         body => Body,
         headers => Headers,
         request_timeout => maps:get(request_timeout, ActionState)
-    }.
+    };
+process_request_and_action(Request, ActionState, Msg) ->
+    BodyTemplate = maps:get(body, ActionState),
+    Body = render_request_body(BodyTemplate, Msg),
+    process_request_and_action(Request, ActionState, {Msg, Body}).
 
 merge_proplist(Proplist1, Proplist2) ->
     lists:foldl(
@@ -732,7 +733,7 @@ formalize_request(_Method, BasePath, {Path, Headers}) ->
 %% because an HTTP server may handle paths like
 %% "/a/b/c/", "/a/b/c" and "/a//b/c" differently.
 %%
-%% So we try to avoid unneccessary path normalization.
+%% So we try to avoid unnecessary path normalization.
 %%
 %% See also: `join_paths_test_/0`
 join_paths(Path1, Path2) ->
@@ -876,6 +877,9 @@ redact_request({Path, Headers}) ->
 redact_request({Path, Headers, _Body}) ->
     {Path, Headers, <<"******">>}.
 
+clientid({Msg, _Body}) -> clientid(Msg);
+clientid(Msg) -> maps:get(clientid, Msg, undefined).
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 

+ 6 - 3
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -90,7 +90,7 @@ connector_example_values() ->
         enable => true,
         authentication => #{
             <<"username">> => <<"root">>,
-            <<"password">> => <<"*****">>
+            <<"password">> => <<"******">>
         },
         base_url => <<"http://iotdb.local:18080/">>,
         connect_timeout => <<"15s">>,
@@ -109,7 +109,10 @@ roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
 fields(config) ->
-    proplists_without([url, headers], emqx_bridge_http_schema:fields("config_connector")) ++
+    proplists_without(
+        [url, request, retry_interval, headers],
+        emqx_bridge_http_schema:fields("config_connector")
+    ) ++
         fields("connection_fields");
 fields("connection_fields") ->
     [
@@ -206,7 +209,7 @@ on_start(InstanceId, Config) ->
             ?SLOG(error, #{
                 msg => "failed_to_start_iotdb_bridge",
                 instance_id => InstanceId,
-                base_url => maps:get(request, Config, <<>>),
+                request => maps:get(request, Config, <<>>),
                 reason => Reason
             }),
             throw(failed_to_start_iotdb_bridge)

+ 16 - 2
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -50,6 +50,8 @@ resource_type(redis) ->
     emqx_bridge_redis_connector;
 resource_type(iotdb) ->
     emqx_bridge_iotdb_connector;
+resource_type(elasticsearch) ->
+    emqx_bridge_es_connector;
 resource_type(Type) ->
     error({unknown_connector_type, Type}).
 
@@ -62,6 +64,8 @@ connector_impl_module(confluent_producer) ->
     emqx_bridge_confluent_producer;
 connector_impl_module(iotdb) ->
     emqx_bridge_iotdb_connector;
+connector_impl_module(elasticsearch) ->
+    emqx_bridge_es_connector;
 connector_impl_module(_ConnectorType) ->
     undefined.
 
@@ -181,6 +185,14 @@ connector_structs() ->
                     desc => <<"IoTDB Connector Config">>,
                     required => false
                 }
+            )},
+        {elasticsearch,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_es_connector, config)),
+                #{
+                    desc => <<"Elastis Search Connector Config">>,
+                    required => false
+                }
             )}
     ].
 
@@ -199,7 +211,8 @@ schema_modules() ->
         emqx_bridge_timescale,
         emqx_postgresql_connector_schema,
         emqx_bridge_redis_schema,
-        emqx_bridge_iotdb_connector
+        emqx_bridge_iotdb_connector,
+        emqx_bridge_es_connector
     ].
 
 api_schemas(Method) ->
@@ -227,7 +240,8 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
         api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
         api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),
-        api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method)
+        api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),
+        api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method)
     ].
 
 api_ref(Module, Type, Method) ->

+ 3 - 1
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -147,7 +147,9 @@ connector_type_to_bridge_types(syskeeper_proxy) ->
 connector_type_to_bridge_types(timescale) ->
     [timescale];
 connector_type_to_bridge_types(iotdb) ->
-    [iotdb].
+    [iotdb];
+connector_type_to_bridge_types(elasticsearch) ->
+    [elasticsearch].
 
 actions_config_name() -> <<"actions">>.
 

+ 1 - 0
apps/emqx_machine/priv/reboot_lists.eterm

@@ -99,6 +99,7 @@
             emqx_bridge_hstreamdb,
             emqx_bridge_influxdb,
             emqx_bridge_iotdb,
+            emqx_bridge_es,
             emqx_bridge_matrix,
             emqx_bridge_mongodb,
             emqx_bridge_mysql,

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -3,7 +3,7 @@
     {id, "emqx_machine"},
     {description, "The EMQX Machine"},
     % strict semver, bump manually!
-    {vsn, "0.2.17"},
+    {vsn, "0.2.18"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_ctl]},

+ 1 - 0
mix.exs

@@ -164,6 +164,7 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_hstreamdb,
       :emqx_bridge_influxdb,
       :emqx_bridge_iotdb,
+      :emqx_bridge_es,
       :emqx_bridge_matrix,
       :emqx_bridge_mongodb,
       :emqx_bridge_mysql,

+ 129 - 0
rel/i18n/emqx_bridge_es.hocon

@@ -0,0 +1,129 @@
+emqx_bridge_es {
+
+config_enable.desc:
+"""Enable or disable this bridge"""
+
+config_enable.label:
+"""Enable Or Disable Bridge"""
+
+config_authentication.desc:
+"""Authentication configuration"""
+
+config_authentication.label:
+"""Authentication"""
+
+auth_basic.desc:
+"""Parameters for basic authentication."""
+
+auth_basic.label:
+"""Basic auth params"""
+
+config_auth_basic_username.desc:
+"""The username as configured at the IoTDB REST interface"""
+
+config_auth_basic_username.label:
+  """HTTP Basic Auth Username"""
+
+config_auth_basic_password.desc:
+"""The password as configured at the IoTDB REST interface"""
+
+config_auth_basic_password.label:
+"""HTTP Basic Auth Password"""
+
+config_base_url.desc:
+"""The base URL of the external ElasticSearch service's REST interface."""
+config_base_url.label:
+"""ElasticSearch REST Service Base URL"""
+
+config_target.desc:
+"""Name of the data stream, index, or index alias to perform bulk actions on"""
+
+config_target.label:
+"""Target"""
+
+config_require_alias.desc:
+"""If true, the request’s actions must target an index alias. Defaults to false"""
+config_require_alias.label:
+"""Require Alias"""
+
+config_routing.desc:
+"""Custom value used to route operations to a specific shard."""
+config_routing.label:
+"""Routing"""
+
+config_wait_for_active_shards.desc:
+"""The number of shard copies that must be active before proceeding with the operation.
+Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1).
+Default: 1, the primary shard"""
+
+config_max_retries.desc:
+"""HTTP request max retry times if failed."""
+
+config_max_retries.label:
+"""HTTP Request Max Retries"""
+
+desc_config.desc:
+"""Configuration for Apache IoTDB bridge."""
+
+desc_config.label:
+"""IoTDB Bridge Configuration"""
+
+desc_name.desc:
+"""Bridge name, used as a human-readable description of the bridge."""
+
+desc_name.label:
+"""Bridge Name"""
+
+config_parameters_action.desc:
+"""TODO"""
+
+config_parameters_action.label:
+"""Action"""
+
+config_parameters_index.desc:
+"""Name of the data stream, index, or index alias to perform the action on.
+This parameter is required if a <target> is not specified in the request path."""
+
+config_parameters_index.label:
+"""_index"""
+
+config_parameters_id.desc:
+"""The document ID. If no ID is specified, a document ID is automatically generated."""
+config_parameters_id.label:
+"""_id"""
+
+config_parameters_require_alias.desc:
+"""If true, the action must target an index alias. Defaults to false."""
+config_parameters_require_alias.label:
+"""_require_alias"""
+
+config_parameters_fields.desc:
+"""The document source to index. Required for create and index operations."""
+config_parameters_fields.label:
+"""fields"""
+
+config_parameters_doc_as_upsert.desc:
+"""Instead of sending a partial doc plus an upsert doc, you can set doc_as_upsert to true
+to use the contents of doc as the upsert value."""
+config_parameters_doc_as_upsert.label:
+"""doc_as_upsert"""
+
+config_parameters_upsert.desc:
+"""If the document does not already exist, the contents of the upsert element are inserted as a new document."""
+config_parameters_upsert.label:
+"""upsert"""
+
+
+action_parameters_data.desc:
+"""ElasticSearch action parameter data"""
+
+action_parameters_data.label:
+"""Parameter Data"""
+
+action_parameters.desc:
+"""ElasticSearch action parameters"""
+
+action_parameters.label:
+"""Parameters"""
+
+}

+ 44 - 0
rel/i18n/emqx_bridge_es_connector.hocon

@@ -0,0 +1,44 @@
+emqx_bridge_es_connector {
+
+config_authentication.desc:
+"""Authentication configuration"""
+
+config_authentication.label:
+"""Authentication"""
+
+auth_basic.desc:
+"""Parameters for basic authentication."""
+
+auth_basic.label:
+"""Basic auth params"""
+
+config_auth_basic_username.desc:
+"""The username as configured at the ElasticSearch REST interface"""
+
+config_auth_basic_username.label:
+  """HTTP Basic Auth Username"""
+
+config_auth_basic_password.desc:
+"""The password as configured at the ElasticSearch REST interface"""
+
+config_auth_basic_password.label:
+"""HTTP Basic Auth Password"""
+
+config_base_url.desc:
+"""The base URL of the external ElasticSearch service's REST interface."""
+config_base_url.label:
+"""ElasticSearch REST Service Base URL"""
+
+config_max_retries.desc:
+"""HTTP request max retry times if failed."""
+
+config_max_retries.label:
+"""HTTP Request Max Retries"""
+
+desc_config.desc:
+"""Configuration for ElasticSearch bridge."""
+
+desc_config.label:
+"""ElasticSearch Bridge Configuration"""
+
+}