Просмотр исходного кода

fix(kafka_consumer): rename `force_utf8` to `none`

We actually enforce the key/value to be a valid UTF-8 string when
using `emqx_json:encode`, if we do encode using that, which is
template-dependent.
Thales Macedo Garitezi 3 лет назад
Родитель
Сommit
fc5dfa108a

+ 4 - 2
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf

@@ -633,10 +633,12 @@ emqx_ee_bridge_kafka {
         desc {
             en: "Defines how the key or value from the Kafka message is"
                 " dealt with before being forwarded via MQTT.\n"
-                "<code>force_utf8</code> Uses UTF-8 encoding directly from the original message.\n"
+                "<code>none</code> Uses the key or value from the Kafka message unchanged."
+                "  Note: in this case, then the key or value must be a valid UTF-8 string.\n"
                 "<code>base64</code> Uses base-64 encoding on the received key or value."
             zh: "定义了在通过MQTT转发之前如何处理Kafka消息的键或值。"
-                "<code>force_utf8</code> 直接使用原始信息的UTF-8编码。\n"
+                "<code>none</code> 使用Kafka消息中的键或值,不改变。"
+                "  注意:在这种情况下,那么键或值必须是一个有效的UTF-8字符串。\n"
                 "<code>base64</code> 对收到的密钥或值使用base-64编码。"
         }
         label {

+ 20 - 13
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -109,7 +109,7 @@ values(consumer) ->
             offset_reset_policy => <<"reset_to_latest">>,
             offset_commit_interval_seconds => 5
         },
-        key_encoding_mode => <<"force_utf8">>,
+        key_encoding_mode => <<"none">>,
         topic_mapping => [
             #{
                 kafka_topic => <<"kafka-topic-1">>,
@@ -124,7 +124,7 @@ values(consumer) ->
                 payload_template => <<"v = ${.value}">>
             }
         ],
-        value_encoding_mode => <<"force_utf8">>
+        value_encoding_mode => <<"none">>
     }.
 
 %% -------------------------------------------------------------------------------------------------
@@ -334,22 +334,16 @@ fields(consumer_opts) ->
                 #{
                     required => true,
                     desc => ?DESC(consumer_topic_mapping),
-                    validator =>
-                        fun
-                            ([]) ->
-                                {error, "There must be at least one Kafka-MQTT topic mapping"};
-                            ([_ | _]) ->
-                                ok
-                        end
+                    validator => fun consumer_topic_mapping_validator/1
                 }
             )},
         {key_encoding_mode,
-            mk(enum([force_utf8, base64]), #{
-                default => force_utf8, desc => ?DESC(consumer_encoding_mode)
+            mk(enum([none, base64]), #{
+                default => none, desc => ?DESC(consumer_encoding_mode)
             })},
         {value_encoding_mode,
-            mk(enum([force_utf8, base64]), #{
-                default => force_utf8, desc => ?DESC(consumer_encoding_mode)
+            mk(enum([none, base64]), #{
+                default => none, desc => ?DESC(consumer_encoding_mode)
             })}
     ];
 fields(consumer_topic_mapping) ->
@@ -449,3 +443,16 @@ kafka_producer_converter(
 kafka_producer_converter(Config, _HoconOpts) ->
     %% new schema
     Config.
+
+consumer_topic_mapping_validator(_TopicMapping = []) ->
+    {error, "There must be at least one Kafka-MQTT topic mapping"};
+consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
+    NumEntries = length(TopicMapping),
+    KafkaTopics = [KT || #{<<"kafka_topic">> := KT} <- TopicMapping],
+    DistinctKafkaTopics = length(lists:usort(KafkaTopics)),
+    case DistinctKafkaTopics =:= NumEntries of
+        true ->
+            ok;
+        false ->
+            {error, "Kafka topics must not be repeated in a bridge"}
+    end.

+ 2 - 2
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl

@@ -61,7 +61,7 @@
 }.
 -type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber.
 %% -type mqtt_payload() :: full_message | message_value.
--type encoding_mode() :: force_utf8 | base64.
+-type encoding_mode() :: none | base64.
 -type consumer_init_data() :: #{
     hookpoint := binary(),
     key_encoding_mode := encoding_mode(),
@@ -490,7 +490,7 @@ render(FullMessage, PayloadTemplate) ->
     },
     emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, FullMessage, Opts).
 
