|
@@ -58,7 +58,7 @@
|
|
|
|
|
|
%% Called when the plugin application start
|
|
%% Called when the plugin application start
|
|
load(Env) ->
|
|
load(Env) ->
|
|
- ekaf_init([Env]),
|
|
|
|
|
|
+ kafka_init([Env]),
|
|
emqx:hook('client.connect', {?MODULE, on_client_connect, [Env]}),
|
|
emqx:hook('client.connect', {?MODULE, on_client_connect, [Env]}),
|
|
emqx:hook('client.connack', {?MODULE, on_client_connack, [Env]}),
|
|
emqx:hook('client.connack', {?MODULE, on_client_connack, [Env]}),
|
|
emqx:hook('client.connected', {?MODULE, on_client_connected, [Env]}),
|
|
emqx:hook('client.connected', {?MODULE, on_client_connected, [Env]}),
|
|
@@ -104,10 +104,10 @@ on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
|
|
{ipaddress, iolist_to_binary(ntoa(IpAddr))},
|
|
{ipaddress, iolist_to_binary(ntoa(IpAddr))},
|
|
{proto_name, maps:get(proto_name, ConnInfo)},
|
|
{proto_name, maps:get(proto_name, ConnInfo)},
|
|
{proto_ver, maps:get(proto_ver, ConnInfo)},
|
|
{proto_ver, maps:get(proto_ver, ConnInfo)},
|
|
- {timestamp, Now},
|
|
|
|
|
|
+ {ts, Now},
|
|
{online, Online}
|
|
{online, Online}
|
|
],
|
|
],
|
|
- produce_kafka_payload(Payload),
|
|
|
|
|
|
+ produce_kafka_payload(ClientId, Payload),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInfo, _Env) ->
|
|
on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInfo, _Env) ->
|
|
@@ -121,10 +121,10 @@ on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInf
|
|
{device_id, ClientId},
|
|
{device_id, ClientId},
|
|
{username, maps:get(username, ClientInfo)},
|
|
{username, maps:get(username, ClientInfo)},
|
|
{reason, ReasonCode},
|
|
{reason, ReasonCode},
|
|
- {timestamp, Now},
|
|
|
|
|
|
+ {ts, Now},
|
|
{online, Online}
|
|
{online, Online}
|
|
],
|
|
],
|
|
- produce_kafka_payload(Payload),
|
|
|
|
|
|
+ produce_kafka_payload(ClientId, Payload),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) ->
|
|
on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) ->
|
|
@@ -148,9 +148,9 @@ on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
{action, Action},
|
|
{action, Action},
|
|
{topic, Topic},
|
|
{topic, Topic},
|
|
{qos, maps:get(qos, Qos)},
|
|
{qos, maps:get(qos, Qos)},
|
|
- {timestamp, Now}
|
|
|
|
|
|
+ {ts, Now}
|
|
],
|
|
],
|
|
- produce_kafka_payload(Payload),
|
|
|
|
|
|
+ produce_kafka_payload(ClientId, Payload),
|
|
ok.
|
|
ok.
|
|
%%---------------------client subscribe stop----------------------%%
|
|
%%---------------------client subscribe stop----------------------%%
|
|
on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
|
@@ -162,9 +162,9 @@ on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env)
|
|
{device_id, ClientId},
|
|
{device_id, ClientId},
|
|
{action, Action},
|
|
{action, Action},
|
|
{topic, Topic},
|
|
{topic, Topic},
|
|
- {timestamp, Now}
|
|
|
|
|
|
+ {ts, Now}
|
|
],
|
|
],
|
|
- produce_kafka_payload(Payload),
|
|
|
|
|
|
+ produce_kafka_payload(ClientId, Payload),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
|
|
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
|
|
@@ -179,8 +179,8 @@ on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) ->
|
|
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
|
|
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
|
|
ok;
|
|
ok;
|
|
on_message_publish(Message, _Env) ->
|
|
on_message_publish(Message, _Env) ->
|
|
- {ok, Payload} = format_payload(Message),
|
|
|
|
- produce_kafka_payload(Payload),
|
|
|
|
|
|
+ {ok, ClientId, Payload} = format_payload(Message),
|
|
|
|
+ produce_kafka_payload(ClientId, Payload),
|
|
ok.
|
|
ok.
|
|
%%---------------------message publish stop----------------------%%
|
|
%%---------------------message publish stop----------------------%%
|
|
|
|
|
|
@@ -202,7 +202,7 @@ on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
{cluster_node, node()},
|
|
{cluster_node, node()},
|
|
{ts, Timestamp}
|
|
{ts, Timestamp}
|
|
],
|
|
],
|
|
- produce_kafka_payload(Content),
|
|
|
|
|
|
+ produce_kafka_payload(ClientId, Content),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
@@ -223,7 +223,7 @@ on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
|
{cluster_node, node()},
|
|
{cluster_node, node()},
|
|
{ts, Timestamp}
|
|
{ts, Timestamp}
|
|
],
|
|
],
|
|
- produce_kafka_payload(Content),
|
|
|
|
|
|
+ produce_kafka_payload(ClientId, Content),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
@@ -260,30 +260,19 @@ on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _
|
|
[ClientId, Reason, SessInfo]),
|
|
[ClientId, Reason, SessInfo]),
|
|
ok.
|
|
ok.
|
|
|
|
|
|
-ekaf_init(_Env) ->
|
|
|
|
|
|
+kafka_init(_Env) ->
|
|
io:format("Init emqx plugin kafka....."),
|
|
io:format("Init emqx plugin kafka....."),
|
|
- {ok, BrokerValues} = application:get_env(emqx_plugin_kafka, broker),
|
|
|
|
- KafkaHost = proplists:get_value(host, BrokerValues),
|
|
|
|
- ?LOG_INFO("[KAFKA PLUGIN]KafkaHost = ~s~n", [KafkaHost]),
|
|
|
|
- KafkaPort = proplists:get_value(port, BrokerValues),
|
|
|
|
- ?LOG_INFO("[KAFKA PLUGIN]KafkaPort = ~s~n", [KafkaPort]),
|
|
|
|
- KafkaPartitionStrategy = proplists:get_value(partitionstrategy, BrokerValues),
|
|
|
|
- KafkaPartitionWorkers = proplists:get_value(partitionworkers, BrokerValues),
|
|
|
|
- KafkaTopic = proplists:get_value(payloadtopic, BrokerValues),
|
|
|
|
|
|
+ {ok, AddressList} = application:get_env(emqx_plugin_kafka, kafka_address_list),
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]KafkaAddressList = ~s~n", [AddressList]),
|
|
|
|
+ {ok, KafkaConfig} = application:get_env(emqx_plugin_kafka, kafka_config),
|
|
|
|
+ ?LOG_INFO("[KAFKA PLUGIN]KafkaConfig = ~s~n", [KafkaConfig]),
|
|
|
|
+ {ok, KafkaTopic} = application:get_env(emqx_plugin_kafka, topic),
|
|
?LOG_INFO("[KAFKA PLUGIN]KafkaTopic = ~s~n", [KafkaTopic]),
|
|
?LOG_INFO("[KAFKA PLUGIN]KafkaTopic = ~s~n", [KafkaTopic]),
|
|
- application:set_env(ekaf, ekaf_bootstrap_broker, {KafkaHost, list_to_integer(KafkaPort)}),
|
|
|
|
- application:set_env(ekaf, ekaf_partition_strategy, list_to_atom(KafkaPartitionStrategy)),
|
|
|
|
- application:set_env(ekaf, ekaf_per_partition_workers, KafkaPartitionWorkers),
|
|
|
|
- application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(KafkaTopic)),
|
|
|
|
- application:set_env(ekaf, ekaf_buffer_ttl, 10),
|
|
|
|
- application:set_env(ekaf, ekaf_max_downtime_buffer_size, 5),
|
|
|
|
- % {ok, _} = application:ensure_all_started(kafkamocker),
|
|
|
|
- {ok, _} = application:ensure_all_started(gproc),
|
|
|
|
- % {ok, _} = application:ensure_all_started(ranch),
|
|
|
|
- {ok, _} = application:ensure_all_started(ekaf).
|
|
|
|
-
|
|
|
|
-ekaf_get_topic() ->
|
|
|
|
- {ok, Topic} = application:get_env(ekaf, ekaf_bootstrap_topics),
|
|
|
|
|
|
+ ok = brod:start_client(AddressList, emqx_repost_worker, KafkaConfig),
|
|
|
|
+ ok = brod:start_producer(emqx_repost_worker, KafkaTopic, []).
|
|
|
|
+
|
|
|
|
+get_kafka_topic() ->
|
|
|
|
+ {ok, Topic} = application:get_env(emqx_plugin_kafka, topic),
|
|
Topic.
|
|
Topic.
|
|
|
|
|
|
|
|
|
|
@@ -293,7 +282,7 @@ format_payload(Message) ->
|
|
Tail = string:right(binary_to_list(Topic), 4),
|
|
Tail = string:right(binary_to_list(Topic), 4),
|
|
RawType = string:equal(Tail, <<"_raw">>),
|
|
RawType = string:equal(Tail, <<"_raw">>),
|
|
% ?LOG_INFO("[KAFKA PLUGIN]Tail= ~s , RawType= ~s~n",[Tail,RawType]),
|
|
% ?LOG_INFO("[KAFKA PLUGIN]Tail= ~s , RawType= ~s~n",[Tail,RawType]),
|
|
-
|
|
|
|
|
|
+ ClientId = Message#message.from,
|
|
MsgPayload = Message#message.payload,
|
|
MsgPayload = Message#message.payload,
|
|
% ?LOG_INFO("[KAFKA PLUGIN]MsgPayload : ~s~n", [MsgPayload]),
|
|
% ?LOG_INFO("[KAFKA PLUGIN]MsgPayload : ~s~n", [MsgPayload]),
|
|
if
|
|
if
|
|
@@ -304,13 +293,13 @@ format_payload(Message) ->
|
|
MsgPayload64 = MsgPayload
|
|
MsgPayload64 = MsgPayload
|
|
end,
|
|
end,
|
|
Payload = [{action, message_publish},
|
|
Payload = [{action, message_publish},
|
|
- {device_id, Message#message.from},
|
|
|
|
|
|
+ {device_id, ClientId},
|
|
{username, Username},
|
|
{username, Username},
|
|
{topic, Topic},
|
|
{topic, Topic},
|
|
{payload, MsgPayload64},
|
|
{payload, MsgPayload64},
|
|
{ts, Message#message.timestamp}],
|
|
{ts, Message#message.timestamp}],
|
|
|
|
|
|
- {ok, Payload}.
|
|
|
|
|
|
+ {ok, ClientId, Payload}.
|
|
|
|
|
|
|
|
|
|
%% Called when the plugin application stop
|
|
%% Called when the plugin application stop
|
|
@@ -335,12 +324,12 @@ unload() ->
|
|
emqx:unhook('message.acked', {?MODULE, on_message_acked}),
|
|
emqx:unhook('message.acked', {?MODULE, on_message_acked}),
|
|
emqx:unhook('message.dropped', {?MODULE, on_message_dropped}).
|
|
emqx:unhook('message.dropped', {?MODULE, on_message_dropped}).
|
|
|
|
|
|
-produce_kafka_payload(Message) ->
|
|
|
|
- Topic = ekaf_get_topic(),
|
|
|
|
|
|
+produce_kafka_payload(Key, Message) ->
|
|
|
|
+ Topic = get_kafka_topic(),
|
|
{ok, MessageBody} = emqx_json:safe_encode(Message),
|
|
{ok, MessageBody} = emqx_json:safe_encode(Message),
|
|
% ?LOG_INFO("[KAFKA PLUGIN]Message = ~s~n",[MessageBody]),
|
|
% ?LOG_INFO("[KAFKA PLUGIN]Message = ~s~n",[MessageBody]),
|
|
Payload = iolist_to_binary(MessageBody),
|
|
Payload = iolist_to_binary(MessageBody),
|
|
- ekaf:produce_async_batched(Topic, Payload).
|
|
|
|
|
|
+ {ok, _} = brod:produce(emqx_repost_worker, Topic, 0, Key, Payload).
|
|
|
|
|
|
ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
|
|
ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
|
|
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
|
|
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
|