Selaa lähdekoodia

- 功能说明/问题现象: 新增是否对publish消息进行base64编码后再kafka转发的配置项publish_base64
- 代码提交人: ultrakid
- 自测报告:
- 软件开发号:
- 来源项目编号/版本简称:
- 问题分类:
- 方案/解决办法:
- 有无兼容性问题:
- 有无配合更新模块:
- 需求路径:
- BUG路径:
- 设计文档路径:
- 建议:
- 代码审核人:

Ultrakid 3 vuotta sitten
vanhempi
commit
c3010f74cb
2 muutettua tiedostoa jossa 20 lisäystä ja 15 poistoa
  1. 2 1
      etc/emqx_plugin_kafka.config
  2. 18 14
      src/emqx_plugin_kafka.erl

+ 2 - 1
etc/emqx_plugin_kafka.config

@@ -5,6 +5,7 @@
             {reconnect_cool_down_seconds, 10},          %% socket error recovery
             {query_api_versions, true}                  %% Kafka 0.9.x or Earlier请设置为false
         ]},
-        {topic, <<"emqx-topic">>}                           %% 转发的目标topic
+        {topic, <<"emqx-topic">>},                           %% 转发的目标topic
+        {publish_base64, false}                         %% 对于publish到mqtt的消息内容,是否进行base64编码之后再转发(为了兼容发送字节流的情况)
     ]}
 ].

+ 18 - 14
src/emqx_plugin_kafka.erl

@@ -188,7 +188,7 @@ on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
   ?LOG_INFO("[KAFKA PLUGIN]Message delivered to client(~s): ~s~n",
     [ClientId, emqx_message:format(Message)]),
   Topic = Message#message.topic,
-  Payload = Message#message.payload,
+  Payload = transform_payload(Message#message.payload),
   Qos = Message#message.qos,
   From = Message#message.from,
   Timestamp = Message#message.timestamp,
@@ -209,7 +209,7 @@ on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
   ?LOG_INFO("[KAFKA PLUGIN]Message acked by client(~s): ~s~n",
     [ClientId, emqx_message:format(Message)]),
   Topic = Message#message.topic,
-  Payload = Message#message.payload,
+  Payload = transform_payload(Message#message.payload),
   Qos = Message#message.qos,
   From = Message#message.from,
   Timestamp = Message#message.timestamp,
@@ -278,28 +278,32 @@ get_kafka_topic() ->
   {ok, Topic} = application:get_env(emqx_plugin_kafka, topic),
   Topic.
 
+need_base64() ->
+  {ok, NeedBase64} = application:get_env(emqx_plugin_kafka, publish_base64),
+  NeedBase64.
+
+transform_payload(Payload) ->
+  NeedBase64 = need_base64(),
+  if
+    NeedBase64 == true ->
+      Content = list_to_binary(base64:encode_to_string(Payload));
+    NeedBase64 == false ->
+      Content = Payload
+  end,
+  Content.
+
 
 format_payload(Message) ->
   Username = emqx_message:get_header(username, Message),
   Topic = Message#message.topic,
-  Tail = string:right(binary_to_list(Topic), 4),
-  RawType = string:equal(Tail, <<"_raw">>),
   % ?LOG_INFO("[KAFKA PLUGIN]Tail= ~s , RawType= ~s~n",[Tail,RawType]),
   ClientId = Message#message.from,
-  MsgPayload = Message#message.payload,
-  % ?LOG_INFO("[KAFKA PLUGIN]MsgPayload : ~s~n", [MsgPayload]),
-  if
-    RawType == true ->
-      MsgPayload64 = list_to_binary(base64:encode_to_string(MsgPayload));
-  % ?LOG_INFO("[KAFKA PLUGIN]MsgPayload64 : ~s~n", [MsgPayload64]);
-    RawType == false ->
-      MsgPayload64 = MsgPayload
-  end,
+  Content = transform_payload(Message#message.payload),
   Payload = [{action, message_publish},
     {device_id, ClientId},
     {username, Username},
     {topic, Topic},
-    {payload, MsgPayload64},
+    {payload, Content},
     {ts, Message#message.timestamp}],
 
   {ok, ClientId, Payload}.