Ver código fonte

统一kafka转发逻辑

Ultrakid 3 anos atrás
pai
commit
18220a776e
1 arquivos alterados com 9 adições e 10 exclusões
  1. 9 10
      src/emqx_plugin_kafka.erl

+ 9 - 10
src/emqx_plugin_kafka.erl

@@ -192,8 +192,8 @@ on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
   Qos = Message#message.qos,
   From = Message#message.from,
   Timestamp = Message#message.timestamp,
-  Json = jsx:encode([
-    {type, <<"deliver messqge">>},
+  Content = [
+    {action, <<"message_delivered">>},
     {from, From},
     {to, ClientId},
     {topic, Topic},
@@ -201,9 +201,8 @@ on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
     {qos, Qos},
     {cluster_node, node()},
     {ts, Timestamp}
-  ]),
-  EkafTopic = ekaf_get_topic(),
-  ekaf:produce_async_batched(EkafTopic, Json),
+  ],
+  produce_kafka_payload(Content),
   ok.
 
 on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
@@ -214,8 +213,8 @@ on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
   Qos = Message#message.qos,
   From = Message#message.from,
   Timestamp = Message#message.timestamp,
-  Json = jsx:encode([
-    {type, <<"acked messqge">>},
+  Content = [
+    {action, <<"message_acked">>},
     {from, From},
     {to, ClientId},
     {topic, Topic},
@@ -223,9 +222,9 @@ on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
     {qos, Qos},
     {cluster_node, node()},
     {ts, Timestamp}
-  ]),
-  EkafTopic = ekaf_get_topic(),
-  ekaf:produce_async_batched(EkafTopic, Json).
+  ],
+  produce_kafka_payload(Content),
+  ok.
 
 %%--------------------------------------------------------------------
 %% Session Lifecircle Hooks