Browse Source

refactor: refactor es's action

zhongwencool 2 năm trước cách đây
mục cha
commit
ace443fc18

+ 10 - 5
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -361,11 +361,16 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
         [] ->
             false;
         _ ->
-            {true, #{
-                schema_module => Module,
-                type_name => TypeName,
-                missing_fields => MissingFields
-            }}
+            %% elasticsearch is new and doesn't have local_topic
+            case MissingFields of
+                [local_topic] when Module =:= emqx_bridge_es -> false;
+                _ ->
+                    {true, #{
+                        schema_module => Module,
+                        type_name => TypeName,
+                        missing_fields => MissingFields
+                    }}
+            end
     end.
 
 -endif.

+ 0 - 19
apps/emqx_bridge_es/.gitignore

@@ -1,19 +0,0 @@
-.rebar3
- _*
- .eunit
- *.o
- *.beam
- *.plt
- *.swp
- *.swo
- .erlang.cookie
- ebin
- log
- erl_crash.dump
- .rebar
- logs
- _build
- .idea
- *.iml
- rebar3.crashdump
- *~

+ 138 - 223
apps/emqx_bridge_es/src/emqx_bridge_es.erl

@@ -25,15 +25,15 @@ fields(action) ->
         ?HOCON(
             ?MAP(action_name, ?R_REF(action_config)),
             #{
-                desc => <<"ElasticSearch Action Config">>,
-                required => false
+                required => false,
+                desc => ?DESC(elasticsearch)
             }
         )};
 fields(action_config) ->
     emqx_resource_schema:override(
-        emqx_bridge_v2_schema:make_producer_action_schema(
+        emqx_bridge_v2_schema:make_consumer_action_schema(
             ?HOCON(
-                ?R_REF(action_parameters),
+                ?UNION(fun action_union_member_selector/1),
                 #{
                     required => true, desc => ?DESC("action_parameters")
                 }
@@ -54,200 +54,28 @@ fields(action_resource_opts) ->
         end,
         emqx_bridge_v2_schema:resource_opts_fields()
     );
-fields(action_parameters) ->
+fields(action_create) ->
     [
-        {target,
-            ?HOCON(
-                binary(),
-                #{
-                    desc => ?DESC("config_target"),
-                    required => false
-                }
-            )},
-        {require_alias,
-            ?HOCON(
-                boolean(),
-                #{
-                    required => false,
-                    default => false,
-                    desc => ?DESC("config_require_alias")
-                }
-            )},
-        {routing,
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_routing")
-                }
-            )},
-        {wait_for_active_shards,
-            ?HOCON(
-                ?UNION([pos_integer(), all]),
-                #{
-                    required => false,
-                    desc => ?DESC("config_wait_for_active_shards")
-                }
-            )},
-        {data,
-            ?HOCON(
-                ?ARRAY(
-                    ?UNION(
-                        [
-                            ?R_REF(create),
-                            ?R_REF(delete),
-                            ?R_REF(index),
-                            ?R_REF(update)
-                        ]
-                    )
-                ),
-                #{
-                    desc => ?DESC("action_parameters_data")
-                }
-            )}
-    ] ++
-        lists:filter(
-            fun({K, _}) ->
-                not lists:member(K, [path, method, body, headers, request_timeout])
-            end,
-            emqx_bridge_http_schema:fields("parameters_opts")
-        );
-fields(Action) when Action =:= create; Action =:= index ->
-    [
-        {action,
-            ?HOCON(
-                Action,
-                #{
-                    desc => atom_to_binary(Action),
-                    required => true
-                }
-            )},
-        {'_index',
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_parameters_index")
-                }
-            )},
-        {'_id',
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_parameters_id")
-                }
-            )},
-        {require_alias,
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_parameters_require_alias")
-                }
-            )},
-        {fields,
-            ?HOCON(
-                binary(),
-                #{
-                    required => true,
-                    desc => ?DESC("config_parameters_fields")
-                }
-            )}
-    ];
-fields(delete) ->
-    [
-        {action,
-            ?HOCON(
-                delete,
-                #{
-                    desc => <<"Delete">>,
-                    required => true
-                }
-            )},
-        {'_index',
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_parameters_index")
-                }
-            )},
-        {'_id',
-            ?HOCON(
-                binary(),
-                #{
-                    required => true,
-                    desc => ?DESC("config_parameters_id")
-                }
-            )},
-        {require_alias,
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_parameters_require_alias")
-                }
-            )}
+        action(create),
+        index(),
+        id(false),
+        doc(true),
+        routing(),
+        require_alias(),
+        overwrite()
+        | http_common_opts()
     ];
