| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- plugin_kafka {  # Kafka plugin settings: broker connection, producer behaviour, and event-to-topic hooks
- connection {  # how the client reaches and authenticates against the brokers
- client_id = "kafka_client"
- bootstrap_hosts = ["10.3.64.223:9192", "10.3.64.223:9292", "10.3.64.223:9392"]  # three host:port entries, all on the same IP (10.3.64.223)
- connect_timeout = 5s  # presumably the limit for establishing a broker connection — confirm against plugin docs
- connection_strategy = per_partition  # presumably one connection per partition rather than per broker — TODO confirm
- min_metadata_refresh_interval = 5s
- query_api_versions = true
- request_timeout = 3s  # note: shorter than connect_timeout above
- sasl {  # NOTE(review): the credentials below look like placeholders; replace before deployment
- mechanism = plain
- username = "username"
- password = "password"
- }
- ssl {
- enable = false  # NOTE(review): SSL is off while SASL plain is configured above — verify this is acceptable for the target network
- }
- health_check_interval = 32s
- }
- producer {  # settings applied when producing messages
- max_batch_bytes = 896KB
- compression = no_compression  # compression is turned off
- partition_strategy = random  # presumably picks a partition at random — TODO confirm
- encode_payload_type = plain
- }
- hooks = [  # one entry per event endpoint; each names a Kafka topic and a message template
- {
- endpoint = message.publish
- filter = "test/#"  # only this hook sets a filter; looks like an MQTT topic filter with a multi-level wildcard — confirm
- kafka_topic = emqx_msg_publish
- kafka_message = {
- timestamp = "${.timestamp}"  # "${...}" looks like a placeholder filled from the event's fields — confirm syntax in plugin docs
- value = "${.}"  # presumably expands to the whole event object — TODO confirm
- key = "${.clientid}"
- }
- }
- , {
- endpoint = client.connected
- kafka_topic = emqx_client_connected
- kafka_message = {
- timestamp = "${.timestamp}"
- value = "${.}"
- key = "${.proto_name}"  # NOTE(review): keyed by proto_name, unlike the clientid key used by the other two hooks — confirm intent
- }
- }
- , {
- endpoint = client.disconnected
- kafka_topic = emqx_client_disconnected
- kafka_message = {
- timestamp = "${.timestamp}"
- value = "${.reason}"  # presumably only the reason field is sent as the value here, not the whole event — confirm
- key = "${.clientid}"
- }
- }
- ]
- }
|