Prechádzať zdrojové kódy

feat: es's update support doc_as_upsert

zhongwencool 2 rokov pred
rodič
commit
59797cfea7

+ 12 - 0
apps/emqx_bridge_es/src/emqx_bridge_es.erl

@@ -73,6 +73,7 @@ fields(action_update) ->
         index(),
         index(),
         id(true),
         id(true),
         doc(),
         doc(),
+        doc_as_upsert(),
         routing(),
         routing(),
         require_alias()
         require_alias()
         | http_common_opts()
         | http_common_opts()
@@ -172,6 +173,17 @@ http_common_opts() ->
         emqx_bridge_http_schema:fields("parameters_opts")
         emqx_bridge_http_schema:fields("parameters_opts")
     ).
     ).
 
 
+doc_as_upsert() ->
+    {doc_as_upsert,
+        ?HOCON(
+            boolean(),
+            #{
+                required => false,
+                default => false,
+                desc => ?DESC("config_doc_as_upsert")
+            }
+        )}.
+
 routing() ->
 routing() ->
     {routing,
     {routing,
         ?HOCON(
         ?HOCON(

+ 30 - 3
apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl

@@ -33,6 +33,8 @@
     connector_example_values/0
     connector_example_values/0
 ]).
 ]).
 
 
+-export([render_template/2]).
+
 %% emqx_connector_resource behaviour callbacks
 %% emqx_connector_resource behaviour callbacks
 -export([connector_config/2]).
 -export([connector_config/2]).
 
 
@@ -286,8 +288,12 @@ on_add_channel(
                 method => method(Parameter),
                 method => method(Parameter),
                 body => get_body_template(Parameter)
                 body => get_body_template(Parameter)
             },
             },
+            ChannelConfig = #{
+                parameters => Parameter1,
+                render_template_func => fun ?MODULE:render_template/2
+            },
             {ok, State} = emqx_bridge_http_connector:on_add_channel(
             {ok, State} = emqx_bridge_http_connector:on_add_channel(
-                InstanceId, State0, ChannelId, #{parameters => Parameter1}
+                InstanceId, State0, ChannelId, ChannelConfig
             ),
             ),
             Channel = Parameter1,
             Channel = Parameter1,
             Channels2 = Channels#{ChannelId => Channel},
             Channels2 = Channels#{ChannelId => Channel},
@@ -310,9 +316,23 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
             {error, not_exists}
             {error, not_exists}
     end.
     end.
 
 
+render_template(Template, Msg) ->
+    % Ignoring errors here, undefined bindings will be replaced with empty string.
+    Opts = #{var_trans => fun to_string/2},
+    {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}, Opts),
+    String.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal Functions
 %% Internal Functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
+
+to_string(Name, Value) ->
+    emqx_template:to_string(render_var(Name, Value)).
+render_var(_, undefined) ->
+    % NOTE Any allowed but undefined binding will be replaced with empty string
+    <<>>;
+render_var(_Name, Value) ->
+    Value.
 %% delete DELETE /<index>/_doc/<_id>
 %% delete DELETE /<index>/_doc/<_id>
 path(#{action := delete, id := Id, index := Index} = Action) ->
 path(#{action := delete, id := Id, index := Index} = Action) ->
     BasePath = ["/", Index, "/_doc/", Id],
     BasePath = ["/", Index, "/_doc/", Id],
@@ -370,5 +390,12 @@ handle_response({ok, Code, Body}) ->
 handle_response({error, _} = Error) ->
 handle_response({error, _} = Error) ->
     Error.
     Error.
 
 
-get_body_template(#{doc := Doc}) -> Doc;
-get_body_template(_) -> undefined.
+get_body_template(#{action := update, doc := Doc} = Template) ->
+    case maps:get(doc_as_upsert, Template, false) of
+        false -> <<"{\"doc\":", Doc/binary, "}">>;
+        true -> <<"{\"doc\":", Doc/binary, ",\"doc_as_upsert\": true}">>
+    end;
+get_body_template(#{doc := Doc}) ->
+    Doc;
+get_body_template(_) ->
+    undefined.

+ 58 - 51
apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl

@@ -103,45 +103,46 @@ end_per_testcase(_TestCase, _Config) ->
 %% Helper fns
 %% Helper fns
 %%-------------------------------------------------------------------------------------
 %%-------------------------------------------------------------------------------------
 
 
-check_send_message_with_action(ActionName, ConnectorName) ->
-    #{payload := _Payload} = send_message(ActionName),
+check_send_message_with_action(Topic, ActionName, ConnectorName) ->
+    send_message(Topic),
     %% ######################################
     %% ######################################
     %% Check if message is sent to es
     %% Check if message is sent to es
     %% ######################################
     %% ######################################
+    timer:sleep(500),
     check_action_metrics(ActionName, ConnectorName).
     check_action_metrics(ActionName, ConnectorName).
 
 
-send_message(ActionName) ->
-    %% ######################################
-    %% Create message
-    %% ######################################
-    Time = erlang:unique_integer(),
-    BinTime = integer_to_binary(Time),
-    Payload = #{<<"name">> => <<"emqx">>, <<"release_time">> => BinTime},
+send_message(Topic) ->
+    Now = emqx_utils_calendar:now_to_rfc3339(microsecond),
+    Doc = #{<<"name">> => <<"emqx">>, <<"release_date">> => Now},
     Index = <<"emqx-test-index">>,
     Index = <<"emqx-test-index">>,
-    Msg = #{
-        clientid => BinTime,
-        payload => Payload,
-        timestamp => Time,
-        index => Index
-    },
-    %% ######################################
-    %% Send message
-    %% ######################################
-    emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}),
-    #{payload => Payload}.
+    Payload = emqx_utils_json:encode(#{doc => Doc, index => Index}),
+
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    {ok, Client} = emqtt:start_link([{clientid, ClientId}, {port, 1883}]),
+    {ok, _} = emqtt:connect(Client),
+    ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]),
+    ok.
 
 
 check_action_metrics(ActionName, ConnectorName) ->
 check_action_metrics(ActionName, ConnectorName) ->
     ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName),
     ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName),
     Metrics =
     Metrics =
         #{
         #{
             match => emqx_resource_metrics:matched_get(ActionId),
             match => emqx_resource_metrics:matched_get(ActionId),
