Просмотр исходного кода

fix(emqx_bridge_kafka): match example in api schema

Stefan Strigler 2 лет назад
Родитель
Сommit
d2901afd1b
1 измененных файлов с 42 добавлено и 41 удалено
  1. 42 41
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

+ 42 - 41
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -112,16 +112,15 @@ values({put, connector}) ->
 values({put, KafkaType}) ->
     maps:merge(values(common_config), values(KafkaType));
 values(bridge_v2_producer) ->
-    maps:merge(
-        #{
-            enable => true,
-            connector => <<"my_kafka_producer_connector">>,
-            resource_opts => #{
-                health_check_interval => "32s"
-            }
-        },
-        values(producer)
-    );
+    #{
+        enable => true,
+        connector => <<"my_kafka_producer_connector">>,
+        parameters => values(producer_values),
+        local_topic => <<"mqtt/local/topic">>,
+        resource_opts => #{
+            health_check_interval => "32s"
+        }
+    };
 values(common_config) ->
     #{
         authentication => #{
@@ -143,39 +142,41 @@ values(common_config) ->
     };
 values(producer) ->
     #{
-        kafka => #{
-            topic => <<"kafka-topic">>,
-            message => #{
-                key => <<"${.clientid}">>,
-                value => <<"${.}">>,
-                timestamp => <<"${.timestamp}">>
+        kafka => values(producer_values),
+        local_topic => <<"mqtt/local/topic">>
+    };
+values(producer_values) ->
+    #{
+        topic => <<"kafka-topic">>,
+        message => #{
+            key => <<"${.clientid}">>,
+            value => <<"${.}">>,
+            timestamp => <<"${.timestamp}">>
+        },
+        max_batch_bytes => <<"896KB">>,
+        compression => <<"no_compression">>,
+        partition_strategy => <<"random">>,
+        required_acks => <<"all_isr">>,
+        partition_count_refresh_interval => <<"60s">>,
+        kafka_headers => <<"${pub_props}">>,
+        kafka_ext_headers => [
+            #{
+                kafka_ext_header_key => <<"clientid">>,
+                kafka_ext_header_value => <<"${clientid}">>
             },
-            max_batch_bytes => <<"896KB">>,
-            compression => <<"no_compression">>,
-            partition_strategy => <<"random">>,
-            required_acks => <<"all_isr">>,
-            partition_count_refresh_interval => <<"60s">>,
-            kafka_headers => <<"${pub_props}">>,
-            kafka_ext_headers => [
-                #{
-                    kafka_ext_header_key => <<"clientid">>,
-                    kafka_ext_header_value => <<"${clientid}">>
-                },
-                #{
-                    kafka_ext_header_key => <<"topic">>,
-                    kafka_ext_header_value => <<"${topic}">>
-                }
-            ],
-            kafka_header_value_encode_mode => none,
-            max_inflight => 10,
-            buffer => #{
-                mode => <<"hybrid">>,
-                per_partition_limit => <<"2GB">>,
-                segment_bytes => <<"100MB">>,
-                memory_overload_protection => true
+            #{
+                kafka_ext_header_key => <<"topic">>,
+                kafka_ext_header_value => <<"${topic}">>
             }
-        },
-        local_topic => <<"mqtt/local/topic">>
+        ],
+        kafka_header_value_encode_mode => none,
+        max_inflight => 10,
+        buffer => #{
+            mode => <<"hybrid">>,
+            per_partition_limit => <<"2GB">>,
+            segment_bytes => <<"100MB">>,
+            memory_overload_protection => true
+        }
     };
 values(consumer) ->
     #{