|
|
@@ -129,7 +129,7 @@ on_start(InstanceId, Config) ->
|
|
|
}),
|
|
|
throw(failed_to_start_kafka_client)
|
|
|
end,
|
|
|
- start_subscriber(Config, InstanceId, ClientID).
|
|
|
+ start_consumer(Config, InstanceId, ClientID).
|
|
|
|
|
|
-spec on_stop(manager_id(), state()) -> ok.
|
|
|
on_stop(_InstanceID, State) ->
|
|
|
@@ -169,44 +169,45 @@ handle_message(Message, State) ->
|
|
|
?tp_span(
|
|
|
kafka_consumer_handle_message,
|
|
|
#{message => Message, state => State},
|
|
|
- begin
|
|
|
- #{
|
|
|
- resource_id := ResourceId,
|
|
|
- hookpoint := Hookpoint,
|
|
|
- kafka_topic := KafkaTopic,
|
|
|
- mqtt := #{
|
|
|
- topic := MQTTTopic,
|
|
|
- payload := MQTTPayload,
|
|
|
- qos := MQTTQoS
|
|
|
- }
|
|
|
- } = State,
|
|
|
- FullMessage = #{
|
|
|
- offset => Message#kafka_message.offset,
|
|
|
- key => Message#kafka_message.key,
|
|
|
- value => Message#kafka_message.value,
|
|
|
- ts => Message#kafka_message.ts,
|
|
|
- ts_type => Message#kafka_message.ts_type,
|
|
|
- headers => maps:from_list(Message#kafka_message.headers),
|
|
|
- topic => KafkaTopic
|
|
|
- },
|
|
|
- Payload =
|
|
|
- case MQTTPayload of
|
|
|
- full_message ->
|
|
|
- FullMessage;
|
|
|
- message_value ->
|
|
|
- Message#kafka_message.value
|
|
|
- end,
|
|
|
- EncodedPayload = emqx_json:encode(Payload),
|
|
|
- MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, EncodedPayload),
|
|
|
- _ = emqx:publish(MQTTMessage),
|
|
|
- emqx:run_hook(Hookpoint, [FullMessage]),
|
|
|
- emqx_resource_metrics:received_inc(ResourceId),
|
|
|
- %% note: just `ack' does not commit the offset to the
|
|
|
- %% kafka consumer group.
|
|
|
- {ok, commit, State}
|
|
|
- end
|
|
|
+ do_handle_message(Message, State)
|
|
|
).
|
|
|
|
|
|
+do_handle_message(Message, State) ->
|
|
|
+ #{
|
|
|
+ resource_id := ResourceId,
|
|
|
+ hookpoint := Hookpoint,
|
|
|
+ kafka_topic := KafkaTopic,
|
|
|
+ mqtt := #{
|
|
|
+ topic := MQTTTopic,
|
|
|
+ payload := MQTTPayload,
|
|
|
+ qos := MQTTQoS
|
|
|
+ }
|
|
|
+ } = State,
|
|
|
+ FullMessage = #{
|
|
|
+ offset => Message#kafka_message.offset,
|
|
|
+ key => Message#kafka_message.key,
|
|
|
+ value => Message#kafka_message.value,
|
|
|
+ ts => Message#kafka_message.ts,
|
|
|
+ ts_type => Message#kafka_message.ts_type,
|
|
|
+ headers => maps:from_list(Message#kafka_message.headers),
|
|
|
+ topic => KafkaTopic
|
|
|
+ },
|
|
|
+ Payload =
|
|
|
+ case MQTTPayload of
|
|
|
+ full_message ->
|
|
|
+ FullMessage;
|
|
|
+ message_value ->
|
|
|
+ Message#kafka_message.value
|
|
|
+ end,
|
|
|
+ EncodedPayload = emqx_json:encode(Payload),
|
|
|
+ MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, EncodedPayload),
|
|
|
+ _ = emqx:publish(MQTTMessage),
|
|
|
+ emqx:run_hook(Hookpoint, [FullMessage]),
|
|
|
+ emqx_resource_metrics:received_inc(ResourceId),
|
|
|
+ %% note: just `ack' does not commit the offset to the
|
|
|
+ %% kafka consumer group.
|
|
|
+ {ok, commit, State}.
|
|
|
+
|
|
|
%%-------------------------------------------------------------------------------------
|
|
|
%% Helper fns
|
|
|
%%-------------------------------------------------------------------------------------
|
|
|
@@ -241,8 +242,8 @@ ensure_consumer_supervisor_started() ->
|
|
|
ok
|
|
|
end.
|
|
|
|
|
|
--spec start_subscriber(config(), manager_id(), brod:client_id()) -> {ok, state()}.
|
|
|
-start_subscriber(Config, InstanceId, ClientID) ->
|
|
|
+-spec start_consumer(config(), manager_id(), brod:client_id()) -> {ok, state()}.
|
|
|
+start_consumer(Config, InstanceId, ClientID) ->
|
|
|
#{
|
|
|
bootstrap_hosts := BootstrapHosts0,
|
|
|
bridge_name := BridgeName,
|
|
|
@@ -256,7 +257,7 @@ start_subscriber(Config, InstanceId, ClientID) ->
|
|
|
},
|
|
|
mqtt := #{topic := MQTTTopic, qos := MQTTQoS, payload := MQTTPayload}
|
|
|
} = Config,
|
|
|
- ensure_consumer_supervisor_started(),
|
|
|
+ ok = ensure_consumer_supervisor_started(),
|
|
|
InitialState = #{
|
|
|
resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName),
|
|
|
mqtt => #{
|