Pārlūkot izejas kodu

适配emqx v4.3版本,解决topic发布消息时无法桥接到kafka的问题

Ultrakid 3 gadi atpakaļ
vecāks
revīzija
1fb47896e2
1 mainītis faili ar 51 papildinājumiem un 50 dzēšanām
  1. 51 50
      src/emqx_plugin_kafka.erl

+ 51 - 50
src/emqx_plugin_kafka.erl

@@ -23,6 +23,7 @@
 % -include_lib("emqx/include/emqx.hrl").
 
 -include_lib("emqx.hrl").
+-include_lib("logger.hrl").
 
 -export([load/1, unload/0]).
 
@@ -79,21 +80,21 @@ load(Env) ->
   emqx:hook('message.dropped', {?MODULE, on_message_dropped, [Env]}).
 
 on_client_connect(ConnInfo = #{clientid := ClientId}, Props, _Env) ->
-  %io:format("Client(~s) connect, ConnInfo: ~p, Props: ~p~n",
-  %[ClientId, ConnInfo, Props]),
+  ?INFO("[KAFKA PLUGIN]Client(~s) connect, ConnInfo: ~p, Props: ~p~n",
+    [ClientId, ConnInfo, Props]),
   {ok, Props}.
 
 on_client_connack(ConnInfo = #{clientid := ClientId}, Rc, Props, _Env) ->
-  %io:format("Client(~s) connack, ConnInfo: ~p, Rc: ~p, Props: ~p~n",
-  %[ClientId, ConnInfo, Rc, Props]),
+  ?INFO("[KAFKA PLUGIN]Client(~s) connack, ConnInfo: ~p, Rc: ~p, Props: ~p~n",
+    [ClientId, ConnInfo, Rc, Props]),
   {ok, Props}.
 
 on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
-  io:format("Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
+  ?INFO("[KAFKA PLUGIN]Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
     [ClientId, ClientInfo, ConnInfo]),
   {IpAddr, _Port} = maps:get(peername, ConnInfo),
   Action = <<"connected">>,
-  Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - 719528 * 24 * 3600,
+  Now = os:timestamp(),
   Online = 1,
   Payload = [
     {action, Action},
@@ -110,7 +111,7 @@ on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
   ok.
 
 on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInfo, _Env) ->
-  io:format("Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
+  ?INFO("[KAFKA PLUGIN]Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
     [ClientId, ReasonCode, ClientInfo, ConnInfo]),
   Action = <<"disconnected">>,
   Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - 719528 * 24 * 3600,
@@ -127,21 +128,21 @@ on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInf
   ok.
 
 on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) ->
-  io:format("Client(~s) authenticate, Result:~n~p~n", [ClientId, Result]),
+%%  ?INFO("[KAFKA PLUGIN]Client(~s) authenticate, Result:~n~p~n", [ClientId, Result]),
   {ok, Result}.
 
 on_client_check_acl(_ClientInfo = #{clientid := ClientId}, Topic, PubSub, Result, _Env) ->
-  io:format("Client(~s) check_acl, PubSub:~p, Topic:~p, Result:~p~n",
-    [ClientId, PubSub, Topic, Result]),
+%%  ?INFO("[KAFKA PLUGIN]Client(~s) check_acl, PubSub:~p, Topic:~p, Result:~p~n",
+%%    [ClientId, PubSub, Topic, Result]),
   {ok, Result}.
 
 %%---------------------------client subscribe start--------------------------%%
 on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
-  io:format("Client(~s) will subscribe: ~p~n", [ClientId, TopicFilters]),
+  ?INFO("[KAFKA PLUGIN]Client(~s) will subscribe: ~p~n", [ClientId, TopicFilters]),
   Topic = erlang:element(1, erlang:hd(TopicFilters)),
   Qos = erlang:element(2, lists:last(TopicFilters)),
   Action = <<"subscribe">>,
-  Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - 719528 * 24 * 3600,
+  Now = now_mill_secs(os:timestamp()),
   Payload = [
     {device_id, ClientId},
     {action, Action},
@@ -153,10 +154,10 @@ on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
   {ok, TopicFilters}.
 %%---------------------client subscribe stop----------------------%%
 on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
-  io:format("Client(~s) will unsubscribe ~p~n", [ClientId, TopicFilters]),
+  ?INFO("[KAFKA PLUGIN]Client(~s) will unsubscribe ~p~n", [ClientId, TopicFilters]),
   Topic = erlang:element(1, erlang:hd(TopicFilters)),
   Action = <<"unsubscribe">>,
-  Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - 719528 * 24 * 3600,
+  Now = now_mill_secs(os:timestamp()),
   Payload = [
     {device_id, ClientId},
     {action, Action},
@@ -169,7 +170,7 @@ on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env)
 on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
   ok;
 on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) ->
-  io:format("Message dropped by node ~s due to ~s: ~s~n",
+  ?INFO("[KAFKA PLUGIN]Message dropped by node ~s due to ~s: ~s~n",
     [Node, Reason, emqx_message:format(Message)]).
 
 
@@ -183,7 +184,7 @@ on_message_publish(Message, _Env) ->
 %%---------------------message publish stop----------------------%%
 
 on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
-  io:format("Message delivered to client(~s): ~s~n",
+  ?INFO("[KAFKA PLUGIN]Message delivered to client(~s): ~s~n",
     [ClientId, emqx_message:format(Message)]),
   Topic = Message#message.topic,
   Payload = Message#message.payload,
@@ -198,14 +199,14 @@ on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
     {payload, Payload},
     {qos, Qos},
     {cluster_node, node()},
-    {ts, now_secs(Timestamp)}
+    {ts, Timestamp}
   ]),
   EkafTopic = ekaf_get_topic(),
   ekaf:produce_sync(EkafTopic, Json),
   {ok, Message}.
 
 on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
-  io:format("Message acked by client(~s): ~s~n",
+  ?INFO("[KAFKA PLUGIN]Message acked by client(~s): ~s~n",
     [ClientId, emqx_message:format(Message)]),
   Topic = Message#message.topic,
   Payload = Message#message.payload,
@@ -220,49 +221,49 @@ on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
     {payload, Payload},
     {qos, Qos},
     {cluster_node, node()},
-    {ts, now_secs(Timestamp)}
+    {ts, Timestamp}
   ]),
   EkafTopic = ekaf_get_topic(),
-  ekaf:produce_sync(EkafTopic, Json).
+  ekaf:produce_async_batched(EkafTopic, Json).
 
 %%--------------------------------------------------------------------
 %% Session Lifecircle Hooks
 %%--------------------------------------------------------------------
 
 on_session_created(#{clientid := ClientId}, SessInfo, _Env) ->
-  %io:format("Session(~s) created, Session Info:~n~p~n", [ClientId, SessInfo]).
+  %?INFO("[KAFKA PLUGIN]Session(~s) created, Session Info:~n~p~n", [ClientId, SessInfo]).
   ok.
 
 
 on_session_subscribed(#{clientid := ClientId}, Topic, SubOpts, _Env) ->
-  io:format("Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]).
+  ?INFO("[KAFKA PLUGIN]Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]).
 
 on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
-  io:format("Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]).
+  ?INFO("[KAFKA PLUGIN]Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]).
 
 on_session_resumed(#{clientid := ClientId}, SessInfo, _Env) ->
-  io:format("Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]).
+  ?INFO("[KAFKA PLUGIN]Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]).
 
 on_session_discarded(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
-  io:format("Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]).
+  ?INFO("[KAFKA PLUGIN]Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]).
 
 on_session_takeovered(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
-  io:format("Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]).
+  ?INFO("[KAFKA PLUGIN]Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]).
 
 on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _Env) ->
-  io:format("Session(~s) is terminated due to ~p~nSession Info: ~p~n",
+  ?INFO("[KAFKA PLUGIN]Session(~s) is terminated due to ~p~nSession Info: ~p~n",
     [ClientId, Reason, SessInfo]).
 
 ekaf_init(_Env) ->
   {ok, BrokerValues} = application:get_env(emqx_plugin_kafka, broker),
   KafkaHost = proplists:get_value(host, BrokerValues),
-  io:format("KafkaHost = ~s~n", [KafkaHost]),
+  ?INFO("[KAFKA PLUGIN]KafkaHost = ~s~n", [KafkaHost]),
   KafkaPort = proplists:get_value(port, BrokerValues),
-  io:format("KafkaPort = ~s~n", [KafkaPort]),
+  ?INFO("[KAFKA PLUGIN]KafkaPort = ~s~n", [KafkaPort]),
   KafkaPartitionStrategy = proplists:get_value(partitionstrategy, BrokerValues),
   KafkaPartitionWorkers = proplists:get_value(partitionworkers, BrokerValues),
   KafkaTopic = proplists:get_value(payloadtopic, BrokerValues),
-  io:format("KafkaTopic = ~s~n", [KafkaTopic]),
+  ?INFO("[KAFKA PLUGIN]KafkaTopic = ~s~n", [KafkaTopic]),
   application:set_env(ekaf, ekaf_bootstrap_broker, {KafkaHost, list_to_integer(KafkaPort)}),
   application:set_env(ekaf, ekaf_partition_strategy, list_to_atom(KafkaPartitionStrategy)),
   application:set_env(ekaf, ekaf_per_partition_workers, KafkaPartitionWorkers),
@@ -284,14 +285,14 @@ format_payload(Message) ->
   Topic = Message#message.topic,
   Tail = string:right(binary_to_list(Topic), 4),
   RawType = string:equal(Tail, <<"_raw">>),
-  % io:format("Tail= ~s , RawType= ~s~n",[Tail,RawType]),
+  % ?INFO("[KAFKA PLUGIN]Tail= ~s , RawType= ~s~n",[Tail,RawType]),
 
   MsgPayload = Message#message.payload,
-  % io:format("MsgPayload : ~s~n", [MsgPayload]),
+  % ?INFO("[KAFKA PLUGIN]MsgPayload : ~s~n", [MsgPayload]),
   if
     RawType == true ->
       MsgPayload64 = list_to_binary(base64:encode_to_string(MsgPayload));
-  % io:format("MsgPayload64 : ~s~n", [MsgPayload64]);
+  % ?INFO("[KAFKA PLUGIN]MsgPayload64 : ~s~n", [MsgPayload64]);
     RawType == false ->
       MsgPayload64 = MsgPayload
   end,
@@ -300,28 +301,28 @@ format_payload(Message) ->
     {username, Username},
     {topic, Topic},
     {payload, MsgPayload64},
-    {ts, emqx_time:now_secs(Message#message.timestamp)}],
+    {ts, Message#message.timestamp}],
 
   {ok, Payload}.
 
 
 %% Called when the plugin application stop
 unload() ->
-  emqx:unhook('client.connect', {?MODULE, on_client_connect}),
-  emqx:unhook('client.connack', {?MODULE, on_client_connack}),
+%%  emqx:unhook('client.connect', {?MODULE, on_client_connect}),
+%%  emqx:unhook('client.connack', {?MODULE, on_client_connack}),
   emqx:unhook('client.connected', {?MODULE, on_client_connected}),
   emqx:unhook('client.disconnected', {?MODULE, on_client_disconnected}),
-  emqx:unhook('client.authenticate', {?MODULE, on_client_authenticate}),
-  emqx:unhook('client.check_acl', {?MODULE, on_client_check_acl}),
-  emqx:unhook('client.subscribe', {?MODULE, on_client_subscribe}),
-  emqx:unhook('client.unsubscribe', {?MODULE, on_client_unsubscribe}),
-  emqx:unhook('session.created', {?MODULE, on_session_created}),
-  emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}),
-  emqx:unhook('session.unsubscribed', {?MODULE, on_session_unsubscribed}),
-  emqx:unhook('session.resumed', {?MODULE, on_session_resumed}),
-  emqx:unhook('session.discarded', {?MODULE, on_session_discarded}),
-  emqx:unhook('session.takeovered', {?MODULE, on_session_takeovered}),
-  emqx:unhook('session.terminated', {?MODULE, on_session_terminated}),
+%%  emqx:unhook('client.authenticate', {?MODULE, on_client_authenticate}),
+%%  emqx:unhook('client.check_acl', {?MODULE, on_client_check_acl}),
+%%  emqx:unhook('client.subscribe', {?MODULE, on_client_subscribe}),
+%%  emqx:unhook('client.unsubscribe', {?MODULE, on_client_unsubscribe}),
+%%  emqx:unhook('session.created', {?MODULE, on_session_created}),
+%%  emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}),
+%%  emqx:unhook('session.unsubscribed', {?MODULE, on_session_unsubscribed}),
+%%  emqx:unhook('session.resumed', {?MODULE, on_session_resumed}),
+%%  emqx:unhook('session.discarded', {?MODULE, on_session_discarded}),
+%%  emqx:unhook('session.takeovered', {?MODULE, on_session_takeovered}),
+%%  emqx:unhook('session.terminated', {?MODULE, on_session_terminated}),
   emqx:unhook('message.publish', {?MODULE, on_message_publish}),
   emqx:unhook('message.delivered', {?MODULE, on_message_delivered}),
   emqx:unhook('message.acked', {?MODULE, on_message_acked}),
@@ -330,7 +331,7 @@ unload() ->
 produce_kafka_payload(Message) ->
   Topic = ekaf_get_topic(),
   {ok, MessageBody} = emqx_json:safe_encode(Message),
-  % io:format("Message = ~s~n",[MessageBody]),
+  % ?INFO("[KAFKA PLUGIN]Message = ~s~n",[MessageBody]),
   Payload = iolist_to_binary(MessageBody),
   ekaf:produce_async_batched(Topic, Payload).
 
@@ -338,5 +339,5 @@ ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
   inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
 ntoa(IP) ->
   inet_parse:ntoa(IP).
-now_secs({MegaSecs, Secs, _MicroSecs}) ->
-  MegaSecs * 1000000 + Secs.
+now_mill_secs({MegaSecs, Secs, _MicroSecs}) ->
+  MegaSecs * 1000000000 + Secs * 1000 + _MicroSecs.