Przeglądaj źródła

Merge pull request #12295 from zhongwencool/elasticsearch

feat: support elasticsearch bridge
zhongwencool 2 lat temu
rodzic
commit
90c2a5445c

+ 2 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -87,7 +87,8 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_syskeeper_action_info,
         emqx_bridge_timescale_action_info,
         emqx_bridge_redis_action_info,
-        emqx_bridge_iotdb_action_info
+        emqx_bridge_iotdb_action_info,
+        emqx_bridge_es_action_info
     ].
 -else.
 hard_coded_action_info_modules_ee() ->

+ 10 - 5
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -361,11 +361,16 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
         [] ->
             false;
         _ ->
-            {true, #{
-                schema_module => Module,
-                type_name => TypeName,
-                missing_fields => MissingFields
-            }}
+            %% elasticsearch is new and doesn't have local_topic
+            case MissingFields of
+                [local_topic] when Module =:= emqx_bridge_es -> false;
+                _ ->
+                    {true, #{
+                        schema_module => Module,
+                        type_name => TypeName,
+                        missing_fields => MissingFields
+                    }}
+            end
     end.
 
 -endif.

+ 94 - 0
apps/emqx_bridge_es/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2027-02-01
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 23 - 0
apps/emqx_bridge_es/README.md

@@ -0,0 +1,23 @@
+# Apache ElasticSearch Data Integration Bridge
+
+This application houses the ElasticSearch data integration bridge for EMQX Enterprise
+ Edition. It provides the means to connect to ElasticSearch and publish messages to it.
+
+It implements the connection management and interaction without need for a
+ separate connector app, since it's not used by authentication and authorization
+ applications.
+
+<!---
+# Configurations
+
+Please see [our official
+ documentation](https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-elasticsearch.html
+ for more detailed info.
+--->
+
+# Contributing
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+See [BSL](./BSL.txt).

+ 1 - 0
apps/emqx_bridge_es/docker-ct

@@ -0,0 +1 @@
+toxiproxy

+ 0 - 0
apps/emqx_bridge_es/etc/emqx_bridge_es.conf


+ 8 - 0
apps/emqx_bridge_es/include/emqx_bridge_es.hrl

@@ -0,0 +1,8 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_BRIDGE_ES_HRL).
+-define(EMQX_BRIDGE_ES_HRL, true).
+
+-endif.

+ 15 - 0
apps/emqx_bridge_es/rebar.config

@@ -0,0 +1,15 @@
+%% -*- mode: erlang -*-
+
+{erl_opts, [
+    debug_info
+]}.
+
+{deps, [
+    {emqx, {path, "../../apps/emqx"}},
+    {emqx_connector, {path, "../../apps/emqx_connector"}},
+    {emqx_resource, {path, "../../apps/emqx_resource"}},
+    {emqx_bridge, {path, "../../apps/emqx_bridge"}},
+    {emqx_bridge_http, {path, "../emqx_bridge_http"}}
+]}.
+{plugins, [rebar3_path_deps]}.
+{project_plugins, [erlfmt]}.

+ 23 - 0
apps/emqx_bridge_es/src/emqx_bridge_es.app.src

@@ -0,0 +1,23 @@
+%% -*- mode: erlang -*-
+{application, emqx_bridge_es, [
+    {description, "EMQX Enterprise Elastic Search Bridge"},
+    {vsn, "0.1.0"},
+    {modules, [
+        emqx_bridge_es,
+        emqx_bridge_es_connector
+    ]},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        emqx_resource,
+        emqx_connector
+    ]},
+    {env, []},
+    {licenses, ["Business Source License 1.1"]},
+    {maintainers, ["EMQX Team <contact@emqx.io>"]},
+    {links, [
+        {"Homepage", "https://emqx.io/"},
+        {"Github", "https://github.com/emqx/emqx"}
+    ]}
+]}.

+ 227 - 0
apps/emqx_bridge_es/src/emqx_bridge_es.erl

