|
@@ -261,15 +261,18 @@ on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _
|
|
|
ok.
|
|
|
|
|
|
kafka_init(_Env) ->
|
|
|
- io:format("Init emqx plugin kafka....."),
|
|
|
+ ?LOG_INFO("Start to init emqx plugin kafka.....~n"),
|
|
|
{ok, AddressList} = application:get_env(emqx_plugin_kafka, kafka_address_list),
|
|
|
- ?LOG_INFO("[KAFKA PLUGIN]KafkaAddressList = ~s~n", [AddressList]),
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]KafkaAddressList = ~p~n", [AddressList]),
|
|
|
{ok, KafkaConfig} = application:get_env(emqx_plugin_kafka, kafka_config),
|
|
|
- ?LOG_INFO("[KAFKA PLUGIN]KafkaConfig = ~s~n", [KafkaConfig]),
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]KafkaConfig = ~p~n", [KafkaConfig]),
|
|
|
{ok, KafkaTopic} = application:get_env(emqx_plugin_kafka, topic),
|
|
|
?LOG_INFO("[KAFKA PLUGIN]KafkaTopic = ~s~n", [KafkaTopic]),
|
|
|
+ {ok, _} = application:ensure_all_started(brod),
|
|
|
ok = brod:start_client(AddressList, emqx_repost_worker, KafkaConfig),
|
|
|
- ok = brod:start_producer(emqx_repost_worker, KafkaTopic, []).
|
|
|
+ ok = brod:start_producer(emqx_repost_worker, KafkaTopic, []),
|
|
|
+ ?LOG_INFO("Init emqx plugin kafka successfully.....~n"),
|
|
|
+ ok.
|
|
|
|
|
|
get_kafka_topic() ->
|
|
|
{ok, Topic} = application:get_env(emqx_plugin_kafka, topic),
|