Просмотр исходного кода

Merge pull request #9761 from zmstone/0114-fix-kafka-value-template-and-docs

feat: introduce 'this' concept for placeholder, and use it in Kafka bridge
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
a7fc5e8fe1

+ 1 - 1
Makefile

@@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim
 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
 export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
 export EMQX_DASHBOARD_VERSION ?= v1.1.5
-export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.12
+export EMQX_EE_DASHBOARD_VERSION ?= e1.0.1-beta.13
 export EMQX_REL_FORM ?= tgz
 export QUICER_DOWNLOAD_FROM_RELEASE = 1
 ifeq ($(OS),Windows_NT)

+ 2 - 0
apps/emqx/include/emqx_placeholder.hrl

@@ -17,6 +17,8 @@
 -ifndef(EMQX_PLACEHOLDER_HRL).
 -define(EMQX_PLACEHOLDER_HRL, true).
 
+-define(PH_VAR_THIS, <<"$_THIS_">>).
+
 -define(PH(Type), <<"${", Type/binary, "}">>).
 
 %% action: publish/subscribe/all

+ 6 - 4
apps/emqx_plugin_libs/src/emqx_placeholder.erl

@@ -39,6 +39,8 @@
     sql_data/1
 ]).
 
+-include_lib("emqx/include/emqx_placeholder.hrl").
+
 -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
 
 -define(EX_PLACE_HOLDER_DOUBLE_QUOTE, "(\\$\\{[a-zA-Z0-9\\._]+\\}|\"\\$\\{[a-zA-Z0-9\\._]+\\}\")").
@@ -233,9 +235,6 @@ proc_param_str(Tokens, Data, Quote) ->
         proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})
     ).
 
-%% backward compatibility for hot upgrading from =< e4.2.1
-get_phld_var(Fun, Data) when is_function(Fun) ->
-    Fun(Data);
 get_phld_var(Phld, Data) ->
     emqx_rule_maps:nested_get(Phld, Data).
 
@@ -298,9 +297,12 @@ replace_with(Tmpl, RE, '$n') ->
             Parts
         ),
     Res.
-
+parse_nested(<<".", R/binary>>) ->
+    %% ignore the root .
+    parse_nested(R);
 parse_nested(Attr) ->
     case string:split(Attr, <<".">>, all) of
+        [<<>>] -> {var, ?PH_VAR_THIS};
         [Attr] -> {var, Attr};
         Nested -> {path, [{key, P} || P <- Nested]}
     end.

+ 4 - 0
apps/emqx_rule_engine/src/emqx_rule_maps.erl

@@ -26,9 +26,13 @@
     unsafe_atom_key_map/1
 ]).
 
+-include_lib("emqx/include/emqx_placeholder.hrl").
+
 nested_get(Key, Data) ->
     nested_get(Key, Data, undefined).
 
+nested_get({var, ?PH_VAR_THIS}, Data, _Default) ->
+    Data;
 nested_get({var, Key}, Data, Default) ->
     general_map_get({key, Key}, Data, Data, Default);
 nested_get({path, Path}, Data, Default) when is_list(Path) ->

+ 9 - 6
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf

