Browse Source

格式化

Ultrakid 3 years ago
parent
commit
e4269e72d2
1 changed files with 248 additions and 249 deletions
  1. 248 249
      src/emqx_plugin_kafka.erl

+ 248 - 249
src/emqx_plugin_kafka.erl

@@ -27,317 +27,316 @@
 -export([load/1, unload/0]).
 -export([load/1, unload/0]).
 
 
 %% Client Lifecircle Hooks
 %% Client Lifecircle Hooks
--export([ on_client_connect/3
-        , on_client_connack/4
-        , on_client_connected/3
-        , on_client_disconnected/4
-        , on_client_authenticate/3
-        , on_client_check_acl/5
-        , on_client_subscribe/4
-        , on_client_unsubscribe/4
-        ]).
+-export([on_client_connect/3
+  , on_client_connack/4
+  , on_client_connected/3
+  , on_client_disconnected/4
+  , on_client_authenticate/3
+  , on_client_check_acl/5
+  , on_client_subscribe/4
+  , on_client_unsubscribe/4
+]).
 
 
 %% Session Lifecircle Hooks
 %% Session Lifecircle Hooks
--export([ on_session_created/3
-        , on_session_subscribed/4
-        , on_session_unsubscribed/4
-        , on_session_resumed/3
-        , on_session_discarded/3
-        , on_session_takeovered/3
-        , on_session_terminated/4
-        ]).
+-export([on_session_created/3
+  , on_session_subscribed/4
+  , on_session_unsubscribed/4
+  , on_session_resumed/3
+  , on_session_discarded/3
+  , on_session_takeovered/3
+  , on_session_terminated/4
+]).
 
 
 %% Message Pubsub Hooks
 %% Message Pubsub Hooks
--export([ on_message_publish/2
-        , on_message_delivered/3
-        , on_message_acked/3
-        , on_message_dropped/4
-        ]).
-
+-export([on_message_publish/2
+  , on_message_delivered/3
+  , on_message_acked/3
+  , on_message_dropped/4
+]).
 
 
 
 
 %% Called when the plugin application start
 %% Called when the plugin application start
 load(Env) ->
 load(Env) ->
