Просмотр исходного кода

feat: tie `offset_reset_policy` and `begin_offset` together

To make the configuration more intuitive and avoid exposing more
parameters to the user, we should:

1) Remove reset_by_subscriber as an enum constructor for
`offset_reset_policy`, as that might make the consumer hang
indefinitely without manual action.

2) Set the `begin_offset` `brod_consumer` parameter to `earliest` or
`latest` depending on the value of `offset_reset_policy`, as that’s
probably the user’s intention.
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
5cf09209cd

+ 3 - 4
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf

@@ -600,12 +600,11 @@ emqx_ee_bridge_kafka {
     consumer_offset_reset_policy {
         desc {
             en: "Defines how the consumers should reset the start offset when "
-                "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs)."
-                " Note that this is not the same as the `begin_offset`, which defines where to start"
-                " consumption when there is no offset committed yet."
+                "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs) or"
+                " when there is no committed offset for the topic-partition yet."
             zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时,"
                 "消费者应如何重置开始偏移量。(即当发生 \"OffsetOutOfRange\" 时)。"
-                " 请注意,这与`begin_offset'不同,后者定义了在还没有提交偏移量的情况下从哪里开始消费。"
+                "  或者当主题分区还没有承诺的偏移量时。"
         }
         label {
             en: "Offset Reset Policy"

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ee_bridge, [
     {description, "EMQX Enterprise data bridges"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, [emqx_ee_bridge_kafka_consumer_sup]},
     {applications, [
         kernel,

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -370,7 +370,7 @@ fields(consumer_kafka_opts) ->
             })},
         {offset_reset_policy,
             mk(
-                enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]),
+                enum([reset_to_latest, reset_to_earliest]),
                 #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)}
             )},
         {offset_commit_interval_seconds,

+ 6 - 0
lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl

@@ -290,7 +290,13 @@ start_consumer(Config, InstanceId, ClientID) ->
     %% cluster, so that the load gets distributed between all
     %% consumers and we don't repeat messages in the same cluster.
     GroupID = consumer_group_id(BridgeName),
+    BeginOffset =
+        case OffsetResetPolicy of
+            reset_to_latest -> latest;
+            reset_to_earliest -> earliest
+        end,
     ConsumerConfig = [
+        {begin_offset, BeginOffset},
         {max_bytes, MaxBatchBytes},
         {offset_reset_policy, OffsetResetPolicy}
     ],

+ 55 - 0
lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl

@@ -53,6 +53,7 @@ sasl_only_tests() ->
 %% tests that do not need to be run on all groups
 only_once_tests() ->
     [
+        t_begin_offset_earliest,
         t_bridge_rule_action_source,
         t_cluster_group,
         t_node_joins_existing_cluster,
@@ -1915,3 +1916,57 @@ t_cluster_node_down(Config) ->
         end
     ),
     ok.
+
+t_begin_offset_earliest(Config) ->
+    MQTTTopic = ?config(mqtt_topic, Config),
+    ResourceId = resource_id(Config),
+    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+    on_exit(fun() -> emqtt:stop(C) end),
+    {ok, _} = emqtt:connect(C),
+    {ok, _, [2]} = emqtt:subscribe(C, MQTTTopic, 2),
+
+    ?check_trace(
+        begin
+            %% publish a message before the bridge is started.
+            NumMessages = 5,
+            lists:foreach(
+                fun(N) ->
+                    publish(Config, [
+                        #{
+                            key => <<"mykey", (integer_to_binary(N))/binary>>,
+                            value => Payload,
+                            headers => [{<<"hkey">>, <<"hvalue">>}]
+                        }
+                    ])
+                end,
+                lists:seq(1, NumMessages)
+            ),
+
+            {ok, _} = create_bridge(Config, #{
+                <<"kafka">> => #{<<"offset_reset_policy">> => <<"reset_to_earliest">>}
+            }),
+
+            #{num_published => NumMessages}
+        end,
+        fun(Res, _Trace) ->
+            #{num_published := NumMessages} = Res,
+            %% we should receive messages published before starting
+            %% the consumers
+            Published = receive_published(#{n => NumMessages}),
+            Payloads = lists:map(
+                fun(#{payload := P}) -> emqx_json:decode(P, [return_maps]) end,
+                Published
+            ),
+            ?assert(
+                lists:all(
+                    fun(#{<<"value">> := V}) -> V =:= Payload end,
+                    Payloads
+                ),
+                #{payloads => Payloads}
+            ),
+            ?assertEqual(NumMessages, emqx_resource_metrics:received_get(ResourceId)),
+            ok
+        end
+    ),
+    ok.