Explorar el Código

- 功能说明/问题现象: 将kafka的依赖库从ekaf替换为brod,修改初始化逻辑
- 代码提交人: ultrakid
- 自测报告:
- 软件开发号:
- 来源项目编号/版本简称:
- 问题分类:
- 方案/解决办法:
- 有无兼容性问题:
- 有无配合更新模块:
- 需求路径:
- BUG路径:
- 设计文档路径:
- 建议:
- 代码审核人:

Ultrakid hace 3 años
padre
commit
a574c3f1ad
Se han modificado 1 ficheros con 7 adiciones y 4 borrados
  1. 7 4
      src/emqx_plugin_kafka.erl

+ 7 - 4
src/emqx_plugin_kafka.erl

@@ -261,15 +261,18 @@ on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _
   ok.
 
 kafka_init(_Env) ->
-  io:format("Init emqx plugin kafka....."),
+  ?LOG_INFO("Start to init emqx plugin kafka.....~n"),
   {ok, AddressList} = application:get_env(emqx_plugin_kafka, kafka_address_list),
-  ?LOG_INFO("[KAFKA PLUGIN]KafkaAddressList = ~s~n", [AddressList]),
+  ?LOG_INFO("[KAFKA PLUGIN]KafkaAddressList = ~p~n", [AddressList]),
   {ok, KafkaConfig} = application:get_env(emqx_plugin_kafka, kafka_config),
-  ?LOG_INFO("[KAFKA PLUGIN]KafkaConfig = ~s~n", [KafkaConfig]),
+  ?LOG_INFO("[KAFKA PLUGIN]KafkaConfig = ~p~n", [KafkaConfig]),
   {ok, KafkaTopic} = application:get_env(emqx_plugin_kafka, topic),
   ?LOG_INFO("[KAFKA PLUGIN]KafkaTopic = ~s~n", [KafkaTopic]),
+  {ok, _} = application:ensure_all_started(brod),
   ok = brod:start_client(AddressList, emqx_repost_worker, KafkaConfig),
-  ok = brod:start_producer(emqx_repost_worker, KafkaTopic, []).
+  ok = brod:start_producer(emqx_repost_worker, KafkaTopic, []),
+  ?LOG_INFO("Init emqx plugin kafka successfully.....~n"),
+  ok.
 
 get_kafka_topic() ->
   {ok, Topic} = application:get_env(emqx_plugin_kafka, topic),