-    ekaf_init([Env]), 
-    emqx:hook('client.connect',      {?MODULE, on_client_connect, [Env]}),
-    emqx:hook('client.connack',      {?MODULE, on_client_connack, [Env]}),
-    emqx:hook('client.connected',    {?MODULE, on_client_connected, [Env]}),
-    emqx:hook('client.disconnected', {?MODULE, on_client_disconnected, [Env]}),
-    emqx:hook('client.authenticate', {?MODULE, on_client_authenticate, [Env]}),
-    emqx:hook('client.check_acl',    {?MODULE, on_client_check_acl, [Env]}),
-    emqx:hook('client.subscribe',    {?MODULE, on_client_subscribe, [Env]}),
-    emqx:hook('client.unsubscribe',  {?MODULE, on_client_unsubscribe, [Env]}),
-    emqx:hook('session.created',     {?MODULE, on_session_created, [Env]}),
-    emqx:hook('session.subscribed',  {?MODULE, on_session_subscribed, [Env]}),
-    emqx:hook('session.unsubscribed',{?MODULE, on_session_unsubscribed, [Env]}),
-    emqx:hook('session.resumed',     {?MODULE, on_session_resumed, [Env]}),
-    emqx:hook('session.discarded',   {?MODULE, on_session_discarded, [Env]}),
-    emqx:hook('session.takeovered',  {?MODULE, on_session_takeovered, [Env]}),
-    emqx:hook('session.terminated',  {?MODULE, on_session_terminated, [Env]}),
-    emqx:hook('message.publish',     {?MODULE, on_message_publish, [Env]}),
-    emqx:hook('message.delivered',   {?MODULE, on_message_delivered, [Env]}),
-    emqx:hook('message.acked',       {?MODULE, on_message_acked, [Env]}),
-    emqx:hook('message.dropped',     {?MODULE, on_message_dropped, [Env]}).
-	
+  ekaf_init([Env]),
+  emqx:hook('client.connect', {?MODULE, on_client_connect, [Env]}),
+  emqx:hook('client.connack', {?MODULE, on_client_connack, [Env]}),
+  emqx:hook('client.connected', {?MODULE, on_client_connected, [Env]}),
+  emqx:hook('client.disconnected', {?MODULE, on_client_disconnected, [Env]}),
+  emqx:hook('client.authenticate', {?MODULE, on_client_authenticate, [Env]}),
+  emqx:hook('client.check_acl', {?MODULE, on_client_check_acl, [Env]}),
+  emqx:hook('client.subscribe', {?MODULE, on_client_subscribe, [Env]}),
+  emqx:hook('client.unsubscribe', {?MODULE, on_client_unsubscribe, [Env]}),
+  emqx:hook('session.created', {?MODULE, on_session_created, [Env]}),
+  emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Env]}),
+  emqx:hook('session.unsubscribed', {?MODULE, on_session_unsubscribed, [Env]}),
+  emqx:hook('session.resumed', {?MODULE, on_session_resumed, [Env]}),
+  emqx:hook('session.discarded', {?MODULE, on_session_discarded, [Env]}),
+  emqx:hook('session.takeovered', {?MODULE, on_session_takeovered, [Env]}),
+  emqx:hook('session.terminated', {?MODULE, on_session_terminated, [Env]}),
+  emqx:hook('message.publish', {?MODULE, on_message_publish, [Env]}),
+  emqx:hook('message.delivered', {?MODULE, on_message_delivered, [Env]}),
+  emqx:hook('message.acked', {?MODULE, on_message_acked, [Env]}),
+  emqx:hook('message.dropped', {?MODULE, on_message_dropped, [Env]}).
+
 on_client_connect(ConnInfo = #{clientid := ClientId}, Props, _Env) ->
 on_client_connect(ConnInfo = #{clientid := ClientId}, Props, _Env) ->
-    %io:format("Client(~s) connect, ConnInfo: ~p, Props: ~p~n",
-              %[ClientId, ConnInfo, Props]),
-    {ok, Props}.
+  %io:format("Client(~s) connect, ConnInfo: ~p, Props: ~p~n",
+  %[ClientId, ConnInfo, Props]),
+  {ok, Props}.
 
 
 on_client_connack(ConnInfo = #{clientid := ClientId}, Rc, Props, _Env) ->
 on_client_connack(ConnInfo = #{clientid := ClientId}, Rc, Props, _Env) ->
-    %io:format("Client(~s) connack, ConnInfo: ~p, Rc: ~p, Props: ~p~n",
-              %[ClientId, ConnInfo, Rc, Props]),
-    {ok, Props}.
-	
+  %io:format("Client(~s) connack, ConnInfo: ~p, Rc: ~p, Props: ~p~n",
+  %[ClientId, ConnInfo, Rc, Props]),
+  {ok, Props}.
+
 on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
 on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
-    io:format("Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
-              [ClientId, ClientInfo, ConnInfo]),
-		{IpAddr, _Port} = maps:get(peername, ConnInfo),
-    	Action = <<"connected">>,
-    	Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time())-719528*24*3600,
-    	Online = 1,
-    	Payload = [
-		{action, Action}, 
-		{device_id, ClientId}, 
-		{username, maps:get(username, ClientInfo)},
-		{keepalive, maps:get(keepalive, ConnInfo)},
-		{ipaddress, iolist_to_binary(ntoa(IpAddr))},
-		{proto_name, maps:get(proto_name, ConnInfo)},
-		{proto_ver, maps:get(proto_ver, ConnInfo)},
-		{timestamp, Now},
-		{online, Online}
-    ],
-   produce_kafka_payload(Payload),
-   ok.
+  io:format("Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
+    [ClientId, ClientInfo, ConnInfo]),
+  {IpAddr, _Port} = maps:get(peername, ConnInfo),
+  Action = <<"connected">>,
+  Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - 719528 * 24 * 3600,
+  Online = 1,
+  Payload = [
+    {action, Action},
+    {device_id, ClientId},
+    {username, maps:get(username, ClientInfo)},
+    {keepalive, maps:get(keepalive, ConnInfo)},
+    {ipaddress, iolist_to_binary(ntoa(IpAddr))},
+    {proto_name, maps:get(proto_name, ConnInfo)},
+    {proto_ver, maps:get(proto_ver, ConnInfo)},
+    {timestamp, Now},
+    {online, Online}
+  ],
+  produce_kafka_payload(Payload),
+  ok.
 
 
 on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInfo, _Env) ->
 on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInfo, _Env) ->
-    io:format("Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
-              [ClientId, ReasonCode, ClientInfo, ConnInfo]), 
-    Action = <<"disconnected">>,
-    Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time())-719528*24*3600,
-    Online = 0,
-    Payload = [
-		{action, Action}, 
-		{device_id, ClientId}, 
-		{username, maps:get(username, ClientInfo)},
- 		{reason, ReasonCode},
-		{timestamp, Now},
-		{online, Online}
-	],	
-    produce_kafka_payload(Payload),
-    ok.
+  io:format("Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
+    [ClientId, ReasonCode, ClientInfo, ConnInfo]),
+  Action = <<"disconnected">>,
+  Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - 719528 * 24 * 3600,
+  Online = 0,
+  Payload = [
+    {action, Action},
+    {device_id, ClientId},
+    {username, maps:get(username, ClientInfo)},
+    {reason, ReasonCode},
+    {timestamp, Now},
+    {online, Online}
+  ],
+  produce_kafka_payload(Payload),
+  ok.
 
 
 on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) ->
 on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) ->