-fields(update) ->
+fields(action_delete) ->
+    [action(delete), index(), id(true), routing() | http_common_opts()];
+fields(action_update) ->
     [
-        {action,
-            ?HOCON(
-                update,
-                #{
-                    desc => <<"Update">>,
-                    required => true
-                }
-            )},
-        {doc_as_upsert,
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_parameters_doc_as_upsert")
-                }
-            )},
-        {upsert,
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_parameters_upsert")
-                }
-            )},
-        {'_index',
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_parameters_index")
-                }
-            )},
-        {'_id',
-            ?HOCON(
-                binary(),
-                #{
-                    required => true,
-                    desc => ?DESC("config_parameters_id")
-                }
-            )},
-        {require_alias,
-            ?HOCON(
-                binary(),
-                #{
-                    required => false,
-                    desc => ?DESC("config_parameters_require_alias")
-                }
-            )},
-        {fields,
-            ?HOCON(
-                binary(),
-                #{
-                    required => true,
-                    desc => ?DESC("config_parameters_fields")
-                }
-            )}
+        action(update),
+        index(),
+        id(true),
+        doc(true),
+        routing(),
+        require_alias()
+        | http_common_opts()
     ];
 fields("post_bridge_v2") ->
     emqx_bridge_schema:type_and_name_fields(elasticsearch) ++ fields(action_config);
@@ -256,6 +84,111 @@ fields("put_bridge_v2") ->
 fields("get_bridge_v2") ->
     emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2").
 
+action_union_member_selector(all_union_members) ->
+    [
+        ?R_REF(action_create),
+        ?R_REF(action_delete),
+        ?R_REF(action_update)
+    ];
+action_union_member_selector({value, Value}) ->
+    case Value of
+        #{<<"action">> := <<"create">>} ->
+            [?R_REF(action_create)];
+        #{<<"action">> := <<"delete">>} ->
+            [?R_REF(action_delete)];
+        #{<<"action">> := <<"update">>} ->
+            [?R_REF(action_update)];
+        _ ->
+            Expected = "create | delete | update",
+            throw(#{
+                field_name => action,
+                expected => Expected
+            })
+    end.
+
+action(Action) ->
+    {action,
+        ?HOCON(
+            Action,
+            #{
+                required => true,
+                desc => atom_to_binary(Action)
+            }
+        )}.
+
+overwrite() ->
+    {overwrite,
+        ?HOCON(
+            boolean(),
+            #{
+                required => false,
+                default => true,
+                desc => ?DESC("config_overwrite")
+            }
+        )}.
+
+index() ->
+    {index,
+        ?HOCON(
+            binary(),
+            #{
+                required => true,
+                example => <<"${payload.index}">>,
+                desc => ?DESC("config_parameters_index")
+            }
+        )}.
+
+id(Required) ->
+    {id,
+        ?HOCON(
+            binary(),
+            #{
+                required => Required,
+                example => <<"${payload.id}">>,
+                desc => ?DESC("config_parameters_id")
+            }
+        )}.
+
+doc(Required) ->
+    {doc,
+        ?HOCON(
+            binary(),
+            #{
+                required => Required,
+                example => <<"${payload.doc}">>,
+                desc => ?DESC("config_parameters_doc")
+            }
+        )}.
+
+http_common_opts() ->
+    lists:filter(
+        fun({K, _}) ->
+            not lists:member(K, [path, method, body, headers, request_timeout])
+        end,
+        emqx_bridge_http_schema:fields("parameters_opts")
+    ).
+
+routing() ->
+    {routing,
+        ?HOCON(
+            binary(),
+            #{
+                required => false,
+                example => <<"${payload.routing}">>,
+                desc => ?DESC("config_routing")
+            }
+        )}.
+
+require_alias() ->
+    {require_alias,
+        ?HOCON(
+            boolean(),
+            #{
+                required => false,
+                desc => ?DESC("config_require_alias")
+            }
+        )}.
+
 bridge_v2_examples(Method) ->
     [
         #{
@@ -272,34 +205,10 @@ bridge_v2_examples(Method) ->
 action_values() ->
     #{
         parameters => #{
-            target => <<"${target_index}">>,
-            data => [
-                #{
-                    action => index,
-                    '_index' => <<"${index}">>,
-                    fields => <<"${fields}">>,
-                    require_alias => <<"${require_alias}">>
-                },
-                #{
-                    action => create,
-                    '_index' => <<"${index}">>,
-                    fields => <<"${fields}">>
-                },
-                #{
-                    action => delete,
-                    '_index' => <<"${index}">>,
-                    '_id' => <<"${id}">>
-                },
-                #{
-                    action => update,
-                    '_index' => <<"${index}">>,
-                    '_id' => <<"${id}">>,
-                    fields => <<"${fields}">>,
-                    require_alias => false,
-                    doc_as_upsert => <<"${doc_as_upsert}">>,
-                    upsert => <<"${upsert}">>
-                }
-            ]
+            action => create,
+            index => <<"${payload.index}">>,
+            overwrite => true,
+            doc => <<"${payload.doc}">>
         }
     }.
 
