Просмотр исходного кода

docs(kafka): improve kafka consumer/producer API examples

Thales Macedo Garitezi 3 лет назад
Родитель
Сommit
37a35b22df
1 измененных файлов с 72 добавлено и 8 удалено
  1. 72 8
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

+ 72 - 8
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl

@@ -43,25 +43,89 @@ conn_bridge_examples(Method) ->
             %% for backwards compatibility.
             <<"kafka">> => #{
                 summary => <<"Kafka Producer Bridge">>,
-                value => values(Method)
+                value => values({Method, producer})
             }
         },
         #{
             <<"kafka_consumer">> => #{
                 summary => <<"Kafka Consumer Bridge">>,
-                value => values(Method)
+                value => values({Method, consumer})
             }
         }
     ].
 
-values(get) ->
-    maps:merge(values(post), ?METRICS_EXAMPLE);
-values(post) ->
+values({get, KafkaType}) ->
+    maps:merge(values({post, KafkaType}), ?METRICS_EXAMPLE);
+values({post, KafkaType}) ->
+    maps:merge(values(common_config), values(KafkaType));
+values({put, KafkaType}) ->
+    values({post, KafkaType});
+values(common_config) ->
     #{
-        bootstrap_hosts => <<"localhost:9092">>
+        authentication => #{
+            mechanism => <<"plain">>,
+            username => <<"username">>,
+            password => <<"password">>
+        },
+        bootstrap_hosts => <<"localhost:9092">>,
+        connect_timeout => <<"5s">>,
+        enable => true,
+        metadata_request_timeout => <<"4s">>,
+        min_metadata_refresh_interval => <<"3s">>,
+        socket_opts => #{
+            sndbuf => <<"1024KB">>,
+            recbuf => <<"1024KB">>,
+            nodelay => true
+        }
     };
-values(put) ->
-    values(post).
+values(producer) ->
+    #{
+        kafka => #{
+            topic => <<"kafka-topic">>,
+            message => #{
+                key => <<"${.clientid}">>,
+                value => <<"${.}">>,
+                timestamp => <<"${.timestamp}">>
+            },
+            max_batch_bytes => <<"896KB">>,
+            compression => <<"no_compression">>,
+            partition_strategy => <<"random">>,
+            required_acks => <<"all_isr">>,
+            partition_count_refresh_interval => <<"60s">>,
+            max_inflight => 10,
+            buffer => #{
+                mode => <<"hybrid">>,
+                per_partition_limit => <<"2GB">>,
+                segment_bytes => <<"100MB">>,
+                memory_overload_protection => true
+            }
+        },
+        local_topic => <<"mqtt/local/topic">>
+    };
+values(consumer) ->
+    #{
+        kafka => #{
+            max_batch_bytes => <<"896KB">>,
+            offset_reset_policy => <<"reset_to_latest">>,
+            offset_commit_interval_seconds => 5
+        },
+        key_encoding_mode => <<"force_utf8">>,
+        topic_mapping => [
+            #{
+                kafka_topic => <<"kafka-topic-1">>,
+                mqtt_topic => <<"mqtt/topic/1">>,
+                qos => 1,
+                payload_template => <<"${.}">>
+            },
+            #{
+                kafka_topic => <<"kafka-topic-2">>,
+                mqtt_topic => <<"mqtt/topic/2">>,
+                qos => 2,
+                payload_template => <<"v = ${.value}">>
+            }
+        ],
+        value_encoding_mode => <<"force_utf8">>
+    }.
 
 %% -------------------------------------------------------------------------------------------------
 %% Hocon Schema Definitions