-    io:format("Client(~s) authenticate, Result:~n~p~n", [ClientId, Result]),
-    {ok, Result}.
+  io:format("Client(~s) authenticate, Result:~n~p~n", [ClientId, Result]),
+  {ok, Result}.
 
 
 on_client_check_acl(_ClientInfo = #{clientid := ClientId}, Topic, PubSub, Result, _Env) ->
 on_client_check_acl(_ClientInfo = #{clientid := ClientId}, Topic, PubSub, Result, _Env) ->
-    io:format("Client(~s) check_acl, PubSub:~p, Topic:~p, Result:~p~n",
-              [ClientId, PubSub, Topic, Result]),
-    {ok, Result}.
-	
+  io:format("Client(~s) check_acl, PubSub:~p, Topic:~p, Result:~p~n",
+    [ClientId, PubSub, Topic, Result]),
+  {ok, Result}.
+
 %%---------------------------client subscribe start--------------------------%%
 %%---------------------------client subscribe start--------------------------%%
 on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
 on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
-    io:format("Client(~s) will subscribe: ~p~n", [ClientId, TopicFilters]),  
-    Topic=erlang:element(1,erlang:hd(TopicFilters)),
-    Qos = erlang:element(2,lists:last(TopicFilters)),
-    Action = <<"subscribe">>,
-    Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time())-719528*24*3600,
-    Payload = [
-		{device_id, ClientId},
-		{action, Action},  
-		{topic, Topic},
-		{qos, maps:get(qos, Qos)},
-		{timestamp, Now}
-	],	
-    produce_kafka_payload(Payload),
-	{ok, TopicFilters}.
+  io:format("Client(~s) will subscribe: ~p~n", [ClientId, TopicFilters]),
+  Topic = erlang:element(1, erlang:hd(TopicFilters)),
+  Qos = erlang:element(2, lists:last(TopicFilters)),
+  Action = <<"subscribe">>,
+  Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - 719528 * 24 * 3600,
+  Payload = [
+    {device_id, ClientId},
+    {action, Action},
+    {topic, Topic},
+    {qos, maps:get(qos, Qos)},
+    {timestamp, Now}
+  ],
+  produce_kafka_payload(Payload),
+  {ok, TopicFilters}.
 %%---------------------client subscribe stop----------------------%%
 %%---------------------client subscribe stop----------------------%%
 on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
 on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
-    io:format("Client(~s) will unsubscribe ~p~n", [ClientId, TopicFilters]),   
-	Topic=erlang:element(1,erlang:hd(TopicFilters)),
-    Action = <<"unsubscribe">>,
-    Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time())-719528*24*3600,
-    Payload = [
-		{device_id, ClientId},
-		{action, Action},  
-		{topic, Topic},
-		{timestamp, Now}
-	],	
-    produce_kafka_payload(Payload),
-    {ok, TopicFilters}.
+  io:format("Client(~s) will unsubscribe ~p~n", [ClientId, TopicFilters]),
+  Topic = erlang:element(1, erlang:hd(TopicFilters)),
+  Action = <<"unsubscribe">>,
+  Now = calendar:datetime_to_gregorian_seconds(calendar:universal_time()) - 719528 * 24 * 3600,
+  Payload = [
+    {device_id, ClientId},
+    {action, Action},
+    {topic, Topic},
+    {timestamp, Now}
+  ],
+  produce_kafka_payload(Payload),
+  {ok, TopicFilters}.
 
 
 on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
 on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