+            success => emqx_resource_metrics:success_get(ActionId),
             failed => emqx_resource_metrics:failed_get(ActionId),
             failed => emqx_resource_metrics:failed_get(ActionId),
             queuing => emqx_resource_metrics:queuing_get(ActionId),
             queuing => emqx_resource_metrics:queuing_get(ActionId),
             dropped => emqx_resource_metrics:dropped_get(ActionId)
             dropped => emqx_resource_metrics:dropped_get(ActionId)
         },
         },
     ?assertEqual(
     ?assertEqual(
-        #{match => 1, dropped => 0, failed => 0, queuing => 0},
-        Metrics
+        #{
+            match => 1,
+            success => 1,
+            dropped => 0,
+            failed => 0,
+            queuing => 0
+        },
+        Metrics,
+        {ActionName, ConnectorName, ActionId}
     ).
     ).
 
 
 action_config(ConnectorName) ->
 action_config(ConnectorName) ->
@@ -164,7 +165,7 @@ action(ConnectorName) ->
         <<"connector">> => ConnectorName,
         <<"connector">> => ConnectorName,
         <<"resource_opts">> => #{
         <<"resource_opts">> => #{
             <<"health_check_interval">> => <<"30s">>,
             <<"health_check_interval">> => <<"30s">>,
-            <<"query_mode">> => <<"async">>
+            <<"query_mode">> => <<"sync">>
         }
         }
     }.
     }.
 
 
@@ -235,7 +236,8 @@ t_create_remove_list(Config) ->
     #{
     #{
         name := <<"test_action_1">>,
         name := <<"test_action_1">>,
         type := <<"elasticsearch">>,
         type := <<"elasticsearch">>,
-        raw_config := _RawConfig
+        raw_config := _,
+        status := connected
     } = ActionInfo,
     } = ActionInfo,
     {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_2, ActionConfig),
     {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_2, ActionConfig),
     2 = length(emqx_bridge_v2:list()),
     2 = length(emqx_bridge_v2:list()),
@@ -252,39 +254,44 @@ t_send_message(Config) ->
     {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig),
     {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig),
     ActionConfig = action(<<"test_connector2">>),
     ActionConfig = action(<<"test_connector2">>),
     {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig),
     {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig),
+    Rule = #{
+        id => <<"rule:t_es">>,
+        sql => <<"SELECT\n  *\nFROM\n  \"es/#\"">>,
+        actions => [<<"elasticsearch:test_action_1">>],
+        description => <<"sink doc to elasticsearch">>
+    },
+    {ok, _} = emqx_rule_engine:create_rule(Rule),
     %% Use the action to send a message
     %% Use the action to send a message
-    check_send_message_with_action(test_action_1, test_connector2),
+    check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2),
     %% Create a few more bridges with the same connector and test them
     %% Create a few more bridges with the same connector and test them
