plugin_kafka { connection { client_id = "kafka_client" bootstrap_hosts = ["10.3.64.223:9192", "10.3.64.223:9292", "10.3.64.223:9392"] connect_timeout = 5s connection_strategy = per_partition min_metadata_refresh_interval = 5s query_api_versions = true request_timeout = 3s sasl { mechanism = plain username = "username" password = "password" } ssl { enable = false } health_check_interval = 32s } producer { max_batch_bytes = 896KB compression = no_compression partition_strategy = random encode_payload_type = plain } hooks = [ { endpoint = message.publish filter = "test/#" kafka_topic = emqx_msg_publish kafka_message = { timestamp = "${.timestamp}" value = "${.payload}" key = "${.clientid}" } } , { endpoint = client.connected kafka_topic = emqx_client_connected kafka_message = { timestamp = "${.timestamp}" value = "${.}" key = "${.proto_name}" } } , { endpoint = client.disconnected kafka_topic = emqx_client_disconnected kafka_message = { timestamp = "${.timestamp}" value = "${.reason}" key = "${.clientid}" } } ] }