-    ok;
+  ok;
 on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) ->
 on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) ->
-    io:format("Message dropped by node ~s due to ~s: ~s~n",
-              [Node, Reason, emqx_message:format(Message)]).
+  io:format("Message dropped by node ~s due to ~s: ~s~n",
+    [Node, Reason, emqx_message:format(Message)]).
+
 
 
-	
 %%---------------------------message publish start--------------------------%%
 %%---------------------------message publish start--------------------------%%
 on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
 on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
-    {ok, Message};
+  {ok, Message};
 on_message_publish(Message, _Env) ->
 on_message_publish(Message, _Env) ->
-    {ok, Payload} = format_payload(Message),
-    produce_kafka_payload(Payload),
-    {ok, Message}.	
+  {ok, Payload} = format_payload(Message),
+  produce_kafka_payload(Payload),
+  {ok, Message}.
 %%---------------------message publish stop----------------------%%
 %%---------------------message publish stop----------------------%%
 
 
 on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
 on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
-    io:format("Message delivered to client(~s): ~s~n",
-              [ClientId, emqx_message:format(Message)]),
-    Topic=Message#message.topic,
-    Payload=Message#message.payload,
-    Qos=Message#message.qos,
-    From=Message#message.from,
-    Timestamp=Message#message.timestamp,
-    Json = jsx:encode([
-     		{type,<<"deliver messqge">>},
-		{from,From},
-		{to,ClientId},
-		{topic,Topic},
-     	{payload,Payload},
-     	{qos,Qos},
-     	{cluster_node,node()},
-	{ts,now_secs(Timestamp)}
-	 ]),
-	EkafTopic=ekaf_get_topic(),
-	ekaf:produce_sync(EkafTopic,Json),
- 	{ok, Message}.
+  io:format("Message delivered to client(~s): ~s~n",
+    [ClientId, emqx_message:format(Message)]),
+  Topic = Message#message.topic,
+  Payload = Message#message.payload,
+  Qos = Message#message.qos,
+  From = Message#message.from,
+  Timestamp = Message#message.timestamp,
+  Json = jsx:encode([
+    {type, <<"deliver messqge">>},
+    {from, From},
+    {to, ClientId},
+    {topic, Topic},
+    {payload, Payload},
+    {qos, Qos},
+    {cluster_node, node()},
+    {ts, now_secs(Timestamp)}
+  ]),
+  EkafTopic = ekaf_get_topic(),
+  ekaf:produce_sync(EkafTopic, Json),
+  {ok, Message}.
 
 
 on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
 on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