@@ -0,0 +1,227 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_es).
+
+-include("emqx_bridge_es.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-export([bridge_v2_examples/1]).
+
+%% hocon_schema API
+-export([namespace/0, roots/0, fields/1, desc/1]).
+
+-define(CONNECTOR_TYPE, elasticsearch).
+-define(ACTION_TYPE, ?CONNECTOR_TYPE).
+
+namespace() -> "bridge_elasticsearch".
+
+roots() -> [].
+
+fields(action) ->
+    {elasticsearch,
+        ?HOCON(
+            ?MAP(action_name, ?R_REF(action_config)),
+            #{
+                required => false,
+                desc => ?DESC(elasticsearch)
+            }
+        )};
+fields(action_config) ->
+    emqx_resource_schema:override(
+        emqx_bridge_v2_schema:make_consumer_action_schema(
+            ?HOCON(
+                ?UNION(fun action_union_member_selector/1),
+                #{
+                    required => true, desc => ?DESC("action_parameters")
+                }
+            )
+        ),
+        [
+            {resource_opts,
+                ?HOCON(?R_REF(action_resource_opts), #{
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, "resource_opts")
+                })}
+        ]
+    );
+fields(action_resource_opts) ->
+    lists:filter(
+        fun({K, _V}) ->
+            not lists:member(K, unsupported_opts())
+        end,
+        emqx_bridge_v2_schema:resource_opts_fields()
+    );
+fields(action_create) ->
+    [
+        action(create),
+        index(),
+        id(false),
+        doc(true),
+        routing(),
+        require_alias(),
+        overwrite()
+        | http_common_opts()
+    ];
+fields(action_delete) ->
+    [action(delete), index(), id(true), routing() | http_common_opts()];
+fields(action_update) ->
+    [
+        action(update),
+        index(),
+        id(true),
+        doc(true),
+        routing(),
+        require_alias()
+        | http_common_opts()
+    ];
+fields("post_bridge_v2") ->
+    emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config);
+fields("put_bridge_v2") ->
+    fields(action_config);
+fields("get_bridge_v2") ->
+    emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2").
+
+action_union_member_selector(all_union_members) ->
+    [
+        ?R_REF(action_create),
+        ?R_REF(action_delete),
+        ?R_REF(action_update)
+    ];
+action_union_member_selector({value, Value}) ->
+    case Value of
+        #{<<"action">> := <<"create">>} ->
+            [?R_REF(action_create)];
+        #{<<"action">> := <<"delete">>} ->
+            [?R_REF(action_delete)];
+        #{<<"action">> := <<"update">>} ->
+            [?R_REF(action_update)];
+        _ ->
+            Expected = "create | delete | update",
+            throw(#{
+                field_name => action,
+                expected => Expected
+            })
+    end.
+
+action(Action) ->
+    {action,
+        ?HOCON(
+            Action,
+            #{
+                required => true,
+                desc => atom_to_binary(Action)
+            }
+        )}.
+
+overwrite() ->
+    {overwrite,
+        ?HOCON(
+            boolean(),
+            #{
+                required => false,
+                default => true,
+                desc => ?DESC("config_overwrite")
+            }
+        )}.
+
+index() ->
+    {index,
+        ?HOCON(
+            binary(),
+            #{
+                required => true,
+                example => <<"${payload.index}">>,
+                desc => ?DESC("config_parameters_index")
+            }
+        )}.
+
+id(Required) ->
+    {id,
+        ?HOCON(
+            binary(),
+            #{
+                required => Required,
+                example => <<"${payload.id}">>,
+                desc => ?DESC("config_parameters_id")
+            }
+        )}.
+
+doc(Required) ->
+    {doc,
+        ?HOCON(
+            binary(),
+            #{
+                required => Required,
+                example => <<"${payload.doc}">>,
+                desc => ?DESC("config_parameters_doc")
+            }
+        )}.
+
+http_common_opts() ->
+    lists:filter(
+        fun({K, _}) ->
+            not lists:member(K, [path, method, body, headers, request_timeout])
+        end,
+        emqx_bridge_http_schema:fields("parameters_opts")
+    ).
+
+routing() ->
+    {routing,
+        ?HOCON(
+            binary(),
+            #{
+                required => false,
+                example => <<"${payload.routing}">>,
+                desc => ?DESC("config_routing")
+            }
+        )}.
+
+require_alias() ->
+    {require_alias,
+        ?HOCON(
+            boolean(),
+            #{
+                required => false,
+                desc => ?DESC("config_require_alias")
+            }
+        )}.
+
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"elasticsearch">> =>
+                #{
+                    summary => <<"Elastic Search Bridge">>,
+                    value => emqx_bridge_v2_schema:action_values(
+                        Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
+                    )
+                }
+        }
+    ].
+
+action_values() ->
+    #{
+        parameters => #{
+            action => create,
+            index => <<"${payload.index}">>,
+            overwrite => true,
+            doc => <<"${payload.doc}">>
+        }
+    }.
+
+unsupported_opts() ->
+    [
+        batch_size,
+        batch_time
+    ].
+
+desc(elasticsearch) -> ?DESC(elasticsearch);
+desc(action_config) -> ?DESC(action_config);
+desc(action_create) -> ?DESC(action_create);
+desc(action_delete) -> ?DESC(action_delete);
+desc(action_update) -> ?DESC(action_update);
+desc(action_resource_opts) -> ?DESC(action_resource_opts);
+desc(_) -> undefined.