@@ -309,4 +218,10 @@ unsupported_opts() ->
         batch_time
     ].
 
+desc(elasticsearch) -> ?DESC(elasticsearch);
+desc(action_config) -> ?DESC(action_config);
+desc(action_create) -> ?DESC(action_create);
+desc(action_delete) -> ?DESC(action_delete);
+desc(action_update) -> ?DESC(action_update);
+desc(action_resource_opts) -> ?DESC(action_resource_opts);
 desc(_) -> undefined.

+ 70 - 195
apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl

@@ -233,7 +233,7 @@ on_get_status(InstanceId, State) ->
     {ok, pos_integer(), [term()], term()}
     | {ok, pos_integer(), [term()]}
     | {error, term()}.
-on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) ->
+on_query(InstanceId, {ChannelId, Msg} = Req, State) ->
     ?tp(elasticsearch_bridge_on_query, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
         msg => "elasticsearch_bridge_on_query_called",
@@ -241,21 +241,16 @@ on_query(InstanceId, {ChannelId, Msg} = Req, #{channels := Channels} = State) ->
         send_message => Req,
         state => emqx_utils:redact(State)
     }),
-    case try_render_message(Req, Channels) of
-        {ok, Body} ->
-            handle_response(
-                emqx_bridge_http_connector:on_query(
-                    InstanceId, {ChannelId, {Msg, Body}}, State
-                )
-            );
-        Error ->
-            Error
-    end.
+    handle_response(
+        emqx_bridge_http_connector:on_query(
+            InstanceId, {ChannelId, Msg}, State
+        )
+    ).
 
 -spec on_query_async(manager_id(), tuple(), {function(), [term()]}, state()) ->
     {ok, pid()} | {error, empty_request}.
 on_query_async(
-    InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, #{channels := Channels} = State
+    InstanceId, {ChannelId, Msg} = Req, ReplyFunAndArgs0, State
 ) ->
     ?tp(elasticsearch_bridge_on_query_async, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
@@ -264,22 +259,17 @@ on_query_async(
         send_message => Req,
         state => emqx_utils:redact(State)
     }),
