Просмотр исходного кода

Merge pull request #10249 from thalesmg/fix-kafka-offset-doc-rv50

feat(kafka_consumer): tie `offset_reset_policy` and `begin_offset` together (rv5.0)
Thales Macedo Garitezi 2 года назад
Родитель
Commit
1a7ca7235e

+ 1 - 1
deploy/docker/Dockerfile

@@ -53,7 +53,7 @@ VOLUME ["/opt/emqx/log", "/opt/emqx/data"]
 # - 11883 port for internal MQTT/TCP
 # - 11883 port for internal MQTT/TCP
 # - 18083 for dashboard and API
 # - 18083 for dashboard and API
 # - 4370 default Erlang distribution port
 # - 4370 default Erlang distribution port
-# - 5369 for backplain gen_rpc
+# - 5369 for backplane gen_rpc
 EXPOSE 1883 8083 8084 8883 11883 18083 4370 5369
 EXPOSE 1883 8083 8084 8883 11883 18083 4370 5369
 
 
 ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]
 ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]

+ 4 - 4
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf

@@ -599,10 +599,10 @@ emqx_ee_bridge_kafka {
     }
     }
     consumer_offset_reset_policy {
     consumer_offset_reset_policy {
         desc {
         desc {
-            en: "Defines how the consumers should reset the start offset when "
-                "a topic partition has and invalid or no initial offset."
-            zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时,"
-                "消费者应如何重置开始偏移量。"
+            en: "Defines from which offset a consumer should start fetching when there"
+                " is no commit history or when the commit history becomes invalid."
+            zh: "当没有主题分区没有偏移量的历史记录,或则历史记录失效后,"
+                "消费者应该使用哪个偏移量重新开始消费"
         }
         }
         label {
         label {
             en: "Offset Reset Policy"
             en: "Offset Reset Policy"

+ 3 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -105,7 +105,7 @@ values(consumer) ->
     #{
     #{
         kafka => #{
         kafka => #{
             max_batch_bytes => <<"896KB">>,
             max_batch_bytes => <<"896KB">>,
-            offset_reset_policy => <<"reset_to_latest">>,
+            offset_reset_policy => <<"latest">>,
             offset_commit_interval_seconds => 5
             offset_commit_interval_seconds => 5
         },
         },
         key_encoding_mode => <<"none">>,
         key_encoding_mode => <<"none">>,
@@ -370,8 +370,8 @@ fields(consumer_kafka_opts) ->
             })},
             })},
         {offset_reset_policy,
         {offset_reset_policy,
             mk(
             mk(
-                enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]),
-                #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)}
+                enum([latest, earliest]),
+                #{default => latest, desc => ?DESC(consumer_offset_reset_policy)}
             )},
             )},
         {offset_commit_interval_seconds,
         {offset_commit_interval_seconds,
             mk(
             mk(

+ 10 - 3
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl

@@ -59,8 +59,7 @@
     subscriber_id := subscriber_id(),
     subscriber_id := subscriber_id(),
     kafka_client_id := brod:client_id()
     kafka_client_id := brod:client_id()
 }.
 }.
--type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber.
-%% -type mqtt_payload() :: full_message | message_value.
+-type offset_reset_policy() :: latest | earliest.
 -type encoding_mode() :: none | base64.
 -type encoding_mode() :: none | base64.
 -type consumer_init_data() :: #{
 -type consumer_init_data() :: #{
     hookpoint := binary(),
     hookpoint := binary(),
@@ -271,7 +270,7 @@ start_consumer(Config, InstanceId, ClientID) ->
             max_batch_bytes := MaxBatchBytes,
             max_batch_bytes := MaxBatchBytes,
             max_rejoin_attempts := MaxRejoinAttempts,
             max_rejoin_attempts := MaxRejoinAttempts,
             offset_commit_interval_seconds := OffsetCommitInterval,
             offset_commit_interval_seconds := OffsetCommitInterval,
-            offset_reset_policy := OffsetResetPolicy
+            offset_reset_policy := OffsetResetPolicy0
         },
         },
         key_encoding_mode := KeyEncodingMode,
         key_encoding_mode := KeyEncodingMode,
         topic_mapping := TopicMapping0,
         topic_mapping := TopicMapping0,