+ 22 - 0
apps/emqx_bridge_es/src/emqx_bridge_es_action_info.erl

@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_es_action_info).
+
+-behaviour(emqx_action_info).
+
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+
+%% behaviour callbacks
+-export([
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-define(ACTION_TYPE, elasticsearch).
+
+action_type_name() -> ?ACTION_TYPE.
+connector_type_name() -> ?ACTION_TYPE.
+
+schema_module() -> emqx_bridge_es.

+ 373 - 0
apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl

@@ -0,0 +1,373 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_es_connector).
+
+-behaviour(emqx_resource).
+
+-include("emqx_bridge_es.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_get_status/2,
+    on_query/3,
+    on_query_async/4,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1,
+    connector_examples/1,
+    connector_example_values/0
+]).
+
+%% emqx_connector_resource behaviour callbacks
+-export([connector_config/2]).
+
+-type config() ::
+    #{
+        base_url := #{
+            scheme := http | https,
+            host := iolist(),
+            port := inet:port_number(),
+            path := _
+        },
+        connect_timeout := pos_integer(),
+        pool_type := random | hash,
+        pool_size := pos_integer(),
+        request => undefined | map(),
+        atom() => _
+    }.
+
+-type state() ::
+    #{
+        base_path := _,
+        connect_timeout := pos_integer(),
+        pool_type := random | hash,
+        channels := map(),
+        request => undefined | map(),
+        atom() => _
+    }.
+
+-type manager_id() :: binary().
+
+-define(CONNECTOR_TYPE, elasticsearch).
+
+%%-------------------------------------------------------------------------------------
+%% connector examples
+%%-------------------------------------------------------------------------------------
+connector_examples(Method) ->
+    [
+        #{
+            <<"elasticsearch">> =>
+                #{
+                    summary => <<"Elastic Search Connector">>,
+                    value => emqx_connector_schema:connector_values(
+                        Method, ?CONNECTOR_TYPE, connector_example_values()
+                    )
+                }
+        }
+    ].
+
+connector_example_values() ->
+    #{
+        name => <<"elasticsearch_connector">>,
+        type => elasticsearch,
+        enable => true,
+        authentication => #{
+            <<"username">> => <<"root">>,
+            <<"password">> => <<"******">>
+        },
+        base_url => <<"http://127.0.0.1:9200/">>,
+        connect_timeout => <<"15s">>,
+        pool_type => <<"random">>,
+        pool_size => 8,
+        enable_pipelining => 100,
+        ssl => #{enable => false}
+    }.
+
+%%-------------------------------------------------------------------------------------
+%% schema
+%%-------------------------------------------------------------------------------------
+namespace() -> "elasticsearch".
+
+roots() ->
+    [{config, #{type => ?R_REF(config)}}].
+
+fields(config) ->
+    lists:filter(
+        fun({K, _}) -> not lists:member(K, [url, request, retry_interval, headers]) end,
+        emqx_bridge_http_schema:fields("config_connector")
+    ) ++
+        fields("connection_fields");
+fields("connection_fields") ->
+    [
+        {base_url,
+            ?HOCON(
+                emqx_schema:url(),
+                #{
+                    required => true,
+                    desc => ?DESC(emqx_bridge_es, "config_base_url")
+                }
+            )},
+        {authentication,
+            ?HOCON(
+                ?UNION([?R_REF(auth_basic)]),
+                #{
+                    desc => ?DESC("config_authentication")
+                }
+            )}
+    ];
+fields(auth_basic) ->
+    [
+        {username,
+            ?HOCON(binary(), #{
+                required => true,
+                desc => ?DESC("config_auth_basic_username")
+            })},
+        {password,
+            emqx_schema_secret:mk(#{
+                required => true,
+                desc => ?DESC("config_auth_basic_password")
+            })}
+    ];
+fields("post") ->
+    emqx_connector_schema:type_and_name_fields(elasticsearch) ++ fields(config);
+fields("put") ->
+    fields(config);
+fields("get") ->
+    emqx_bridge_schema:status_fields() ++ fields("post").
+
+desc(config) ->
+    ?DESC("desc_config");
+desc(auth_basic) ->
+    "Basic Authentication";
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for Elastic Search using `", string:to_upper(Method), "` method."];
+desc(_) ->
+    undefined.
+
+connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
+    #{
+        base_url := BaseUrl,
+        authentication :=
+            #{
+                username := Username,
+                password := Password0
+            }
+    } = Conf,
+
+    Password = emqx_secret:unwrap(Password0),
+    Base64 = base64:encode(<<Username/binary, ":", Password/binary>>),
+    BasicToken = <<"Basic ", Base64/binary>>,
+
+    WebhookConfig =
+        Conf#{
+            method => <<"post">>,
+            url => BaseUrl,
+            headers => [
+                {<<"Content-type">>, <<"application/json">>},
+                {<<"Authorization">>, BasicToken}
+            ]
+        },
+    ParseConfs(
+        <<"http">>,
+        Name,
+        WebhookConfig
+    ).
+
+%%-------------------------------------------------------------------------------------
+%% `emqx_resource' API
+%%-------------------------------------------------------------------------------------
+callback_mode() -> async_if_possible.
+
+-spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
+on_start(InstanceId, Config) ->
+    case emqx_bridge_http_connector:on_start(InstanceId, Config) of
+        {ok, State} ->
+            ?SLOG(info, #{
+                msg => "elasticsearch_bridge_started",
+                instance_id => InstanceId,
+                request => maps:get(request, State, <<>>)
+            }),
+            ?tp(elasticsearch_bridge_started, #{instance_id => InstanceId}),
+            {ok, State#{channels => #{}}};
+        {error, Reason} ->
+            ?SLOG(error, #{
+                msg => "failed_to_start_elasticsearch_bridge",
+                instance_id => InstanceId,
+                request => maps:get(request, Config, <<>>),
+                reason => Reason
+            }),
+            throw(failed_to_start_elasticsearch_bridge)
+    end.
+
+-spec on_stop(manager_id(), state()) -> ok | {error, term()}.
+on_stop(InstanceId, State) ->
+    ?SLOG(info, #{
+        msg => "stopping_elasticsearch_bridge",
+        connector => InstanceId
+    }),
+    Res = emqx_bridge_http_connector:on_stop(InstanceId, State),
+    ?tp(elasticsearch_bridge_stopped, #{instance_id => InstanceId}),
+    Res.
+
+-spec on_get_status(manager_id(), state()) ->
+    {connected, state()} | {disconnected, state(), term()}.
+on_get_status(InstanceId, State) ->
+    emqx_bridge_http_connector:on_get_status(InstanceId, State).
+
+-spec on_query(manager_id(), tuple(), state()) ->
+    {ok, pos_integer(), [term()], term()}
+    | {ok, pos_integer(), [term()]}
+    | {error, term()}.
+on_query(InstanceId, {ChannelId, Msg} = Req, State) ->
+    ?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}),
+    ?SLOG(debug, #{
+        msg => "elasticsearch_bridge_on_query_called",
+        instance_id => InstanceId,
+        send_message => Req,
+        state => emqx_utils:redact(State)
+    }),
+    handle_response(
+        emqx_bridge_http_connector:on_query(
+            InstanceId, {ChannelId, Msg}, State
+        )
+    ).
+
+-spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) ->
+    {ok, pid()} | {error, empty_request}.
+on_query_async(
+    InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, State
+) ->
+    ?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}),
+    ?SLOG(debug, #{
+        msg => "elasticsearch_bridge_on_query_async_called",
+        instance_id => InstanceId,
+        send_message => Req,
+        state => emqx_utils:redact(State)
+    }),
+    ReplyFunAndArgs =
+        {
+            fun(Result) ->
+                Response = handle_response(Result),
+                emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
+            end,
+            []
+        },
+    emqx_bridge_http_connector:on_query_async(
+        InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State
+    ).
+
+on_add_channel(
+    InstanceId,
+    #{channels := Channels} = State0,
+    ChannelId,
+    #{parameters := Parameter}
+) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            {error, already_exists};
+        _ ->
+            Parameter1 = Parameter#{
+                path => path(Parameter),
+                method => method(Parameter),
+                body => get_body_template(Parameter)
+            },
+            {ok, State} = emqx_bridge_http_connector:on_add_channel(
+                InstanceId, State0, ChannelId, #{parameters => Parameter1}
+            ),
+            Channel = Parameter1,
+            Channels2 = Channels#{ChannelId => Channel},
+            {ok, State#{channels => Channels2}}
+    end.
+
+on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
+    {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId),
+    Channels2 = maps:remove(ChannelId, Channels),
+    {ok, OldState#{channels => Channels2}}.
+
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
+
+on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
+    case maps:is_key(ChannelId, Channels) of
+        true ->
+            connected;
+        _ ->
+            {error, not_exists}
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal Functions
+%%--------------------------------------------------------------------
+%% delete DELETE /<index>/_doc/<_id>
+path(#{action := delete, id := Id, index := Index} = Action) ->
+    BasePath = ["/", Index, "/_doc/", Id],
+    Qs = add_query_string([routing], Action),
+    BasePath ++ Qs;
+%% update POST /<index>/_update/<_id>
+path(#{action := update, id := Id, index := Index} = Action) ->
+    BasePath = ["/", Index, "/_update/", Id],
+    Qs = add_query_string([routing, require_alias], Action),
+    BasePath ++ Qs;
+%% create with id  /<index>/_doc/_id
+path(#{action := create, index := Index, id := Id} = Action) ->
+    BasePath = ["/", Index, "/_doc/", Id],
+    Qs =
+        case maps:get(overwrite, Action, true) of
+            true ->
+                add_query_string([routing, require_alias], Action);
+            false ->
+                Action1 = Action#{op_type => "create"},
+                add_query_string([routing, require_alias, op_type], Action1)
+        end,
+    BasePath ++ Qs;
+%% create without id POST /<index>/_doc/
+path(#{action := create, index := Index} = Action) ->
+    BasePath = ["/", Index, "/_doc/"],
+    Qs = add_query_string([routing, require_alias], Action),
+    BasePath ++ Qs.
+
+method(#{action := create}) -> <<"POST">>;
+method(#{action := delete}) -> <<"DELETE">>;
+method(#{action := update}) -> <<"POST">>.
+
+add_query_string(Keys, Param0) ->
+    Param1 = maps:with(Keys, Param0),
+    FoldFun = fun(K, V, Acc) -> [[atom_to_list(K), "=", to_str(V)] | Acc] end,
+    case maps:fold(FoldFun, [], Param1) of
+        "" -> "";
+        QString -> "?" ++ lists:join("&", QString)
+    end.
+
+to_str(List) when is_list(List) -> List;
+to_str(false) -> "false";
+to_str(true) -> "true";
+to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom).
+
+handle_response({ok, Code, _Headers, _Body} = Resp) when Code =:= 200; Code =:= 201 ->
+    Resp;
+handle_response({ok, Code, _Body} = Resp) when Code =:= 200; Code =:= 201 ->
+    Resp;
+handle_response({ok, Code, _Headers, Body}) ->
+    {error, #{code => Code, body => Body}};
+handle_response({ok, Code, Body}) ->
+    {error, #{code => Code, body => Body}};
+handle_response({error, _} = Error) ->
+    Error.
+
+get_body_template(#{doc := Doc}) -> Doc;
+get_body_template(_) -> undefined.

+ 7 - 6
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -337,7 +337,7 @@ on_query(
             } = process_request_and_action(Request, ActionState, Msg),
             %% bridge buffer worker has retry, do not let ehttpc retry
             Retry = 2,
-            ClientId = maps:get(clientid, Msg, undefined),
+            ClientId = clientid(Msg),
             on_query(
                 InstId,
                 {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
@@ -449,7 +449,7 @@ on_query_async(
                 headers := Headers,
                 request_timeout := Timeout
             } = process_request_and_action(Request, ActionState, Msg),
-            ClientId = maps:get(clientid, Msg, undefined),
+            ClientId = clientid(Msg),
             on_query_async(
                 InstId,
                 {ClientId, Method, {Path, Headers, Body}, Timeout},
@@ -632,9 +632,6 @@ parse_template(String) ->
 process_request_and_action(Request, ActionState, Msg) ->
     MethodTemplate = maps:get(method, ActionState),
     Method = make_method(render_template_string(MethodTemplate, Msg)),
-    BodyTemplate = maps:get(body, ActionState),
-    Body = render_request_body(BodyTemplate, Msg),
-
     PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
     PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)),
 
@@ -650,6 +647,8 @@ process_request_and_action(Request, ActionState, Msg) ->
         render_headers(HeadersTemplate1, Msg),
         render_headers(HeadersTemplate2, Msg)
     ),
+    BodyTemplate = maps:get(body, ActionState),
+    Body = render_request_body(BodyTemplate, Msg),
     #{
         method => Method,
         path => Path,
@@ -732,7 +731,7 @@ formalize_request(_Method, BasePath, {Path, Headers}) ->
 %% because an HTTP server may handle paths like
 %% "/a/b/c/", "/a/b/c" and "/a//b/c" differently.
 %%
-%% So we try to avoid unneccessary path normalization.
+%% So we try to avoid unnecessary path normalization.
 %%
 %% See also: `join_paths_test_/0`
 join_paths(Path1, Path2) ->
@@ -876,6 +875,8 @@ redact_request({Path, Headers}) ->
 redact_request({Path, Headers, _Body}) ->
     {Path, Headers, <<"******">>}.
 
+clientid(Msg) -> maps:get(clientid, Msg, undefined).
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 

+ 6 - 3
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -90,7 +90,7 @@ connector_example_values() ->
         enable => true,
         authentication => #{
             <<"username">> => <<"root">>,
-            <<"password">> => <<"*****">>
+            <<"password">> => <<"******">>
         },
         base_url => <<"http://iotdb.local:18080/">>,
         connect_timeout => <<"15s">>,
@@ -109,7 +109,10 @@ roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
 fields(config) ->
-    proplists_without([url, headers], emqx_bridge_http_schema:fields("config_connector")) ++
+    proplists_without(
+        [url, request, retry_interval, headers],
+        emqx_bridge_http_schema:fields("config_connector")
+    ) ++
         fields("connection_fields");
 fields("connection_fields") ->
     [
@@ -206,7 +209,7 @@ on_start(InstanceId, Config) ->
             ?SLOG(error, #{
                 msg => "failed_to_start_iotdb_bridge",
                 instance_id => InstanceId,
-                base_url => maps:get(request, Config, <<>>),
+                request => maps:get(request, Config, <<>>),
                 reason => Reason
             }),
             throw(failed_to_start_iotdb_bridge)

+ 16 - 2
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -50,6 +50,8 @@ resource_type(redis) ->
     emqx_bridge_redis_connector;
 resource_type(iotdb) ->
     emqx_bridge_iotdb_connector;
+resource_type(elasticsearch) ->
+    emqx_bridge_es_connector;
 resource_type(Type) ->
     error({unknown_connector_type, Type}).
 
@@ -62,6 +64,8 @@ connector_impl_module(confluent_producer) ->
     emqx_bridge_confluent_producer;
 connector_impl_module(iotdb) ->
     emqx_bridge_iotdb_connector;
+connector_impl_module(elasticsearch) ->
+    emqx_bridge_es_connector;
 connector_impl_module(_ConnectorType) ->
     undefined.
 
@@ -181,6 +185,14 @@ connector_structs() ->
                     desc => <<"IoTDB Connector Config">>,
                     required => false
                 }
+            )},
+        {elasticsearch,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_es_connector, config)),
+                #{
+                    desc => <<"ElasticSearch Connector Config">>,
+                    required => false
+                }
             )}
     ].
 
@@ -199,7 +211,8 @@ schema_modules() ->
         emqx_bridge_timescale,
         emqx_postgresql_connector_schema,
         emqx_bridge_redis_schema,
-        emqx_bridge_iotdb_connector
+        emqx_bridge_iotdb_connector,
+        emqx_bridge_es_connector
     ].
 
 api_schemas(Method) ->
@@ -227,7 +240,8 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
         api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
         api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),
-        api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method)
+        api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),
+        api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method)
     ].
 
 api_ref(Module, Type, Method) ->

+ 3 - 1
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -147,7 +147,9 @@ connector_type_to_bridge_types(syskeeper_proxy) ->
 connector_type_to_bridge_types(timescale) ->
     [timescale];
 connector_type_to_bridge_types(iotdb) ->
-    [iotdb].
+    [iotdb];
+connector_type_to_bridge_types(elasticsearch) ->
+    [elasticsearch].
 
 actions_config_name() -> <<"actions">>.
 

+ 1 - 0
apps/emqx_machine/priv/reboot_lists.eterm

@@ -99,6 +99,7 @@
             emqx_bridge_hstreamdb,
             emqx_bridge_influxdb,
             emqx_bridge_iotdb,
+            emqx_bridge_es,
             emqx_bridge_matrix,
             emqx_bridge_mongodb,
             emqx_bridge_mysql,

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -3,7 +3,7 @@
     {id, "emqx_machine"},
     {description, "The EMQX Machine"},
     % strict semver, bump manually!
-    {vsn, "0.2.17"},
+    {vsn, "0.2.18"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_ctl]},

+ 1 - 0
mix.exs

@@ -164,6 +164,7 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_hstreamdb,
       :emqx_bridge_influxdb,
       :emqx_bridge_iotdb,
+      :emqx_bridge_es,
       :emqx_bridge_matrix,
       :emqx_bridge_mongodb,
       :emqx_bridge_mysql,

+ 144 - 0
rel/i18n/emqx_bridge_es.hocon

@@ -0,0 +1,144 @@
+emqx_bridge_es {
+
+elasticsearch.desc:
+"""Elasticsearch Bridge"""
+elasticsearch.label:
+"""ElasticSearch"""
+
+config_enable.desc:
+"""Enable or disable this bridge"""
+
+config_enable.label:
+"""Enable Or Disable Bridge"""
+
+config_authentication.desc:
+"""Authentication configuration"""
+
+config_authentication.label:
+"""Authentication"""
+
+auth_basic.desc:
+"""Parameters for basic authentication."""
+
+auth_basic.label:
+"""Basic auth params"""
+
+config_auth_basic_username.desc:
+"""The username as configured at the IoTDB REST interface"""
+
+config_auth_basic_username.label:
+  """HTTP Basic Auth Username"""
+
+config_auth_basic_password.desc:
+"""The password as configured at the IoTDB REST interface"""
+
+config_auth_basic_password.label:
+"""HTTP Basic Auth Password"""
+
+config_base_url.desc:
+"""The base URL of the external ElasticSearch service's REST interface."""
+config_base_url.label:
+"""ElasticSearch REST Service Base URL"""
+
+config_target.desc:
+"""Name of the data stream, index, or index alias to perform bulk actions on"""
+
+config_target.label:
+"""Target"""
+
+config_require_alias.desc:
+"""If true, the request’s actions must target an index alias. Defaults to false"""
+config_require_alias.label:
+"""Require Alias"""
+
+config_routing.desc:
+"""Custom value used to route operations to a specific shard."""
+config_routing.label:
+"""Routing"""
+
+config_wait_for_active_shards.desc:
+"""The number of shard copies that must be active before proceeding with the operation.
+Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1).
+Default: 1, the primary shard"""
+
+config_max_retries.desc:
+"""HTTP request max retry times if failed."""
+
+config_max_retries.label:
+"""HTTP Request Max Retries"""
+
+desc_config.desc:
+"""Configuration for Apache IoTDB bridge."""
+
+desc_config.label:
+"""IoTDB Bridge Configuration"""
+
+desc_name.desc:
+"""Bridge name, used as a human-readable description of the bridge."""
+
+desc_name.label:
+"""Bridge Name"""
+
+config_parameters_index.desc:
+"""Name of index, or index alias to perform the action on.
+This parameter is required."""
+
+config_parameters_index.label:
+"""_index"""
+
+config_parameters_id.desc:
+"""The document ID. If no ID is specified, a document ID is automatically generated."""
+config_parameters_id.label:
+"""_id"""
+
+config_parameters_require_alias.desc:
+"""If true, the action must target an index alias. Defaults to false."""
+config_parameters_require_alias.label:
+"""_require_alias"""
+
+config_parameters_doc.desc:
+"""JSON document"""
+config_parameters_doc.label:
+"""doc"""
+
+action_parameters.desc:
+"""ElasticSearch action parameters"""
+
+action_parameters.label:
+"""Parameters"""
+
+config_overwrite.desc:
+"""Set to false If a document with the specified _id already exists(conflict), the operation will fail."""
+
+config_overwrite.label:
+"""overwrite"""
+
+action_config.desc:
+"""ElasticSearch Action Configuration"""
+action_config.label:
+"""ElasticSearch Action Config"""
+
+action_create.desc:
+"""Adds a JSON document to the specified index and makes it searchable.
+If the target is an index and the document already exists,
+the request updates the document and increments its version."""
+action_create.label:
+"""Create Doc"""
+
+action_delete.desc:
+"""Removes a JSON document from the specified index."""
+action_delete.label:
+"""Delete Doc"""
+
+action_update.desc:
+"""Updates a document using the specified doc."""
+action_update.label:
+"""Update Doc"""
+
+action_resource_opts.desc:
+"""Resource options."""
+
+action_resource_opts.label:
+"""Resource Options"""
+
+}

+ 39 - 0
rel/i18n/emqx_bridge_es_connector.hocon

@@ -0,0 +1,39 @@
+emqx_bridge_es_connector {
+
+config_authentication.desc:
+"""Authentication configuration"""
+
+config_authentication.label:
+"""Authentication"""
+
+auth_basic.desc:
+"""Parameters for basic authentication."""
+
+auth_basic.label:
+"""Basic auth params"""
+
+config_auth_basic_username.desc:
+"""The username as configured at the ElasticSearch REST interface"""
+
+config_auth_basic_username.label:
+  """HTTP Basic Auth Username"""
+
+config_auth_basic_password.desc:
+"""The password as configured at the ElasticSearch REST interface"""
+
+config_auth_basic_password.label:
+"""HTTP Basic Auth Password"""
+
+config_max_retries.desc:
+"""HTTP request max retry times if failed."""
+
+config_max_retries.label:
+"""HTTP Request Max Retries"""
+
+desc_config.desc:
+"""Configuration for ElasticSearch bridge."""
+
+desc_config.label:
+"""ElasticSearch Bridge Configuration"""
+
+}

+ 2 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -297,3 +297,5 @@ Syskeeper
 msacc
 now_us
 ns
+elasticsearch
+ElasticSearch