@@ -250,9 +250,10 @@ emqx_ee_bridge_kafka {
     kafka_message_key {
         desc {
             en: "Template to render Kafka message key. "
-                "If the desired variable for this template is not found in the input data "
-                "<code>NULL</code> is used."
-            zh: "生成 Kafka 消息 Key 的模版。当所需要的输入没有时,会使用 <code>NULL</code>。"
+                "If the template is rendered into a NULL value (i.e. there is no such data field in Rule Engine context) "
+                "then Kafka's <code>NULL</code> (but not empty string) is used."
+            zh: "生成 Kafka 消息 Key 的模版。如果模版生成后为空值,"
+                "则会使用 Kafka 的 <code>NULL</code> ,而非空字符串。"
         }
         label {
             en: "Message Key"
@@ -262,9 +263,11 @@ emqx_ee_bridge_kafka {
     kafka_message_value {
         desc {
             en: "Template to render Kafka message value. "
-                "If the desired variable for this template is not found in the input data "
-                "<code>NULL</code> is used."
-            zh: "生成 Kafka 消息 Value 的模版。当所需要的输入没有时,会使用 <code>NULL</code>。"
+                "If the template is rendered "
+                "into a NULL value (i.e. there is no such data field in Rule Engine context) "
+                "then Kafka's <code>NULL</code> (but not empty string) is used."
+            zh: "生成 Kafka 消息 Value 的模版。如果模版生成后为空值,"
+                "则会使用 Kafka 的 <code>NULL</code>,而非空字符串。"
         }
         label {
             en: "Message Value"

+ 3 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -207,11 +207,11 @@ fields(producer_kafka_opts) ->
     ];
 fields(kafka_message) ->
     [
-        {key, mk(string(), #{default => "${clientid}", desc => ?DESC(kafka_message_key)})},
-        {value, mk(string(), #{default => "${payload}", desc => ?DESC(kafka_message_value)})},
+        {key, mk(string(), #{default => "${.clientid}", desc => ?DESC(kafka_message_key)})},
+        {value, mk(string(), #{default => "${.}", desc => ?DESC(kafka_message_value)})},
         {timestamp,
             mk(string(), #{
-                default => "${timestamp}", desc => ?DESC(kafka_message_timestamp)
+                default => "${.timestamp}", desc => ?DESC(kafka_message_timestamp)
             })}
     ];
 fields(producer_buffer) ->

+ 18 - 7
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl

@@ -145,15 +145,19 @@ on_query(_InstId, {send_message, Message}, #{message_template := Template, produ
     {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
     {async_return, ok}.
 
-compile_message_template(#{
-    key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
-}) ->
+compile_message_template(T) ->
+    KeyTemplate = maps:get(key, T, <<"${.clientid}">>),
+    ValueTemplate = maps:get(value, T, <<"${.}">>),
+    TimestampTemplate = maps:get(value, T, <<"${.timestamp}">>),
     #{
-        key => emqx_plugin_libs_rule:preproc_tmpl(KeyTemplate),
-        value => emqx_plugin_libs_rule:preproc_tmpl(ValueTemplate),
-        timestamp => emqx_plugin_libs_rule:preproc_tmpl(TimestampTemplate)
+        key => preproc_tmpl(KeyTemplate),
+        value => preproc_tmpl(ValueTemplate),
+        timestamp => preproc_tmpl(TimestampTemplate)
     }.
 
+preproc_tmpl(Tmpl) ->
+    emqx_plugin_libs_rule:preproc_tmpl(Tmpl).
+
 render_message(
     #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
 ) ->
@@ -164,7 +168,14 @@ render_message(
     }.
 
 render(Template, Message) ->
-    emqx_plugin_libs_rule:proc_tmpl(Template, Message).
+    Opts = #{
+        var_trans => fun
+            (undefined) -> <<"">>;
+            (X) -> emqx_plugin_libs_rule:bin(X)
+        end,
+        return => full_binary
+    },
+    emqx_plugin_libs_rule:proc_tmpl(Template, Message, Opts).
 
 render_timestamp(Template, Message) ->
     try

+ 6 - 1
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl

@@ -260,7 +260,11 @@ kafka_bridge_rest_api_helper(Config) ->
                 topic => <<"t/#">>
             },
             <<"kafka">> => #{
-                <<"topic">> => erlang:list_to_binary(KafkaTopic)
+                <<"topic">> => erlang:list_to_binary(KafkaTopic),
+                <<"message">> => #{
+                    <<"key">> => <<"${clientid}">>,
+                    <<"value">> => <<"${.payload}">>
+                }
             }
         }
     },
@@ -501,6 +505,7 @@ producer = {
     }
     kafka = {
         topic = \"{{ kafka_topic }}\"
+        message = {key = \"${clientid}\", value = \"${.payload}\"}
     }
 }
 """.