Bladeren bron

Add support payload base64 fucntion

kennyh 2 weken geleden
bovenliggende
commit
c6a1867ba9
2 gewijzigde bestanden met toevoegingen van 82 en 28 verwijderingen
  1. 56 22
      priv/emqx_plugin_kafka.hocon
  2. 26 6
      src/emqx_plugin_kafka_producer.erl

+ 56 - 22
priv/emqx_plugin_kafka.hocon

@@ -1,32 +1,66 @@
 plugin_kafka {
   connection {
-    bootstrap_hosts = ["10.3.64.223:9192", "10.3.64.223:9292", "10.3.64.223:9392"]
+    bootstrap_hosts = ["192.168.2.207:9092"]
+    // optional   default:5s
+    connect_timeout = 5s
+    // enum: per_partition | per_broker
+    // optional   default:per_partition
+    connection_strategy = per_partition
+    // optional   default:5s
+    min_metadata_refresh_interval = 5s
+    // optional   default:true
+    query_api_versions = true
+    // optional   default:3s
+    request_timeout = 3s
     sasl {
       mechanism = plain
-      username = "username"
-      password = "password"
+      username = "admin"
+      password = "Welamp@2025"
     }
+    ssl {
+      enable = false
+    }
+    health_check_interval = 32s
+  }
+
+  // optional
+  producer {
+    // Most number of bytes to collect into a produce request.
+    // optional   default:896KB
+    max_batch_bytes = 896KB
+    // enum:  no_compression | snappy | gzip
+    // optional   default:no_compression
+    compression = no_compression
+    // enum:  random | roundrobin | first_key_dispatch
+    // optional   default:random
+    partition_strategy = random
+
+    // Encode kafka value.
+    // enum:  plain | base64
+    // optional   default:plain
+    encode_payload_type = base64
   }
 
   hooks = [
-    {endpoint = client.connect}
-    , {endpoint = client.connack}
-    , {endpoint = client.connected}
-    , {endpoint = client.disconnected}
-    , {endpoint = client.authenticate}
-    , {endpoint = client.authorize}
-    , {endpoint = client.authenticate}
-    , {endpoint = client.check_authz_complete}
-    , {endpoint = session.created}
-    , {endpoint = session.subscribed}
-    , {endpoint = session.unsubscribed}
-    , {endpoint = session.resumed}
-    , {endpoint = session.discarded}
-    , {endpoint = session.takenover}
-    , {endpoint = session.terminated}
-    , {endpoint = message.publish, filter = "test/#"}
-    , {endpoint = message.delivered, filter = "test/#"}
-    , {endpoint = message.acked, filter = "test/#"}
-    , {endpoint = message.dropped, filter = "test/#"}
+    {
+      // Hook point.
+      // required
+      endpoint = message.publish
+      // Emqx topic pattern.
+      // 1. Cannot match the system message;
+      // 2. Cannot use filters that start with '+' or '#'.
+      // message required
+      filter = "/Lamp/DataReport/#"
+      // Kafka topic, must be created in advance in Kafka.
+      // required
+      kafka_topic = log_info
+      // Matching template, value = ${.} indicates that all keys match
+      // optional default:{timestamp = "${.timestamp}", value = "${.}",key = "${.clientid}"}
+      kafka_message = {
+        timestamp = "${.timestamp}"
+        value = "${.}"
+        key = "${.clientid}"
+      }
+    }
   ]
 }

+ 26 - 6
src/emqx_plugin_kafka_producer.erl

@@ -272,11 +272,29 @@ render_message(
     Message,
     EncodePayloadType
 ) ->
-    #{
-        key => render(KeyTemplate, Message),
-        value => encode_payload(EncodePayloadType, render(ValueTemplate, Message)),
-        ts => render_timestamp(TimestampTemplate, Message)
-    }.
+    % 如果配置为base64编码,则只对payload字段进行编码
+    case EncodePayloadType of
+        base64 ->
+            % 提取原始消息中的payload字段
+            Payload = maps:get(payload, Message, <<"">>),
+            % 对payload进行base64编码
+            EncodedPayload = base64:encode(Payload),
+            % 创建新的消息结构,将编码后的payload替换回原消息
+            MessageWithEncodedPayload = Message#{payload => EncodedPayload},
+            % 使用修改后的消息渲染模板
+            #{
+                key => render(KeyTemplate, MessageWithEncodedPayload),
+                value => render(ValueTemplate, MessageWithEncodedPayload),
+                ts => render_timestamp(TimestampTemplate, MessageWithEncodedPayload)
+            };
+        _ ->
+            % 其他编码类型保持原逻辑
+            #{
+                key => render(KeyTemplate, Message),
+                value => encode_payload(EncodePayloadType, render(ValueTemplate, Message)),
+                ts => render_timestamp(TimestampTemplate, Message)
+            }
+    end.
 
 render(Template, Message) ->
     Opts = #{
@@ -297,7 +315,9 @@ render_timestamp(Template, Message) ->
     end.
 
 encode_payload(base64, Payload) ->
-    base64:encode(Payload);
+    % 这个函数现在只在非base64编码类型时被调用
+    % 对于base64编码,我们已经在render_message中处理了payload字段
+    Payload;
 encode_payload(_, Payload) ->
     Payload.