kennyh 2 rokov pred
rodič
commit
10e4f3b12c
2 zmenil súbory, kde vykonal 41 pridanie a 15 odobranie
  1. 3 2
      etc/emqx_plugin_kafka.config
  2. 38 13
      src/emqx_plugin_kafka.erl

+ 3 - 2
etc/emqx_plugin_kafka.config

@@ -1,11 +1,12 @@
 [
     {emqx_plugin_kafka, [
-        {kafka_address_list, [{"localhost", 9092}]},    %% kafka地址,可配置多个
+        {kafka_address_list, [{"120.24.144.183", 9092}]},    %% kafka地址,可配置多个
         {kafka_config, [                                %% 这部分是https://github.com/kafka4beam/brod库需要的配置
             {reconnect_cool_down_seconds, 10},          %% socket error recovery
             {query_api_versions, true}                  %% Kafka 0.9.x or Earlier请设置为false
         ]},
-        {topic, <<"emqx-topic">>},                           %% 转发的目标topic
+        {topicList, ["/Lamp/Login/","/Lamp/DataReport/"]},
+        {topic, <<"Lamp_DataReport">>},                           %% 转发的目标topic
         {publish_base64, false}                         %% 对于publish到mqtt的消息内容,是否进行base64编码之后再转发(为了兼容发送字节流的情况)
     ]}
 ].

+ 38 - 13
src/emqx_plugin_kafka.erl

@@ -150,7 +150,7 @@ on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
     {qos, maps:get(qos, Qos)},
     {ts, Now}
   ],
-  produce_kafka_payload(ClientId, Payload),
+  %produce_kafka_payload(ClientId, Payload),
   ok.
 %%---------------------client subscribe stop----------------------%%
 on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
@@ -164,7 +164,7 @@ on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env)
     {topic, Topic},
     {ts, Now}
   ],
-  produce_kafka_payload(ClientId, Payload),
+  %produce_kafka_payload(ClientId, Payload),
   ok.
 
 on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
@@ -179,8 +179,7 @@ on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) ->
 on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
   ok;
 on_message_publish(Message, _Env) ->
-  {ok, ClientId, Payload} = format_payload(Message),
-  produce_kafka_payload(ClientId, Payload),
+  push_data_to_msg_queu(Message),
   ok.
 %%---------------------message publish stop----------------------%%
 
@@ -202,7 +201,7 @@ on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
     {cluster_node, node()},
     {ts, Timestamp}
   ],
-  produce_kafka_payload(ClientId, Content),
+  %produce_kafka_payload(ClientId, Content),
   ok.
 
 on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
@@ -223,7 +222,7 @@ on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
     {cluster_node, node()},
     {ts, Timestamp}
   ],
-  produce_kafka_payload(ClientId, Content),
+  %produce_kafka_payload(ClientId, Content),
   ok.
 
 %%--------------------------------------------------------------------
@@ -268,6 +267,8 @@ kafka_init(_Env) ->
   ?LOG_INFO("[KAFKA PLUGIN]KafkaConfig = ~p~n", [KafkaConfig]),
   {ok, KafkaTopic} = application:get_env(emqx_plugin_kafka, topic),
   ?LOG_INFO("[KAFKA PLUGIN]KafkaTopic = ~s~n", [KafkaTopic]),
+  {ok, KafkaTopicList} = application:get_env(emqx_plugin_kafka, topicList),
+  ?LOG_INFO("[KAFKA PLUGIN]KafkaTopic = ~s~n", [KafkaTopicList]),
   {ok, _} = application:ensure_all_started(brod),
   ok = brod:start_client(AddressList, emqx_repost_worker, KafkaConfig),
   ok = brod:start_producer(emqx_repost_worker, KafkaTopic, []),
@@ -278,6 +279,10 @@ get_kafka_topic() ->
   {ok, Topic} = application:get_env(emqx_plugin_kafka, topic),
   Topic.
 
+get_kafka_topiclist() ->
+    {ok, TopicList} = application:get_env(emqx_plugin_kafka, topicList),
+    TopicList.
+
 need_base64() ->
   {ok, NeedBase64} = application:get_env(emqx_plugin_kafka, publish_base64),
   NeedBase64.
@@ -292,11 +297,10 @@ transform_payload(Payload) ->
   end,
   Content.
 
-
-format_payload(Message) ->
+push_data_to_msg_queu(Message) ->
   Username = emqx_message:get_header(username, Message),
   Topic = Message#message.topic,
-  % ?LOG_INFO("[KAFKA PLUGIN]Tail= ~s , RawType= ~s~n",[Tail,RawType]),
+  ?LOG_INFO("[KAFKA PLUGIN] Topic= ~s ~n",[Topic]),
   ClientId = Message#message.from,
   Content = transform_payload(Message#message.payload),
   Payload = [{action, message_publish},
@@ -305,9 +309,30 @@ format_payload(Message) ->
     {topic, Topic},
     {payload, Content},
     {ts, Message#message.timestamp}],
-
-  {ok, ClientId, Payload}.
-
+  %%case string:str(Topic, "/Lamp/DataReport/") of
+  %%  0 ->
+  %%      case string:str(Topic, "/Lamp/Login/") of
+  %%          0 ->
+  %%              ?LOG_INFO("[KAFKA PLUGIN] nomatch Topic= ~s ~n",[Topic]),
+  %%              none;
+  %%          _ ->
+  %%              produce_kafka_payload(ClientId, Payload, <<"Lamp_Login">>)
+  %%      end;
+  %%  _ ->
+  %%    produce_kafka_payload(ClientId, Payload, <<"Lamp_DataReport">>)
+  %%end.
+  case binary:match(Topic, <<"/Lamp/DataReport/">>) of
+    nomatch ->
+      case binary:match(Topic, <<"/Lamp/Login/">>) of
+        nomatch ->
+          %%?LOG_INFO("[KAFKA PLUGIN] nomatch Topic= ~s ~n",[Topic]),
+          none;
+        _ ->
+          produce_kafka_payload(ClientId, Payload)
+      end;
+    _ ->
+      produce_kafka_payload(ClientId, Payload)
+  end.
 
 %% Called when the plugin application stop
 unload() ->
@@ -334,7 +359,7 @@ unload() ->
 produce_kafka_payload(Key, Message) ->
   Topic = get_kafka_topic(),
   {ok, MessageBody} = emqx_json:safe_encode(Message),
-  % ?LOG_INFO("[KAFKA PLUGIN]Message = ~s~n",[MessageBody]),
+  ?LOG_INFO("[KAFKA PLUGIN]Message = ~s~n",[MessageBody]),
   Payload = iolist_to_binary(MessageBody),
   brod:produce_cb(emqx_repost_worker, Topic, hash, Key, Payload, fun(_,_) -> ok end),
   ok.