# Configuration for the plugin_kafka plugin: Kafka connection settings,
# producer options, and the list of hook endpoints with their Kafka mapping.
plugin_kafka {
  # How to reach and authenticate against the Kafka cluster.
  connection {
    # Bootstrap broker list, one "host:port" string per entry.
    bootstrap_hosts = ["10.3.64.223:9192"]

    # SASL settings using the "plain" mechanism.
    # NOTE(review): username/password look like placeholder values kept in
    # plain text in this file — confirm real credentials are supplied securely.
    sasl {
      mechanism = plain
      username = "username"
      password = "password"
    }
  }

  # Producer options.
  producer {
    # Payload encoding is set to base64.
    # NOTE(review): presumably applied to ${.payload} before templating — confirm.
    encode_payload_type = base64
  }

  # One object per hook endpoint.
  hooks = [
    # Only the endpoint is given here; no filter, kafka_topic or kafka_message
    # is set, so whatever defaults the plugin defines apply — TODO confirm.
    { endpoint = client.connect },

    {
      endpoint = message.publish
      # Topic filter for this hook ("#" is the MQTT multi-level wildcard).
      filter = "test/#"
      # Kafka topic named for this hook.
      kafka_topic = emqx_msg_publish
      # Template for the Kafka record; ${.name} placeholders sit inside quoted
      # strings, so they are passed through literally by the config parser.
      kafka_message {
        timestamp = "${.timestamp}"
        # NOTE(review): this template is not valid JSON (keys and substituted
        # values are unquoted) — confirm consumers expect this exact format.
        value = "{topic:${.topic},payload:${.payload},qos:${.qos}}"
        key = "${.clientid}"
      }
    }
  ]
}