-    BridgeNames1 = [
-        list_to_atom("test_bridge_v2_" ++ integer_to_list(I))
-     || I <- lists:seq(2, 10)
-    ],
-    lists:foreach(
-        fun(BridgeName) ->
-            {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, ActionConfig),
-            check_send_message_with_action(BridgeName, test_connector2)
-        end,
-        BridgeNames1
-    ),
-    BridgeNames = [test_bridge_v2_1 | BridgeNames1],
-    %% Send more messages to the bridges
-    lists:foreach(
-        fun(BridgeName) ->
-            lists:foreach(
-                fun(_) ->
-                    check_send_message_with_action(BridgeName, test_connector2)
-                end,
-                lists:seq(1, 10)
-            )
-        end,
-        BridgeNames
-    ),
+    ActionNames1 =
+        lists:foldl(
+            fun(I, Acc) ->
+                Seq = integer_to_binary(I),
+                ActionNameStr = "test_action_" ++ integer_to_list(I),
+                ActionName = list_to_atom(ActionNameStr),
+                {ok, _} = emqx_bridge_v2:create(?TYPE, ActionName, ActionConfig),
+                Rule1 = #{
+                    id => <<"rule:t_es", Seq/binary>>,
+                    sql => <<"SELECT\n  *\nFROM\n  \"es/", Seq/binary, "\"">>,
+                    actions => [<<"elasticsearch:", (list_to_binary(ActionNameStr))/binary>>],
+                    description => <<"sink doc to elasticsearch">>
+                },
+                {ok, _} = emqx_rule_engine:create_rule(Rule1),
+                Topic = <<"es/", Seq/binary>>,
+                check_send_message_with_action(Topic, ActionName, test_connector2),
+                [ActionName | Acc]
+            end,
+            [],
+            lists:seq(2, 10)
+        ),
+    ActionNames = [test_action_1 | ActionNames1],
     %% Remove all the bridges
     %% Remove all the bridges
     lists:foreach(
     lists:foreach(
         fun(BridgeName) ->
         fun(BridgeName) ->
             ok = emqx_bridge_v2:remove(?TYPE, BridgeName)
             ok = emqx_bridge_v2:remove(?TYPE, BridgeName)
         end,
         end,
-        BridgeNames
+        ActionNames
     ),
     ),
     emqx_connector:remove(?TYPE, test_connector2),
     emqx_connector:remove(?TYPE, test_connector2),
     ok.
     ok.
@@ -361,7 +368,7 @@ t_http_api_get(Config) ->
                             <<"max_retries">> := 2,
                             <<"max_retries">> := 2,
                             <<"overwrite">> := true
                             <<"overwrite">> := true
                         },
                         },
-                    <<"resource_opts">> := #{<<"query_mode">> := <<"async">>},
+                    <<"resource_opts">> := #{<<"query_mode">> := <<"sync">>},
                     <<"status">> := <<"connected">>,
                     <<"status">> := <<"connected">>,
                     <<"status_reason">> := <<>>,
                     <<"status_reason">> := <<>>,
                     <<"type">> := <<"elasticsearch">>
                     <<"type">> := <<"elasticsearch">>

+ 23 - 19
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -266,7 +266,9 @@ on_add_channel(
 ) ->
 ) ->
     InstalledActions = maps:get(installed_actions, OldState, #{}),
     InstalledActions = maps:get(installed_actions, OldState, #{}),
     {ok, ActionState} = do_create_http_action(ActionConfig),
     {ok, ActionState} = do_create_http_action(ActionConfig),
-    NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions),
+    RenderTemplate = maps:get(render_template_func, ActionConfig, fun render_template/2),
+    ActionState1 = ActionState#{render_template_func => RenderTemplate},
+    NewInstalledActions = maps:put(ActionId, ActionState1, InstalledActions),
     NewState = maps:put(installed_actions, NewInstalledActions, OldState),
     NewState = maps:put(installed_actions, NewInstalledActions, OldState),
     {ok, NewState}.
     {ok, NewState}.
 
 
@@ -631,9 +633,10 @@ parse_template(String) ->
 
 
 process_request_and_action(Request, ActionState, Msg) ->
 process_request_and_action(Request, ActionState, Msg) ->
     MethodTemplate = maps:get(method, ActionState),
     MethodTemplate = maps:get(method, ActionState),
-    Method = make_method(render_template_string(MethodTemplate, Msg)),
-    PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
-    PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)),
+    RenderTmplFunc = maps:get(render_template_func, ActionState),
+    Method = make_method(render_template_string(MethodTemplate, RenderTmplFunc, Msg)),
+    PathPrefix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, Request), Msg)),
+    PathSuffix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, ActionState), Msg)),
 
 
     Path =
     Path =
         case PathSuffix of
         case PathSuffix of