-encode(Value, force_utf8) ->
+encode(Value, none) ->
     Value;
 encode(Value, base64) ->
     base64:encode(Value).

+ 2 - 2
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl

@@ -576,8 +576,8 @@ kafka_config(TestCase, _KafkaType, Config) ->
             "    offset_reset_policy = reset_to_latest\n"
             "  }\n"
             "~s"
-            "  key_encoding_mode = force_utf8\n"
-            "  value_encoding_mode = force_utf8\n"
+            "  key_encoding_mode = none\n"
+            "  value_encoding_mode = none\n"
             "  ssl {\n"
             "    enable = ~p\n"
             "    verify = verify_none\n"

+ 106 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl

@@ -76,6 +76,68 @@ kafka_producer_test() ->
 
     ok.
 
+kafka_consumer_test() ->
+    Conf1 = parse(kafka_consumer_hocon()),
+    ?assertMatch(
+        #{
+            <<"bridges">> :=
+                #{
+                    <<"kafka_consumer">> :=
+                        #{
+                            <<"my_consumer">> := _
+                        }
+                }
+        },
+        check(Conf1)
+    ),
+
+    %% Bad: can't repeat kafka topics.
+    BadConf1 = emqx_map_lib:deep_put(
+        [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
+        Conf1,
+        [
+            #{
+                <<"kafka_topic">> => <<"t1">>,
+                <<"mqtt_topic">> => <<"mqtt/t1">>,
+                <<"qos">> => 1,
+                <<"payload_template">> => <<"${.}">>
+            },
+            #{
+                <<"kafka_topic">> => <<"t1">>,
+                <<"mqtt_topic">> => <<"mqtt/t2">>,
+                <<"qos">> => 2,
+                <<"payload_template">> => <<"v = ${.value}">>
+            }
+        ]
+    ),
+    ?assertThrow(
+        {_, [
+            #{
+                path := "bridges.kafka_consumer.my_consumer.topic_mapping",
+                reason := "Kafka topics must not be repeated in a bridge"
+            }
+        ]},
+        check(BadConf1)
+    ),
+
+    %% Bad: there must be at least 1 mapping.
+    BadConf2 = emqx_map_lib:deep_put(
+        [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
+        Conf1,
+        []
+    ),
+    ?assertThrow(
+        {_, [
+            #{
+                path := "bridges.kafka_consumer.my_consumer.topic_mapping",
+                reason := "There must be at least one Kafka-MQTT topic mapping"
+            }
+        ]},
+        check(BadConf2)
+    ),
+
+    ok.
+
 %%===========================================================================
 %% Helper functions
 %%===========================================================================
@@ -179,3 +241,47 @@ kafka_producer_new_hocon() ->
     "  }\n"
     "}\n"
     "".
+
+%% erlfmt-ignore
+kafka_consumer_hocon() ->
+"""
+bridges.kafka_consumer.my_consumer {
+  enable = true
+  bootstrap_hosts = \"kafka-1.emqx.net:9292\"
+  connect_timeout = 5s
+  min_metadata_refresh_interval = 3s
+  metadata_request_timeout = 5s
+  authentication = {
+    mechanism = plain
+    username = emqxuser
+    password = password
+  }
+  kafka {
+    max_batch_bytes = 896KB
+    max_rejoin_attempts = 5
+    offset_commit_interval_seconds = 3
+    offset_reset_policy = reset_to_latest
+  }
+  topic_mapping = [
+    {
+      kafka_topic = \"kafka-topic-1\"
+      mqtt_topic = \"mqtt/topic/1\"
+      qos = 1
+      payload_template = \"${.}\"
+    },
+    {
+      kafka_topic = \"kafka-topic-2\"
+      mqtt_topic = \"mqtt/topic/2\"
+      qos = 2
+      payload_template = \"v = ${.value}\"
+    }
+  ]
+  key_encoding_mode = none
+  value_encoding_mode = none
+  ssl {
+    enable = false
+    verify = verify_none
+    server_name_indication = \"auto\"
+  }
+}
+""".