Jelajahi Sumber

Merge pull request #11513 from thalesmg/kafka-fix-ts-template-r52-20230824

fix: use correct timestamp template for kafka and remove such fields from azure event hub producer
Thales Macedo Garitezi 2 tahun lalu
induk
melakukan
d9a5a9ea2b

+ 1 - 1
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_azure_event_hub, [
     {description, "EMQX Enterprise Azure Event Hub Bridge"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [
         kernel,

+ 6 - 0
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl

@@ -64,6 +64,10 @@ fields(producer_kafka_opts) ->
         kafka_producer_overrides()
     ),
     override_documentations(Fields);
+fields(kafka_message) ->
+    Fields0 = emqx_bridge_kafka:fields(kafka_message),
+    Fields = proplists:delete(timestamp, Fields0),
+    override_documentations(Fields);
 fields(Method) ->
     Fields = emqx_bridge_kafka:fields(Method),
     override_documentations(Fields).
@@ -85,6 +89,7 @@ desc(Name) ->
 struct_names() ->
     [
         auth_username_password,
+        kafka_message,
         producer_kafka_opts
     ].
 
@@ -245,6 +250,7 @@ kafka_producer_overrides() ->
                 default => no_compression,
                 importance => ?IMPORTANCE_HIDDEN
             }),
+        message => mk(ref(kafka_message), #{}),
         required_acks => mk(enum([all_isr, leader_only]), #{default => all_isr})
     }.
 

+ 0 - 5
apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl

@@ -65,10 +65,6 @@ init_per_suite(Config) ->
     end.
 
 end_per_suite(Config) ->
-    %% emqx_mgmt_api_test_util:end_suite(),
-    %% ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
-    %% ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
-    %% _ = application:stop(emqx_connector),
     Apps = ?config(tc_apps, Config),
     emqx_cth_suite:stop(Apps),
     ok.
@@ -145,7 +141,6 @@ bridge_config(TestCase, Config) ->
                     <<"message">> =>
                         #{
                             <<"key">> => <<"${.clientid}">>,
-                            <<"timestamp">> => <<"${.timestamp}">>,
                             <<"value">> => <<"${.}">>
                         },
                     <<"partition_count_refresh_interval">> => <<"60s">>,

+ 0 - 1
apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_tests.erl

@@ -33,7 +33,6 @@ bridges.azure_event_hub_producer.my_producer {
     max_inflight = 10
     message {
       key = \"${.clientid}\"
-      timestamp = \"${.timestamp}\"
       value = \"${.}\"
     }
     partition_count_refresh_interval = 60s

+ 1 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -286,7 +286,7 @@ on_query_async(
 compile_message_template(T) ->
     KeyTemplate = maps:get(key, T, <<"${.clientid}">>),
     ValueTemplate = maps:get(value, T, <<"${.}">>),
-    TimestampTemplate = maps:get(value, T, <<"${.timestamp}">>),
+    TimestampTemplate = maps:get(timestamp, T, <<"${.timestamp}">>),
     #{
         key => preproc_tmpl(KeyTemplate),
         value => preproc_tmpl(ValueTemplate),

+ 45 - 1
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -470,7 +470,51 @@ t_failed_creation_then_fix(Config) ->
     delete_all_bridges(),
     ok.
 
-t_table_removed(_Config) ->
+t_custom_timestamp(_Config) ->
+    HostsString = kafka_hosts_string_sasl(),
+    AuthSettings = valid_sasl_plain_settings(),
+    Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
+    Type = ?BRIDGE_TYPE,
+    Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
+    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
+    KafkaTopic = "test-topic-one-partition",
+    MQTTTopic = <<"t/local/kafka">>,
+    emqx:subscribe(MQTTTopic),
+    Conf0 = config(#{
+        "authentication" => AuthSettings,
+        "kafka_hosts_string" => HostsString,
+        "local_topic" => MQTTTopic,
+        "kafka_topic" => KafkaTopic,
+        "instance_id" => ResourceId,
+        "ssl" => #{}
+    }),
+    Conf = emqx_utils_maps:deep_put(
+        [<<"kafka">>, <<"message">>, <<"timestamp">>],
+        Conf0,
+        <<"123">>
+    ),
+    {ok, _} = emqx_bridge:create(Type, erlang:list_to_atom(Name), Conf),
+    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
+    ct:pal("base offset before testing ~p", [Offset]),
+    Time = erlang:unique_integer(),
+    BinTime = integer_to_binary(Time),
+    Msg = #{
+        clientid => BinTime,
+        payload => <<"payload">>,
+        timestamp => Time
+    },
+    emqx:publish(emqx_message:make(MQTTTopic, emqx_utils_json:encode(Msg))),
+    {ok, {_, [KafkaMsg]}} =
+        ?retry(
+            _Interval = 500,
+            _NAttempts = 20,
+            {ok, {_, [_]}} = brod:fetch(kafka_hosts(), KafkaTopic, _Partition = 0, Offset)
+        ),
+    ?assertMatch(#kafka_message{ts = 123, ts_type = create}, KafkaMsg),
+    delete_all_bridges(),
+    ok.
+
+t_nonexistent_topic(_Config) ->
     HostsString = kafka_hosts_string_sasl(),
     AuthSettings = valid_sasl_plain_settings(),
     Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),

+ 1 - 0
changes/ee/fix-11513.en.md

@@ -0,0 +1 @@
+Fixed a bug which prevented the Kafka Producer bridge from using the correct template for the `timestamp` field.