@@ -644,11 +647,11 @@ process_request_and_action(Request, ActionState, Msg) ->
     HeadersTemplate1 = maps:get(headers, Request),
     HeadersTemplate1 = maps:get(headers, Request),
     HeadersTemplate2 = maps:get(headers, ActionState),
     HeadersTemplate2 = maps:get(headers, ActionState),
     Headers = merge_proplist(
     Headers = merge_proplist(
-        render_headers(HeadersTemplate1, Msg),
-        render_headers(HeadersTemplate2, Msg)
+        render_headers(HeadersTemplate1, RenderTmplFunc, Msg),
+        render_headers(HeadersTemplate2, RenderTmplFunc, Msg)
     ),
     ),
     BodyTemplate = maps:get(body, ActionState),
     BodyTemplate = maps:get(body, ActionState),
-    Body = render_request_body(BodyTemplate, Msg),
+    Body = render_request_body(BodyTemplate, RenderTmplFunc, Msg),
     #{
     #{
         method => Method,
         method => Method,
         path => Path,
         path => Path,
@@ -681,25 +684,26 @@ process_request(
     } = Conf,
     } = Conf,
     Msg
     Msg
 ) ->
 ) ->
+    RenderTemplateFun = fun render_template/2,
     Conf#{
     Conf#{
-        method => make_method(render_template_string(MethodTemplate, Msg)),
-        path => unicode:characters_to_list(render_template(PathTemplate, Msg)),
-        body => render_request_body(BodyTemplate, Msg),
-        headers => render_headers(HeadersTemplate, Msg),
+        method => make_method(render_template_string(MethodTemplate, RenderTemplateFun, Msg)),
+        path => unicode:characters_to_list(RenderTemplateFun(PathTemplate, Msg)),
+        body => render_request_body(BodyTemplate, RenderTemplateFun, Msg),
+        headers => render_headers(HeadersTemplate, RenderTemplateFun, Msg),
         request_timeout => ReqTimeout
         request_timeout => ReqTimeout
     }.
     }.
 
 
-render_request_body(undefined, Msg) ->
+render_request_body(undefined, _, Msg) ->
     emqx_utils_json:encode(Msg);
     emqx_utils_json:encode(Msg);
-render_request_body(BodyTks, Msg) ->
-    render_template(BodyTks, Msg).
+render_request_body(BodyTks, RenderTmplFunc, Msg) ->
+    RenderTmplFunc(BodyTks, Msg).
 
 
-render_headers(HeaderTks, Msg) ->
+render_headers(HeaderTks, RenderTmplFunc, Msg) ->
     lists:map(
     lists:map(
         fun({K, V}) ->
         fun({K, V}) ->
             {
             {
-                render_template_string(K, Msg),
-                render_template_string(emqx_secret:unwrap(V), Msg)
+                render_template_string(K, RenderTmplFunc, Msg),
+                render_template_string(emqx_secret:unwrap(V), RenderTmplFunc, Msg)
             }
             }
         end,
         end,
         HeaderTks
         HeaderTks
@@ -710,8 +714,8 @@ render_template(Template, Msg) ->
     {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}),
     {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}),
     String.
     String.
 
 
-render_template_string(Template, Msg) ->
-    unicode:characters_to_binary(render_template(Template, Msg)).
+render_template_string(Template, RenderTmplFunc, Msg) ->
+    unicode:characters_to_binary(RenderTmplFunc(Template, Msg)).
 
 
 make_method(M) when M == <<"POST">>; M == <<"post">> -> post;
 make_method(M) when M == <<"POST">>; M == <<"post">> -> post;
 make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;
 make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;

+ 6 - 0
rel/i18n/emqx_bridge_es.hocon

@@ -56,6 +56,12 @@ config_routing.desc:
 config_routing.label:
 config_routing.label:
 """Routing"""
 """Routing"""
 
 
+config_doc_as_upsert.desc:
+"""Instead of sending a partial doc plus an upsert doc,
+you can set doc_as_upsert to true to use the contents of doc as the upsert value."""
+config_doc_as_upsert.label:
+"""doc_as_upsert"""
+
 config_wait_for_active_shards.desc:
 config_wait_for_active_shards.desc:
 """The number of shard copies that must be active before proceeding with the operation.
 """The number of shard copies that must be active before proceeding with the operation.
 Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1).
 Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1).