@@ -290,7 +289,15 @@ start_consumer(Config, InstanceId, ClientID) ->
     %% cluster, so that the load gets distributed between all
     %% cluster, so that the load gets distributed between all
     %% consumers and we don't repeat messages in the same cluster.
     %% consumers and we don't repeat messages in the same cluster.
     GroupID = consumer_group_id(BridgeName),
     GroupID = consumer_group_id(BridgeName),
+    %% earliest or latest
+    BeginOffset = OffsetResetPolicy0,
+    OffsetResetPolicy =
+        case OffsetResetPolicy0 of
+            latest -> reset_to_latest;
+            earliest -> reset_to_earliest
+        end,
     ConsumerConfig = [
     ConsumerConfig = [
+        {begin_offset, BeginOffset},
         {max_bytes, MaxBatchBytes},
         {max_bytes, MaxBatchBytes},
         {offset_reset_policy, OffsetResetPolicy}
         {offset_reset_policy, OffsetResetPolicy}
     ],
     ],

+ 56 - 1
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl

@@ -53,6 +53,7 @@ sasl_only_tests() ->
 %% tests that do not need to be run on all groups
 %% tests that do not need to be run on all groups
 only_once_tests() ->
 only_once_tests() ->
     [
     [
+        t_begin_offset_earliest,
         t_bridge_rule_action_source,
         t_bridge_rule_action_source,
         t_cluster_group,
         t_cluster_group,
         t_node_joins_existing_cluster,
         t_node_joins_existing_cluster,
@@ -585,7 +586,7 @@ kafka_config(TestCase, _KafkaType, Config) ->
             "    max_rejoin_attempts = 5\n"
             "    max_rejoin_attempts = 5\n"
             "    offset_commit_interval_seconds = 3\n"
             "    offset_commit_interval_seconds = 3\n"
             %% todo: matrix this
             %% todo: matrix this
-            "    offset_reset_policy = reset_to_latest\n"
+            "    offset_reset_policy = latest\n"
             "  }\n"
             "  }\n"
             "~s"
             "~s"
             "  key_encoding_mode = none\n"
             "  key_encoding_mode = none\n"
@@ -1926,3 +1927,57 @@ t_cluster_node_down(Config) ->
         end
         end
     ),
     ),
     ok.
     ok.
+
+t_begin_offset_earliest(Config) ->
+    MQTTTopic = ?config(mqtt_topic, Config),
+    ResourceId = resource_id(Config),
+    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+    on_exit(fun() -> emqtt:stop(C) end),
+    {ok, _} = emqtt:connect(C),
+    {ok, _, [2]} = emqtt:subscribe(C, MQTTTopic, 2),
+
+    ?check_trace(
+        begin
+            %% publish a message before the bridge is started.
+            NumMessages = 5,
+            lists:foreach(
+                fun(N) ->
+                    publish(Config, [
+                        #{
+                            key => <<"mykey", (integer_to_binary(N))/binary>>,
+                            value => Payload,
+                            headers => [{<<"hkey">>, <<"hvalue">>}]
+                        }
+                    ])
+                end,
+                lists:seq(1, NumMessages)
+            ),
+
+            {ok, _} = create_bridge(Config, #{
+                <<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}
+            }),
+
+            #{num_published => NumMessages}
+        end,
+        fun(Res, _Trace) ->
+            #{num_published := NumMessages} = Res,
+            %% we should receive messages published before starting
+            %% the consumers
+            Published = receive_published(#{n => NumMessages}),
+            Payloads = lists:map(
+                fun(#{payload := P}) -> emqx_json:decode(P, [return_maps]) end,
+                Published
+            ),
+            ?assert(
+                lists:all(
+                    fun(#{<<"value">> := V}) -> V =:= Payload end,
+                    Payloads
+                ),
+                #{payloads => Payloads}
+            ),
+            ?assertEqual(NumMessages, emqx_resource_metrics:received_get(ResourceId)),
+            ok
+        end
+    ),
+    ok.

+ 1 - 1
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl

@@ -260,7 +260,7 @@ bridges.kafka_consumer.my_consumer {
     max_batch_bytes = 896KB
     max_batch_bytes = 896KB
     max_rejoin_attempts = 5
     max_rejoin_attempts = 5
     offset_commit_interval_seconds = 3
     offset_commit_interval_seconds = 3
-    offset_reset_policy = reset_to_latest
+    offset_reset_policy = latest
   }
   }
   topic_mapping = [
   topic_mapping = [
     {
     {