|
|
@@ -63,6 +63,11 @@ tr_config(_Key, Value) ->
|
|
|
|
|
|
%% @doc Config schema is defined in emqx_bridge_kafka.
|
|
|
on_start(InstId, Config) ->
|
|
|
+ ?SLOG(debug, #{
|
|
|
+ msg => "kafka_client_starting",
|
|
|
+ instance_id => InstId,
|
|
|
+ config => emqx_utils:redact(Config)
|
|
|
+ }),
|
|
|
C = fun(Key) -> check_config(Key, Config) end,
|
|
|
Hosts = C(bootstrap_hosts),
|
|
|
ClientConfig = #{
|
|
|
@@ -74,36 +79,8 @@ on_start(InstId, Config) ->
|
|
|
ssl => C(ssl)
|
|
|
},
|
|
|
ClientId = InstId,
|
|
|
- ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
|
|
|
- case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
|
|
|
- {ok, _} ->
|
|
|
- case wolff_client_sup:find_client(ClientId) of
|
|
|
- {ok, Pid} ->
|
|
|
- case wolff_client:check_connectivity(Pid) of
|
|
|
- ok ->
|
|
|
- ok;
|
|
|
- {error, Error} ->
|
|
|
- deallocate_client(ClientId),
|
|
|
- throw({failed_to_connect, Error})
|
|
|
- end;
|
|
|
- {error, Reason} ->
|
|
|
- deallocate_client(ClientId),
|
|
|
- throw({failed_to_find_created_client, Reason})
|
|
|
- end,
|
|
|
- ?SLOG(info, #{
|
|
|
- msg => "kafka_client_started",
|
|
|
- instance_id => InstId,
|
|
|
- kafka_hosts => Hosts
|
|
|
- });
|
|
|
- {error, Reason} ->
|
|
|
- ?SLOG(error, #{
|
|
|
- msg => failed_to_start_kafka_client,
|
|
|
- instance_id => InstId,
|
|
|
- kafka_hosts => Hosts,
|
|
|
- reason => Reason
|
|
|
- }),
|
|
|
- throw(failed_to_start_kafka_client)
|
|
|
- end,
|
|
|
+ emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
|
|
|
+ ok = ensure_client(ClientId, Hosts, ClientConfig),
|
|
|
%% Check if this is a dry run
|
|
|
{ok, #{
|
|
|
client_id => ClientId,
|
|
|
@@ -156,7 +133,7 @@ create_producers_for_bridge_v2(
|
|
|
end,
|
|
|
ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
|
|
|
WolffProducerConfig = producers_config(
|
|
|
- BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id
|
|
|
+ BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
|
|
|
),
|
|
|
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
|
|
|
{ok, Producers} ->
|
|
|
@@ -215,6 +192,54 @@ on_stop(InstanceId, _State) ->
|
|
|
?tp(kafka_producer_stopped, #{instance_id => InstanceId}),
|
|
|
ok.
|
|
|
|
|
|
+ensure_client(ClientId, Hosts, ClientConfig) ->
|
|
|
+ case wolff_client_sup:find_client(ClientId) of
|
|
|
+ {ok, Pid} ->
|
|
|
+ case wolff_client:check_connectivity(Pid) of
|
|
|
+ ok ->
|
|
|
+ ok;
|
|
|
+ {error, Error} ->
|
|
|
+ deallocate_client(ClientId),
|
|
|
+ throw({failed_to_connect, Error})
|
|
|
+ end;
|
|
|
+ {error, no_such_client} ->
|
|
|
+ case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
|
|
|
+ {ok, _} ->
|
|
|
+ ok = ensure_connectivity(ClientId),
|
|
|
+ ?SLOG(info, #{
|
|
|
+ msg => "kafka_client_started",
|
|
|
+ client_id => ClientId,
|
|
|
+ kafka_hosts => Hosts
|
|
|
+ });
|
|
|
+ {error, Reason} ->
|
|
|
+ ?SLOG(error, #{
|
|
|
+ msg => failed_to_start_kafka_client,
|
|
|
+ client_id => ClientId,
|
|
|
+ kafka_hosts => Hosts,
|
|
|
+ reason => Reason
|
|
|
+ }),
|
|
|
+ throw(failed_to_start_kafka_client)
|
|
|
+ end;
|
|
|
+ {error, Reason} ->
|
|
|
+ deallocate_client(ClientId),
|
|
|
+ throw({failed_to_find_created_client, Reason})
|
|
|
+ end.
|
|
|
+
|
|
|
+ensure_connectivity(ClientId) ->
|
|
|
+ case wolff_client_sup:find_client(ClientId) of
|
|
|
+ {ok, Pid} ->
|
|
|
+ case wolff_client:check_connectivity(Pid) of
|
|
|
+ ok ->
|
|
|
+ ok;
|
|
|
+ {error, Error} ->
|
|
|
+ deallocate_client(ClientId),
|
|
|
+ throw({failed_to_connect, Error})
|
|
|
+ end;
|
|
|
+ {error, Reason} ->
|
|
|
+ deallocate_client(ClientId),
|
|
|
+ throw({failed_to_find_created_client, Reason})
|
|
|
+ end.
|
|
|
+
|
|
|
deallocate_client(ClientId) ->
|
|
|
_ = with_log_at_error(
|
|
|
fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
|
|
|
@@ -573,7 +598,7 @@ ssl(#{enable := true} = SSL) ->
|
|
|
ssl(_) ->
|
|
|
false.
|
|
|
|
|
|
-producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) ->
|
|
|
+producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
|
|
|
#{
|
|
|
max_batch_bytes := MaxBatchBytes,
|
|
|
compression := Compression,
|
|
|
@@ -596,8 +621,8 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id)
|
|
|
{OffloadMode, ReplayqDir} =
|
|
|
case BufferMode of
|
|
|
memory -> {false, false};
|
|
|
- disk -> {false, replayq_dir(ClientId)};
|
|
|
- hybrid -> {true, replayq_dir(ClientId)}
|
|
|
+ disk -> {false, replayq_dir(BridgeType, BridgeName)};
|
|
|
+ hybrid -> {true, replayq_dir(BridgeType, BridgeName)}
|
|
|
end,
|
|
|
#{
|
|
|
name => make_producer_name(BridgeType, BridgeName, IsDryRun),
|
|
|
@@ -620,8 +645,11 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id)
|
|
|
partitioner(random) -> random;
|
|
|
partitioner(key_dispatch) -> first_key_dispatch.
|
|
|
|
|
|
-replayq_dir(ClientId) ->
|
|
|
- filename:join([emqx:data_dir(), "kafka", ClientId]).
|
|
|
+replayq_dir(BridgeType, BridgeName) ->
|
|
|
+ DirName = iolist_to_binary([
|
|
|
+ emqx_bridge_lib:downgrade_type(BridgeType), ":", BridgeName, ":", atom_to_list(node())
|
|
|
+ ]),
|
|
|
+ filename:join([emqx:data_dir(), "kafka", DirName]).
|
|
|
|
|
|
%% Producer name must be an atom which will be used as a ETS table name for
|
|
|
%% partition worker lookup.
|