|
@@ -82,19 +82,19 @@ load(Env) ->
|
|
|
on_client_connect(ConnInfo = #{clientid := ClientId}, Props, _Env) ->
|
|
|
?INFO("[KAFKA PLUGIN]Client(~s) connect, ConnInfo: ~p, Props: ~p~n",
|
|
|
[ClientId, ConnInfo, Props]),
|
|
|
- {ok, Props}.
|
|
|
+ ok.
|
|
|
|
|
|
on_client_connack(ConnInfo = #{clientid := ClientId}, Rc, Props, _Env) ->
|
|
|
?INFO("[KAFKA PLUGIN]Client(~s) connack, ConnInfo: ~p, Rc: ~p, Props: ~p~n",
|
|
|
[ClientId, ConnInfo, Rc, Props]),
|
|
|
- {ok, Props}.
|
|
|
+ ok.
|
|
|
|
|
|
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
|
|
|
?INFO("[KAFKA PLUGIN]Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
|
|
[ClientId, ClientInfo, ConnInfo]),
|
|
|
{IpAddr, _Port} = maps:get(peername, ConnInfo),
|
|
|
Action = <<"connected">>,
|
|
|
- Now = os:timestamp(),
|
|
|
+ Now = now_mill_secs(os:timestamp()),
|
|
|
Online = 1,
|
|
|
Payload = [
|
|
|
{action, Action},
|
|
@@ -114,7 +114,7 @@ on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInf
|
|
|
?INFO("[KAFKA PLUGIN]Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
|
|
[ClientId, ReasonCode, ClientInfo, ConnInfo]),
|
|
|
Action = <<"disconnected">>,
|
|
|
- Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - 719528 * 24 * 3600,
|
|
|
+ Now = now_mill_secs(os:timestamp()),
|
|
|
Online = 0,
|
|
|
Payload = [
|
|
|
{action, Action},
|
|
@@ -129,12 +129,12 @@ on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInf
|
|
|
|
|
|
on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) ->
|
|
|
%% ?INFO("[KAFKA PLUGIN]Client(~s) authenticate, Result:~n~p~n", [ClientId, Result]),
|
|
|
- {ok, Result}.
|
|
|
+ ok.
|
|
|
|
|
|
on_client_check_acl(_ClientInfo = #{clientid := ClientId}, Topic, PubSub, Result, _Env) ->
|
|
|
%% ?INFO("[KAFKA PLUGIN]Client(~s) check_acl, PubSub:~p, Topic:~p, Result:~p~n",
|
|
|
%% [ClientId, PubSub, Topic, Result]),
|
|
|
- {ok, Result}.
|
|
|
+ ok.
|
|
|
|
|
|
%%---------------------------client subscribe start--------------------------%%
|
|
|
on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
@@ -151,7 +151,7 @@ on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
|
{timestamp, Now}
|
|
|
],
|
|
|
produce_kafka_payload(Payload),
|
|
|
- {ok, TopicFilters}.
|
|
|
+ ok.
|
|
|
%%---------------------client subscribe stop----------------------%%
|
|
|
on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
|
?INFO("[KAFKA PLUGIN]Client(~s) will unsubscribe ~p~n", [ClientId, TopicFilters]),
|
|
@@ -165,22 +165,23 @@ on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env)
|
|
|
{timestamp, Now}
|
|
|
],
|
|
|
produce_kafka_payload(Payload),
|
|
|
- {ok, TopicFilters}.
|
|
|
+ ok.
|
|
|
|
|
|
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
|
|
|
ok;
|
|
|
on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) ->
|
|
|
?INFO("[KAFKA PLUGIN]Message dropped by node ~s due to ~s: ~s~n",
|
|
|
- [Node, Reason, emqx_message:format(Message)]).
|
|
|
+ [Node, Reason, emqx_message:format(Message)]),
|
|
|
+ ok.
|
|
|
|
|
|
|
|
|
%%---------------------------message publish start--------------------------%%
|
|
|
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
|
|
|
- {ok, Message};
|
|
|
+ ok;
|
|
|
on_message_publish(Message, _Env) ->
|
|
|
{ok, Payload} = format_payload(Message),
|
|
|
produce_kafka_payload(Payload),
|
|
|
- {ok, Message}.
|
|
|
+ ok.
|
|
|
%%---------------------message publish stop----------------------%%
|
|
|
|
|
|
on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
@@ -202,8 +203,8 @@ on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
|
{ts, Timestamp}
|
|
|
]),
|
|
|
EkafTopic = ekaf_get_topic(),
|
|
|
- ekaf:produce_sync(EkafTopic, Json),
|
|
|
- {ok, Message}.
|
|
|
+ ekaf:produce_async_batched(EkafTopic, Json),
|
|
|
+ ok.
|
|
|
|
|
|
on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
|
?INFO("[KAFKA PLUGIN]Message acked by client(~s): ~s~n",
|
|
@@ -236,23 +237,29 @@ on_session_created(#{clientid := ClientId}, SessInfo, _Env) ->
|
|
|
|
|
|
|
|
|
on_session_subscribed(#{clientid := ClientId}, Topic, SubOpts, _Env) ->
|
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]).
|
|
|
+ ?INFO("[KAFKA PLUGIN]Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]),
|
|
|
+ ok.
|
|
|
|
|
|
on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
|
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]).
|
|
|
+ ?INFO("[KAFKA PLUGIN]Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]),
|
|
|
+ ok.
|
|
|
|
|
|
on_session_resumed(#{clientid := ClientId}, SessInfo, _Env) ->
|
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]).
|
|
|
+ ?INFO("[KAFKA PLUGIN]Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]),
|
|
|
+ ok.
|
|
|
|
|
|
on_session_discarded(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
|
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]).
|
|
|
+ ?INFO("[KAFKA PLUGIN]Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]),
|
|
|
+ ok.
|
|
|
|
|
|
on_session_takeovered(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
|
|
|
- ?INFO("[KAFKA PLUGIN]Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]).
|
|
|
+ ?INFO("[KAFKA PLUGIN]Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]),
|
|
|
+ ok.
|
|
|
|
|
|
on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _Env) ->
|
|
|
?INFO("[KAFKA PLUGIN]Session(~s) is terminated due to ~p~nSession Info: ~p~n",
|
|
|
- [ClientId, Reason, SessInfo]).
|
|
|
+ [ClientId, Reason, SessInfo]),
|
|
|
+ ok.
|
|
|
|
|
|
ekaf_init(_Env) ->
|
|
|
{ok, BrokerValues} = application:get_env(emqx_plugin_kafka, broker),
|