|
@@ -23,7 +23,7 @@
|
|
% -include_lib("emqx/include/emqx.hrl").
|
|
% -include_lib("emqx/include/emqx.hrl").
|
|
|
|
|
|
-include("emqx.hrl").
|
|
-include("emqx.hrl").
|
|
--include("logger.hrl").
|
|
|
|
|
|
+-include_lib("kernel/include/logger.hrl").
|
|
|
|
|
|
-export([load/1, unload/0]).
|
|
-export([load/1, unload/0]).
|
|
|
|
|
|
@@ -80,17 +80,17 @@ load(Env) ->
|
|
emqx:hook('message.dropped', {?MODULE, on_message_dropped, [Env]}).
|
|
emqx:hook('message.dropped', {?MODULE, on_message_dropped, [Env]}).
|
|
|
|
|
|
on_client_connect(ConnInfo = #{clientid := ClientId}, Props, _Env) ->
|
|
on_client_connect(ConnInfo = #{clientid := ClientId}, Props, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Client(~s) connect, ConnInfo: ~p, Props: ~p~n",
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Client(~s) connect, ConnInfo: ~p, Props: ~p~n",
|
|
[ClientId, ConnInfo, Props]),
|
|
[ClientId, ConnInfo, Props]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_client_connack(ConnInfo = #{clientid := ClientId}, Rc, Props, _Env) ->
|
|
on_client_connack(ConnInfo = #{clientid := ClientId}, Rc, Props, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Client(~s) connack, ConnInfo: ~p, Rc: ~p, Props: ~p~n",
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Client(~s) connack, ConnInfo: ~p, Rc: ~p, Props: ~p~n",
|
|
[ClientId, ConnInfo, Rc, Props]),
|
|
[ClientId, ConnInfo, Rc, Props]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
|
|
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
|
[ClientId, ClientInfo, ConnInfo]),
|
|
[ClientId, ClientInfo, ConnInfo]),
|
|
{IpAddr, _Port} = maps:get(peername, ConnInfo),
|
|
{IpAddr, _Port} = maps:get(peername, ConnInfo),
|
|
Action = <<"connected">>,
|
|
Action = <<"connected">>,
|
|
@@ -111,7 +111,7 @@ on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInfo, _Env) ->
|
|
on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInfo, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
|
[ClientId, ReasonCode, ClientInfo, ConnInfo]),
|
|
[ClientId, ReasonCode, ClientInfo, ConnInfo]),
|
|
Action = <<"disconnected">>,
|
|
Action = <<"disconnected">>,
|
|
Now = now_mill_secs(os:timestamp()),
|
|
Now = now_mill_secs(os:timestamp()),
|
|
@@ -128,17 +128,17 @@ on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInf
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) ->
|
|
on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Client(~s) authenticate, Result:~n~p~n", [ClientId, Result]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Client(~s) authenticate, Result:~n~p~n", [ClientId, Result]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_client_check_acl(_ClientInfo = #{clientid := ClientId}, Topic, PubSub, Result, _Env) ->
|
|
on_client_check_acl(_ClientInfo = #{clientid := ClientId}, Topic, PubSub, Result, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Client(~s) check_acl, PubSub:~p, Topic:~p, Result:~p~n",
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Client(~s) check_acl, PubSub:~p, Topic:~p, Result:~p~n",
|
|
[ClientId, PubSub, Topic, Result]),
|
|
[ClientId, PubSub, Topic, Result]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
%%---------------------------client subscribe start--------------------------%%
|
|
%%---------------------------client subscribe start--------------------------%%
|
|
on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Client(~s) will subscribe: ~p~n", [ClientId, TopicFilters]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Client(~s) will subscribe: ~p~n", [ClientId, TopicFilters]),
|
|
Topic = erlang:element(1, erlang:hd(TopicFilters)),
|
|
Topic = erlang:element(1, erlang:hd(TopicFilters)),
|
|
Qos = erlang:element(2, lists:last(TopicFilters)),
|
|
Qos = erlang:element(2, lists:last(TopicFilters)),
|
|
Action = <<"subscribe">>,
|
|
Action = <<"subscribe">>,
|
|
@@ -154,7 +154,7 @@ on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
ok.
|
|
ok.
|
|
%%---------------------client subscribe stop----------------------%%
|
|
%%---------------------client subscribe stop----------------------%%
|
|
on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Client(~s) will unsubscribe ~p~n", [ClientId, TopicFilters]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Client(~s) will unsubscribe ~p~n", [ClientId, TopicFilters]),
|
|
Topic = erlang:element(1, erlang:hd(TopicFilters)),
|
|
Topic = erlang:element(1, erlang:hd(TopicFilters)),
|
|
Action = <<"unsubscribe">>,
|
|
Action = <<"unsubscribe">>,
|
|
Now = now_mill_secs(os:timestamp()),
|
|
Now = now_mill_secs(os:timestamp()),
|
|
@@ -170,7 +170,7 @@ on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env)
|
|
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
|
|
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
|
|
ok;
|
|
ok;
|
|
on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) ->
|
|
on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Message dropped by node ~s due to ~s: ~s~n",
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Message dropped by node ~s due to ~s: ~s~n",
|
|
[Node, Reason, emqx_message:format(Message)]),
|
|
[Node, Reason, emqx_message:format(Message)]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
@@ -185,7 +185,7 @@ on_message_publish(Message, _Env) ->
|
|
%%---------------------message publish stop----------------------%%
|
|
%%---------------------message publish stop----------------------%%
|
|
|
|
|
|
on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Message delivered to client(~s): ~s~n",
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Message delivered to client(~s): ~s~n",
|
|
[ClientId, emqx_message:format(Message)]),
|
|
[ClientId, emqx_message:format(Message)]),
|
|
Topic = Message#message.topic,
|
|
Topic = Message#message.topic,
|
|
Payload = Message#message.payload,
|
|
Payload = Message#message.payload,
|
|
@@ -207,7 +207,7 @@ on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Message acked by client(~s): ~s~n",
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Message acked by client(~s): ~s~n",
|
|
[ClientId, emqx_message:format(Message)]),
|
|
[ClientId, emqx_message:format(Message)]),
|
|
Topic = Message#message.topic,
|
|
Topic = Message#message.topic,
|
|
Payload = Message#message.payload,
|
|
Payload = Message#message.payload,
|
|
@@ -232,32 +232,32 @@ on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
on_session_created(#{clientid := ClientId}, SessInfo, _Env) ->
|
|
on_session_created(#{clientid := ClientId}, SessInfo, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) created, Session Info:~n~p~n", [ClientId, SessInfo]).
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Session(~s) created, Session Info:~n~p~n", [ClientId, SessInfo]).
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
|
|
on_session_subscribed(#{clientid := ClientId}, Topic, SubOpts, _Env) ->
|
|
on_session_subscribed(#{clientid := ClientId}, Topic, SubOpts, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
|
|
on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_session_resumed(#{clientid := ClientId}, SessInfo, _Env) ->
|
|
on_session_resumed(#{clientid := ClientId}, SessInfo, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_session_discarded(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
|
|
on_session_discarded(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_session_takeovered(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
|
|
on_session_takeovered(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _Env) ->
|
|
on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _Env) ->
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) is terminated due to ~p~nSession Info: ~p~n",
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]Session(~s) is terminated due to ~p~nSession Info: ~p~n",
|
|
[ClientId, Reason, SessInfo]),
|
|
[ClientId, Reason, SessInfo]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
@@ -265,13 +265,13 @@ ekaf_init(_Env) ->
|
|
io:format("Init emqx plugin kafka.....")
|
|
io:format("Init emqx plugin kafka.....")
|
|
{ok, BrokerValues} = application:get_env(emqx_plugin_kafka, broker),
|
|
{ok, BrokerValues} = application:get_env(emqx_plugin_kafka, broker),
|
|
KafkaHost = proplists:get_value(host, BrokerValues),
|
|
KafkaHost = proplists:get_value(host, BrokerValues),
|
|
- ?INFO("[KAFKA PLUGIN]KafkaHost = ~s~n", [KafkaHost]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]KafkaHost = ~s~n", [KafkaHost]),
|
|
KafkaPort = proplists:get_value(port, BrokerValues),
|
|
KafkaPort = proplists:get_value(port, BrokerValues),
|
|
- ?INFO("[KAFKA PLUGIN]KafkaPort = ~s~n", [KafkaPort]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]KafkaPort = ~s~n", [KafkaPort]),
|
|
KafkaPartitionStrategy = proplists:get_value(partitionstrategy, BrokerValues),
|
|
KafkaPartitionStrategy = proplists:get_value(partitionstrategy, BrokerValues),
|
|
KafkaPartitionWorkers = proplists:get_value(partitionworkers, BrokerValues),
|
|
KafkaPartitionWorkers = proplists:get_value(partitionworkers, BrokerValues),
|
|
KafkaTopic = proplists:get_value(payloadtopic, BrokerValues),
|
|
KafkaTopic = proplists:get_value(payloadtopic, BrokerValues),
|
|
- ?INFO("[KAFKA PLUGIN]KafkaTopic = ~s~n", [KafkaTopic]),
|
|
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]KafkaTopic = ~s~n", [KafkaTopic]),
|
|
application:set_env(ekaf, ekaf_bootstrap_broker, {KafkaHost, list_to_integer(KafkaPort)}),
|
|
application:set_env(ekaf, ekaf_bootstrap_broker, {KafkaHost, list_to_integer(KafkaPort)}),
|
|
application:set_env(ekaf, ekaf_partition_strategy, list_to_atom(KafkaPartitionStrategy)),
|
|
application:set_env(ekaf, ekaf_partition_strategy, list_to_atom(KafkaPartitionStrategy)),
|
|
application:set_env(ekaf, ekaf_per_partition_workers, KafkaPartitionWorkers),
|
|
application:set_env(ekaf, ekaf_per_partition_workers, KafkaPartitionWorkers),
|
|
@@ -293,14 +293,14 @@ format_payload(Message) ->
|
|
Topic = Message#message.topic,
|
|
Topic = Message#message.topic,
|
|
Tail = string:right(binary_to_list(Topic), 4),
|
|
Tail = string:right(binary_to_list(Topic), 4),
|
|
RawType = string:equal(Tail, <<"_raw">>),
|
|
RawType = string:equal(Tail, <<"_raw">>),
|
|
- % ?INFO("[KAFKA PLUGIN]Tail= ~s , RawType= ~s~n",[Tail,RawType]),
|
|
|
|
|
|
+ % ?LOG_INFO("[KAFKA PLUGIN]Tail= ~s , RawType= ~s~n",[Tail,RawType]),
|
|
|
|
|
|
MsgPayload = Message#message.payload,
|
|
MsgPayload = Message#message.payload,
|
|
- % ?INFO("[KAFKA PLUGIN]MsgPayload : ~s~n", [MsgPayload]),
|
|
|
|
|
|
+ % ?LOG_INFO("[KAFKA PLUGIN]MsgPayload : ~s~n", [MsgPayload]),
|
|
if
|
|
if
|
|
RawType == true ->
|
|
RawType == true ->
|
|
MsgPayload64 = list_to_binary(base64:encode_to_string(MsgPayload));
|
|
MsgPayload64 = list_to_binary(base64:encode_to_string(MsgPayload));
|
|
- % ?INFO("[KAFKA PLUGIN]MsgPayload64 : ~s~n", [MsgPayload64]);
|
|
|
|
|
|
+ % ?LOG_INFO("[KAFKA PLUGIN]MsgPayload64 : ~s~n", [MsgPayload64]);
|
|
RawType == false ->
|
|
RawType == false ->
|
|
MsgPayload64 = MsgPayload
|
|
MsgPayload64 = MsgPayload
|
|
end,
|
|
end,
|
|
@@ -339,7 +339,7 @@ unload() ->
|
|
produce_kafka_payload(Message) ->
|
|
produce_kafka_payload(Message) ->
|
|
Topic = ekaf_get_topic(),
|
|
Topic = ekaf_get_topic(),
|
|
{ok, MessageBody} = emqx_json:safe_encode(Message),
|
|
{ok, MessageBody} = emqx_json:safe_encode(Message),
|
|
- % ?INFO("[KAFKA PLUGIN]Message = ~s~n",[MessageBody]),
|
|
|
|
|
|
+ % ?LOG_INFO("[KAFKA PLUGIN]Message = ~s~n",[MessageBody]),
|
|
Payload = iolist_to_binary(MessageBody),
|
|
Payload = iolist_to_binary(MessageBody),
|
|
ekaf:produce_async_batched(Topic, Payload).
|
|
ekaf:produce_async_batched(Topic, Payload).
|
|
|
|
|