-    case try_render_message(Req, Channels) of
-        {ok, Payload} ->
-            ReplyFunAndArgs =
-                {
-                    fun(Result) ->
-                        Response = handle_response(Result),
-                        emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
-                    end,
-                    []
-                },
-            emqx_bridge_http_connector:on_query_async(
-                InstanceId, {ChannelId, {Msg, Payload}}, ReplyFunAndArgs, State
-            );
-        Error ->
-            Error
-    end.
+    ReplyFunAndArgs =
+        {
+            fun(Result) ->
+                Response = handle_response(Result),
+                emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
+            end,
+            []
+        },
+    emqx_bridge_http_connector:on_query_async(
+        InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State
+    ).
 
 on_add_channel(
     InstanceId,
@@ -291,19 +281,17 @@ on_add_channel(
         true ->
             {error, already_exists};
         _ ->
-            #{data := Data} = Parameter,
-            Parameter1 = Parameter#{path => path(Parameter), method => <<"post">>},
+            Parameter1 = Parameter#{
+                path => path(Parameter),
+                method => method(Parameter),
+                body => get_body_template(Parameter)
+            },
             {ok, State} = emqx_bridge_http_connector:on_add_channel(
                 InstanceId, State0, ChannelId, #{parameters => Parameter1}
             ),
-            case preproc_data_template(Data) of
-                [] ->
-                    {error, invalid_data};
-                DataTemplate ->
-                    Channel = Parameter1#{data => DataTemplate},
-                    Channels2 = Channels#{ChannelId => Channel},
-                    {ok, State#{channels => Channels2}}
-            end
+            Channel = Parameter1,
+            Channels2 = Channels#{ChannelId => Channel},
+            {ok, State#{channels => Channels2}}
     end.
 
 on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
@@ -325,124 +313,55 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
 %%--------------------------------------------------------------------
 %% Internal Functions
 %%--------------------------------------------------------------------
-path(Param) ->
-    Target = maps:get(target, Param, undefined),
-    QString0 = maps:fold(
-        fun(K, V, Acc) ->
-            [[atom_to_list(K), "=", to_str(V)] | Acc]
+%% delete DELETE /<index>/_doc/<_id>
+path(#{action := delete, id := Id, index := Index} = Action) ->
+    BasePath = ["/", Index, "/_doc/", Id],
+    Qs = add_query_string([routing], Action),
+    BasePath ++ Qs;
+%% update POST /<index>/_update/<_id>
+path(#{action := update, id := Id, index := Index} = Action) ->
+    BasePath = ["/", Index, "/_update/", Id],
+    Qs = add_query_string([routing, require_alias], Action),
+    BasePath ++ Qs;
+%% create with id  /<index>/_doc/_id
+path(#{action := create, index := Index, id := Id} = Action) ->
+    BasePath = ["/", Index, "/_doc/", Id],
+    Qs =
+        case maps:get(overwrite, Action, true) of
+            true ->
+                add_query_string([routing, require_alias], Action);
+            false ->
+                Action1 = Action#{op_type => "create"},
+                add_query_string([routing, require_alias, op_type], Action1)
         end,
-        [["_source=false"], ["filter_path=items.*.error"]],
-        maps:with([require_alias, routing, wait_for_active_shards], Param)
-    ),
-    QString = "?" ++ lists:join("&", QString0),
-    target(Target) ++ QString.
-
-target(undefined) -> "/_bulk";
-target(Str) -> "/" ++ binary_to_list(Str) ++ "/_bulk".
+    BasePath ++ Qs;
+%% create without id POST /<index>/_doc/
+path(#{action := create, index := Index} = Action) ->
+    BasePath = ["/", Index, "/_doc/"],
+    Qs = add_query_string([routing, require_alias], Action),
+    BasePath ++ Qs.
+
+method(#{action := create}) -> <<"POST">>;
+method(#{action := delete}) -> <<"DELETE">>;
+method(#{action := update}) -> <<"POST">>.
+
+add_query_string(Keys, Param0) ->
+    Param1 = maps:with(Keys, Param0),
+    FoldFun = fun(K, V, Acc) -> [[atom_to_list(K), "=", to_str(V)] | Acc] end,
+    case maps:fold(FoldFun, [], Param1) of
+        "" -> "";
+        QString -> "?" ++ lists:join("&", QString)
+    end.
 
 to_str(List) when is_list(List) -> List;
 to_str(false) -> "false";
 to_str(true) -> "true";
 to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom).
 
-proc_data(DataList, Msg) when is_list(DataList) ->
-    [
-        begin
-            proc_data(Data, Msg)
-        end
-     || Data <- DataList
-    ];
-proc_data(
-    #{
-        action := Action,
-        '_index' := IndexT,
-        '_id' := IdT,
-        require_alias := RequiredAliasT,
-        fields := FieldsT
-    },
-    Msg
-) when Action =:= create; Action =:= index ->
-    [
-        emqx_utils_json:encode(
-            #{
-                Action => filter([
-                    {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)},
-                    {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)},
-                    {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}
-                ])
-            }
-        ),
-        "\n",
-        emqx_placeholder:proc_tmpl(FieldsT, Msg),
-        "\n"
-    ];
-proc_data(
-    #{
-        action := delete,
-        '_index' := IndexT,
-        '_id' := IdT,
-        require_alias := RequiredAliasT
-    },
-    Msg
-) ->
-    [
-        emqx_utils_json:encode(
-            #{
-                delete => filter([
-                    {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)},
-                    {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)},
-                    {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)}
-                ])
-            }
-        ),
-        "\n"
-    ];
-proc_data(
-    #{
-        action := update,
-        '_index' := IndexT,
-        '_id' := IdT,
-        require_alias := RequiredAliasT,
-        doc_as_upsert := DocAsUpsert,
-        upsert := Upsert,
-        fields := FieldsT
-    },
-    Msg
-) ->
-    [
-        emqx_utils_json:encode(
-            #{
-                update => filter([
-                    {'_index', emqx_placeholder:proc_tmpl(IndexT, Msg)},
-                    {'_id', emqx_placeholder:proc_tmpl(IdT, Msg)},
-                    {required_alias, emqx_placeholder:proc_tmpl(RequiredAliasT, Msg)},
-                    {doc_as_upsert, emqx_placeholder:proc_tmpl(DocAsUpsert, Msg)},
-                    {upsert, emqx_placeholder:proc_tmpl(Upsert, Msg)}
-                ])
-            }
-        ),
-        "\n{\"doc\":",
-        emqx_placeholder:proc_tmpl(FieldsT, Msg),
-        "}\n"
-    ].
-
-filter(List) ->
-    Fun = fun
-        ({_K, V}) when V =:= undefined; V =:= <<"undefined">>; V =:= "undefined" ->
-            false;
-        ({_K, V}) when V =:= ""; V =:= <<>> ->
-            false;
-        ({_K, V}) when V =:= "false" -> {true, false};
-        ({_K, V}) when V =:= "true" -> {true, true};
-        ({_K, _V}) ->
-            true
-    end,
-    maps:from_list(lists:filtermap(Fun, List)).
-
-handle_response({ok, 200, _Headers, Body} = Resp) ->
-    eval_response_body(Body, Resp);
-handle_response({ok, 200, Body} = Resp) ->
-    eval_response_body(Body, Resp);
+handle_response({ok, Code, _Headers, _Body} = Resp) when Code =:= 200; Code =:= 201 ->
+    Resp;
+handle_response({ok, Code, _Body} = Resp) when Code =:= 200; Code =:= 201 ->
+    Resp;
 handle_response({ok, Code, _Headers, Body}) ->
     {error, #{code => Code, body => Body}};
 handle_response({ok, Code, Body}) ->
@@ -450,49 +369,5 @@ handle_response({ok, Code, Body}) ->
 handle_response({error, _} = Error) ->
     Error.
 
-eval_response_body(<<"{}">>, Resp) -> Resp;
-eval_response_body(Body, _Resp) -> {error, emqx_utils_json:decode(Body)}.
-
-preproc_data_template(DataList) when is_list(DataList) ->
-    [
-        begin
-            preproc_data_template(Data)
-        end
-     || Data <- DataList
-    ];
-preproc_data_template(#{action := create} = Data) ->
-    Index = maps:get('_index', Data, ""),
-    Id = maps:get('_id', Data, ""),
-    RequiredAlias = maps:get(require_alias, Data, ""),
-    Fields = maps:get(fields, Data, ""),
-    #{
-        action => create,
-        '_index' => emqx_placeholder:preproc_tmpl(Index),
-        '_id' => emqx_placeholder:preproc_tmpl(Id),
-        require_alias => emqx_placeholder:preproc_tmpl(RequiredAlias),
-        fields => emqx_placeholder:preproc_tmpl(Fields)
-    };
-preproc_data_template(#{action := index} = Data) ->
-    Data1 = preproc_data_template(Data#{action => create}),
-    Data1#{action => index};
-preproc_data_template(#{action := delete} = Data) ->
-    Data1 = preproc_data_template(Data#{action => create}),
-    Data2 = Data1#{action => delete},
-    maps:remove(fields, Data2);
-preproc_data_template(#{action := update} = Data) ->
-    Data1 = preproc_data_template(Data#{action => index}),
-    DocAsUpsert = maps:get(doc_as_upsert, Data, ""),
-    Upsert = maps:get(upsert, Data, ""),
-    Data1#{
-        action => update,
-        doc_as_upsert => emqx_placeholder:preproc_tmpl(DocAsUpsert),
-        upsert => emqx_placeholder:preproc_tmpl(Upsert)
-    }.
-
-try_render_message({ChannelId, Msg}, Channels) ->
-    case maps:find(ChannelId, Channels) of
-        {ok, #{data := Data}} ->
-            {ok, proc_data(Data, Msg)};
-        _ ->
-            {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
-    end.
+get_body_template(#{doc := Doc}) -> Doc;
+get_body_template(_) -> undefined.

+ 10 - 13
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -317,7 +317,7 @@ on_query(InstId, {send_message, Msg}, State) ->
 %% BridgeV2 entrypoint
 on_query(
     InstId,
-    {ActionId, MsgAndBody},
+    {ActionId, Msg},
     State = #{installed_actions := InstalledActions}
 ) when is_binary(ActionId) ->
     case {maps:get(request, State, undefined), maps:get(ActionId, InstalledActions, undefined)} of
@@ -334,10 +334,10 @@ on_query(
                 body := Body,
                 headers := Headers,
                 request_timeout := Timeout
-            } = process_request_and_action(Request, ActionState, MsgAndBody),
+            } = process_request_and_action(Request, ActionState, Msg),
             %% bridge buffer worker has retry, do not let ehttpc retry
             Retry = 2,
-            ClientId = clientid(MsgAndBody),
+            ClientId = clientid(Msg),
             on_query(
                 InstId,
                 {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
@@ -430,7 +430,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
 %% BridgeV2 entrypoint
 on_query_async(
     InstId,
-    {ActionId, MsgAndBody},
+    {ActionId, Msg},
     ReplyFunAndArgs,
     State = #{installed_actions := InstalledActions}
 ) when is_binary(ActionId) ->
@@ -448,8 +448,8 @@ on_query_async(
                 body := Body,
                 headers := Headers,
                 request_timeout := Timeout
-            } = process_request_and_action(Request, ActionState, MsgAndBody),
-            ClientId = clientid(MsgAndBody),
+            } = process_request_and_action(Request, ActionState, Msg),
+            ClientId = clientid(Msg),
             on_query_async(
                 InstId,
                 {ClientId, Method, {Path, Headers, Body}, Timeout},
@@ -629,7 +629,7 @@ maybe_parse_template(Key, Conf) ->
 parse_template(String) ->
     emqx_template:parse(String).
 
-process_request_and_action(Request, ActionState, {Msg, Body}) ->
+process_request_and_action(Request, ActionState, Msg) ->
     MethodTemplate = maps:get(method, ActionState),
     Method = make_method(render_template_string(MethodTemplate, Msg)),
     PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
@@ -647,17 +647,15 @@ process_request_and_action(Request, ActionState, {Msg, Body}) ->
         render_headers(HeadersTemplate1, Msg),
         render_headers(HeadersTemplate2, Msg)
     ),
+    BodyTemplate = maps:get(body, ActionState),
+    Body = render_request_body(BodyTemplate, Msg),
     #{
         method => Method,
         path => Path,
         body => Body,
         headers => Headers,
         request_timeout => maps:get(request_timeout, ActionState)
-    };
-process_request_and_action(Request, ActionState, Msg) ->
-    BodyTemplate = maps:get(body, ActionState),
-    Body = render_request_body(BodyTemplate, Msg),
-    process_request_and_action(Request, ActionState, {Msg, Body}).
+    }.
 
 merge_proplist(Proplist1, Proplist2) ->
     lists:foldl(
@@ -877,7 +875,6 @@ redact_request({Path, Headers}) ->
 redact_request({Path, Headers, _Body}) ->
     {Path, Headers, <<"******">>}.
 
-clientid({Msg, _Body}) -> clientid(Msg);
 clientid(Msg) -> maps:get(clientid, Msg, undefined).
 
 -ifdef(TEST).

+ 1 - 1
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -190,7 +190,7 @@ connector_structs() ->
             mk(
                 hoconsc:map(name, ref(emqx_bridge_es_connector, config)),
                 #{
-                    desc => <<"Elastis Search Connector Config">>,
+                    desc => <<"ElasticSearch Connector Config">>,
                     required => false
                 }
             )}

+ 44 - 29
rel/i18n/emqx_bridge_es.hocon

@@ -1,5 +1,10 @@
 emqx_bridge_es {
 
+elasticsearch.desc:
+"""Elasticsearch Bridge"""
+elasticsearch.label:
+"""ElasticSearch"""
+
 config_enable.desc:
 """Enable or disable this bridge"""
 
@@ -74,15 +79,9 @@ desc_name.desc:
 desc_name.label:
 """Bridge Name"""
 
-config_parameters_action.desc:
-"""TODO"""
-
-config_parameters_action.label:
-"""Action"""
-
 config_parameters_index.desc:
-"""Name of the data stream, index, or index alias to perform the action on.
-This parameter is required if a <target> is not specified in the request path."""
+"""Name of index, or index alias to perform the action on.
+This parameter is required."""
 
 config_parameters_index.label:
 """_index"""
@@ -97,33 +96,49 @@ config_parameters_require_alias.desc:
 config_parameters_require_alias.label:
 """_require_alias"""
 
-config_parameters_fields.desc:
-"""The document source to index. Required for create and index operations."""
-config_parameters_fields.label:
-"""fields"""
+config_parameters_doc.desc:
+"""JSON document"""
+config_parameters_doc.label:
+"""doc"""
+
+action_parameters.desc:
+"""ElasticSearch action parameters"""
+
+action_parameters.label:
+"""Parameters"""
 
-config_parameters_doc_as_upsert.desc:
-"""Instead of sending a partial doc plus an upsert doc, you can set doc_as_upsert to true
-to use the contents of doc as the upsert value."""
-config_parameters_doc_as_upsert.label:
-"""doc_as_upsert"""
+config_overwrite.desc:
+"""Set to false If a document with the specified _id already exists(conflict), the operation will fail."""
 
-config_parameters_upsert.desc:
-"""If the document does not already exist, the contents of the upsert element are inserted as a new document."""
-config_parameters_upsert.label:
-"""upsert"""
+config_overwrite.label:
+"""overwrite"""
 
+action_config.desc:
+"""ElasticSearch Action Configuration"""
+action_config.label:
+"""ElasticSearch Action Config"""
 
-action_parameters_data.desc:
-"""ElasticSearch action parameter data"""
+action_create.desc:
+"""Adds a JSON document to the specified index and makes it searchable.
+If the target is an index and the document already exists,
+the request updates the document and increments its version."""
+action_create.label:
+"""Create Doc"""
 
-action_parameters_data.label:
-"""Parameter Data"""
+action_delete.desc:
+"""Removes a JSON document from the specified index."""
+action_delete.label:
+"""Delete Doc"""
 
-action_parameters.desc:
-"""ElasticSearch action parameters"""
+action_update.desc:
+"""Updates a document using the specified doc."""
+action_update.label:
+"""Update Doc"""
 
-action_parameters.label:
-"""Parameters"""
+action_resource_opts.desc:
+"""Resource options."""
+
+action_resource_opts.label:
+"""Resource Options"""
 
 }

+ 0 - 5
rel/i18n/emqx_bridge_es_connector.hocon

@@ -24,11 +24,6 @@ config_auth_basic_password.desc:
 config_auth_basic_password.label:
 """HTTP Basic Auth Password"""
 
-config_base_url.desc:
-"""The base URL of the external ElasticSearch service's REST interface."""
-config_base_url.label:
-"""ElasticSearch REST Service Base URL"""
-
 config_max_retries.desc:
 """HTTP request max retry times if failed."""
 

+ 2 - 0
scripts/spellcheck/dicts/emqx.txt

@@ -297,3 +297,5 @@ Syskeeper
 msacc
 now_us
 ns
+elasticsearch
+ElasticSearch