-    io:format("Message acked by client(~s): ~s~n",
-              [ClientId, emqx_message:format(Message)]),
-    Topic=Message#message.topic,
-    Payload=Message#message.payload,
-    Qos=Message#message.qos,
-    From=Message#message.from,
-    Timestamp=Message#message.timestamp,
-    Json = jsx:encode([
-     	{type,<<"acked messqge">>},
-	{from,From},
-	{to,ClientId},
-	{topic,Topic},
-     	{payload,Payload},
-     	{qos,Qos},
-     	{cluster_node,node()},
-	{ts,now_secs(Timestamp)}
-	 ]),
-	EkafTopic=ekaf_get_topic(),
-	ekaf:produce_sync(EkafTopic,Json).
-	
+  io:format("Message acked by client(~s): ~s~n",
+    [ClientId, emqx_message:format(Message)]),
+  Topic = Message#message.topic,
+  Payload = Message#message.payload,
+  Qos = Message#message.qos,
+  From = Message#message.from,
+  Timestamp = Message#message.timestamp,
+  Json = jsx:encode([
+    {type, <<"acked messqge">>},
+    {from, From},
+    {to, ClientId},
+    {topic, Topic},
+    {payload, Payload},
+    {qos, Qos},
+    {cluster_node, node()},
+    {ts, now_secs(Timestamp)}
+  ]),
+  EkafTopic = ekaf_get_topic(),
+  ekaf:produce_sync(EkafTopic, Json).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Session Lifecircle Hooks
 %% Session Lifecircle Hooks
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 on_session_created(#{clientid := ClientId}, SessInfo, _Env) ->
 on_session_created(#{clientid := ClientId}, SessInfo, _Env) ->
-    %io:format("Session(~s) created, Session Info:~n~p~n", [ClientId, SessInfo]).
-     ok.
-	
+  %io:format("Session(~s) created, Session Info:~n~p~n", [ClientId, SessInfo]).
+  ok.
+
 
 
 on_session_subscribed(#{clientid := ClientId}, Topic, SubOpts, _Env) ->
 on_session_subscribed(#{clientid := ClientId}, Topic, SubOpts, _Env) ->
-    io:format("Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]).
+  io:format("Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]).
 
 
 on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
 on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
-    io:format("Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]).
+  io:format("Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]).
 
 
 on_session_resumed(#{clientid := ClientId}, SessInfo, _Env) ->
 on_session_resumed(#{clientid := ClientId}, SessInfo, _Env) ->
-    io:format("Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]).
+  io:format("Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]).
 
 
 on_session_discarded(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
 on_session_discarded(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
-    io:format("Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]).
+  io:format("Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]).
 
 
 on_session_takeovered(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
 on_session_takeovered(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
-    io:format("Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]).
+  io:format("Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]).
 
 
 on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _Env) ->
 on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _Env) ->
-    io:format("Session(~s) is terminated due to ~p~nSession Info: ~p~n",
-              [ClientId, Reason, SessInfo]).	
- 	
+  io:format("Session(~s) is terminated due to ~p~nSession Info: ~p~n",
+    [ClientId, Reason, SessInfo]).
+
 ekaf_init(_Env) ->
 ekaf_init(_Env) ->
-    {ok, BrokerValues} = application:get_env(emqx_plugin_kafka, broker),
-    KafkaHost = proplists:get_value(host, BrokerValues),
-    io:format("KafkaHost = ~s~n",[KafkaHost]),
-    KafkaPort = proplists:get_value(port, BrokerValues),
-    io:format("KafkaPort = ~s~n",[KafkaPort]),
-    KafkaPartitionStrategy = proplists:get_value(partitionstrategy, BrokerValues),
-    KafkaPartitionWorkers = proplists:get_value(partitionworkers, BrokerValues),
-    KafkaTopic = proplists:get_value(payloadtopic, BrokerValues),
-    io:format("KafkaTopic = ~s~n",[KafkaTopic]),
-    application:set_env(ekaf, ekaf_bootstrap_broker, {KafkaHost, list_to_integer(KafkaPort)}),
-    application:set_env(ekaf, ekaf_partition_strategy, list_to_atom(KafkaPartitionStrategy)),
-    application:set_env(ekaf, ekaf_per_partition_workers, KafkaPartitionWorkers),
-    application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(KafkaTopic)),
-    application:set_env(ekaf, ekaf_buffer_ttl, 10),
-    application:set_env(ekaf, ekaf_max_downtime_buffer_size, 5),
-    % {ok, _} = application:ensure_all_started(kafkamocker),
-    {ok, _} = application:ensure_all_started(gproc),
-    % {ok, _} = application:ensure_all_started(ranch),
-    {ok, _} = application:ensure_all_started(ekaf).
+  {ok, BrokerValues} = application:get_env(emqx_plugin_kafka, broker),
+  KafkaHost = proplists:get_value(host, BrokerValues),
+  io:format("KafkaHost = ~s~n", [KafkaHost]),
+  KafkaPort = proplists:get_value(port, BrokerValues),
+  io:format("KafkaPort = ~s~n", [KafkaPort]),
+  KafkaPartitionStrategy = proplists:get_value(partitionstrategy, BrokerValues),
+  KafkaPartitionWorkers = proplists:get_value(partitionworkers, BrokerValues),
+  KafkaTopic = proplists:get_value(payloadtopic, BrokerValues),
+  io:format("KafkaTopic = ~s~n", [KafkaTopic]),
+  application:set_env(ekaf, ekaf_bootstrap_broker, {KafkaHost, list_to_integer(KafkaPort)}),
+  application:set_env(ekaf, ekaf_partition_strategy, list_to_atom(KafkaPartitionStrategy)),
+  application:set_env(ekaf, ekaf_per_partition_workers, KafkaPartitionWorkers),
+  application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(KafkaTopic)),
+  application:set_env(ekaf, ekaf_buffer_ttl, 10),
+  application:set_env(ekaf, ekaf_max_downtime_buffer_size, 5),
+  % {ok, _} = application:ensure_all_started(kafkamocker),
+  {ok, _} = application:ensure_all_started(gproc),
+  % {ok, _} = application:ensure_all_started(ranch),
+  {ok, _} = application:ensure_all_started(ekaf).
 
 
 ekaf_get_topic() ->
 ekaf_get_topic() ->
-    {ok, Topic} = application:get_env(ekaf, ekaf_bootstrap_topics),
-    Topic.
+  {ok, Topic} = application:get_env(ekaf, ekaf_bootstrap_topics),
+  Topic.
 
 
 
 
 format_payload(Message) ->
 format_payload(Message) ->
-    Username = emqx_message:get_header(username, Message),
-    Topic = Message#message.topic,
-    Tail = string:right(binary_to_list(Topic), 4),
-    RawType = string:equal(Tail, <<"_raw">>),
-    % io:format("Tail= ~s , RawType= ~s~n",[Tail,RawType]),
-
-    MsgPayload = Message#message.payload,
-    % io:format("MsgPayload : ~s~n", [MsgPayload]),
-    if
-        RawType == true ->
-            MsgPayload64 = list_to_binary(base64:encode_to_string(MsgPayload));
-    % io:format("MsgPayload64 : ~s~n", [MsgPayload64]);
-        RawType == false ->
-            MsgPayload64 = MsgPayload
-    end,
-    Payload = [{action, message_publish},
-        {device_id, Message#message.from},
-        {username, Username},
-        {topic, Topic},
-        {payload, MsgPayload64},
-        {ts, emqx_time:now_secs(Message#message.timestamp)}],
-
-    {ok, Payload}.
+  Username = emqx_message:get_header(username, Message),
+  Topic = Message#message.topic,
+  Tail = string:right(binary_to_list(Topic), 4),
+  RawType = string:equal(Tail, <<"_raw">>),
+  % io:format("Tail= ~s , RawType= ~s~n",[Tail,RawType]),
+
+  MsgPayload = Message#message.payload,
+  % io:format("MsgPayload : ~s~n", [MsgPayload]),
+  if
+    RawType == true ->
+      MsgPayload64 = list_to_binary(base64:encode_to_string(MsgPayload));
+  % io:format("MsgPayload64 : ~s~n", [MsgPayload64]);
+    RawType == false ->
+      MsgPayload64 = MsgPayload
+  end,
+  Payload = [{action, message_publish},
+    {device_id, Message#message.from},
+    {username, Username},
+    {topic, Topic},
+    {payload, MsgPayload64},
+    {ts, emqx_time:now_secs(Message#message.timestamp)}],
+
+  {ok, Payload}.
 
 
 
 
 %% Called when the plugin application stop
 %% Called when the plugin application stop
 unload() ->
 unload() ->
-    emqx:unhook('client.connect',      {?MODULE, on_client_connect}),
-    emqx:unhook('client.connack',      {?MODULE, on_client_connack}),
-    emqx:unhook('client.connected',    {?MODULE, on_client_connected}),
-    emqx:unhook('client.disconnected', {?MODULE, on_client_disconnected}),
-    emqx:unhook('client.authenticate', {?MODULE, on_client_authenticate}),
-    emqx:unhook('client.check_acl',    {?MODULE, on_client_check_acl}),
-    emqx:unhook('client.subscribe',    {?MODULE, on_client_subscribe}),
-    emqx:unhook('client.unsubscribe',  {?MODULE, on_client_unsubscribe}),
-    emqx:unhook('session.created',     {?MODULE, on_session_created}),
-    emqx:unhook('session.subscribed',  {?MODULE, on_session_subscribed}),
-    emqx:unhook('session.unsubscribed',{?MODULE, on_session_unsubscribed}),
-    emqx:unhook('session.resumed',     {?MODULE, on_session_resumed}),
-    emqx:unhook('session.discarded',   {?MODULE, on_session_discarded}),
-    emqx:unhook('session.takeovered',  {?MODULE, on_session_takeovered}),
-    emqx:unhook('session.terminated',  {?MODULE, on_session_terminated}),
-    emqx:unhook('message.publish',     {?MODULE, on_message_publish}),
-    emqx:unhook('message.delivered',   {?MODULE, on_message_delivered}),
-    emqx:unhook('message.acked',       {?MODULE, on_message_acked}),
-    emqx:unhook('message.dropped',     {?MODULE, on_message_dropped}).
+  emqx:unhook('client.connect', {?MODULE, on_client_connect}),
+  emqx:unhook('client.connack', {?MODULE, on_client_connack}),
+  emqx:unhook('client.connected', {?MODULE, on_client_connected}),
+  emqx:unhook('client.disconnected', {?MODULE, on_client_disconnected}),
+  emqx:unhook('client.authenticate', {?MODULE, on_client_authenticate}),
+  emqx:unhook('client.check_acl', {?MODULE, on_client_check_acl}),
+  emqx:unhook('client.subscribe', {?MODULE, on_client_subscribe}),
+  emqx:unhook('client.unsubscribe', {?MODULE, on_client_unsubscribe}),
+  emqx:unhook('session.created', {?MODULE, on_session_created}),
+  emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}),
+  emqx:unhook('session.unsubscribed', {?MODULE, on_session_unsubscribed}),
+  emqx:unhook('session.resumed', {?MODULE, on_session_resumed}),
+  emqx:unhook('session.discarded', {?MODULE, on_session_discarded}),
+  emqx:unhook('session.takeovered', {?MODULE, on_session_takeovered}),
+  emqx:unhook('session.terminated', {?MODULE, on_session_terminated}),
+  emqx:unhook('message.publish', {?MODULE, on_message_publish}),
+  emqx:unhook('message.delivered', {?MODULE, on_message_delivered}),
+  emqx:unhook('message.acked', {?MODULE, on_message_acked}),
+  emqx:unhook('message.dropped', {?MODULE, on_message_dropped}).
 
 
 produce_kafka_payload(Message) ->
 produce_kafka_payload(Message) ->
-    Topic = ekaf_get_topic(),
-    {ok, MessageBody} = emqx_json:safe_encode(Message),
-    % io:format("Message = ~s~n",[MessageBody]),
-    Payload = iolist_to_binary(MessageBody),
-    ekaf:produce_async_batched(Topic, Payload).
-
-ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
-    inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
+  Topic = ekaf_get_topic(),
+  {ok, MessageBody} = emqx_json:safe_encode(Message),
+  % io:format("Message = ~s~n",[MessageBody]),
+  Payload = iolist_to_binary(MessageBody),
+  ekaf:produce_async_batched(Topic, Payload).
+
+ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
+  inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
 ntoa(IP) ->
 ntoa(IP) ->
-    inet_parse:ntoa(IP).
+  inet_parse:ntoa(IP).
 now_secs({MegaSecs, Secs, _MicroSecs}) ->
 now_secs({MegaSecs, Secs, _MicroSecs}) ->
-    MegaSecs * 1000000 + Secs.
+  MegaSecs * 1000000 + Secs.