|
|
@@ -539,7 +539,7 @@ check_topic_and_leader_connections(ClientId, KafkaTopic) ->
|
|
|
kafka_client => ClientId,
|
|
|
kafka_topic => KafkaTopic
|
|
|
});
|
|
|
- {error, restarting} ->
|
|
|
+ {error, client_supervisor_not_initialized} ->
|
|
|
throw(#{
|
|
|
reason => restarting,
|
|
|
kafka_client => ClientId,
|
|
|
@@ -620,16 +620,19 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
|
|
|
partition_count_refresh_interval := PCntRefreshInterval,
|
|
|
max_inflight := MaxInflight,
|
|
|
buffer := #{
|
|
|
- mode := BufferMode,
|
|
|
+ mode := BufferMode0,
|
|
|
per_partition_limit := PerPartitionLimit,
|
|
|
segment_bytes := SegmentBytes,
|
|
|
- memory_overload_protection := MemOLP0
|
|
|
+ memory_overload_protection := MemOLP
|
|
|
}
|
|
|
} = Input,
|
|
|
- MemOLP =
|
|
|
- case os:type() of
|
|
|
- {unix, linux} -> MemOLP0;
|
|
|
- _ -> false
|
|
|
+ %% avoid creating dirs for probing producers
|
|
|
+ BufferMode =
|
|
|
+ case IsDryRun of
|
|
|
+ true ->
|
|
|
+ memory;
|
|
|
+ false ->
|
|
|
+ BufferMode0
|
|
|
end,
|
|
|
{OffloadMode, ReplayqDir} =
|
|
|
case BufferMode of
|
|
|
@@ -638,7 +641,6 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
|
|
|
hybrid -> {true, replayq_dir(BridgeType, BridgeName)}
|
|
|
end,
|
|
|
#{
|
|
|
- name => make_producer_name(BridgeType, BridgeName, IsDryRun),
|
|
|
partitioner => partitioner(PartitionStrategy),
|
|
|
partition_count_refresh_interval_seconds => PCntRefreshInterval,
|
|
|
replayq_dir => ReplayqDir,
|
|
|
@@ -669,18 +671,6 @@ replayq_dir(BridgeType, BridgeName) ->
|
|
|
]),
|
|
|
filename:join([emqx:data_dir(), "kafka", DirName]).
|
|
|
|
|
|
-%% Producer name must be an atom which will be used as a ETS table name for
|
|
|
-%% partition worker lookup.
|
|
|
-make_producer_name(_BridgeType, _BridgeName, true = _IsDryRun) ->
|
|
|
- %% It is a dry run and we don't want to leak too many atoms
|
|
|
- %% so we use the default producer name instead of creating
|
|
|
- %% an unique name.
|
|
|
- probing_wolff_producers;
|
|
|
-make_producer_name(BridgeType, BridgeName, _IsDryRun) ->
|
|
|
- %% Woff needs an atom for ets table name registration. The assumption here is
|
|
|
- %% that bridges with new names are not often created.
|
|
|
- binary_to_atom(iolist_to_binary([BridgeType, "_", bin(BridgeName)])).
|
|
|
-
|
|
|
with_log_at_error(Fun, Log) ->
|
|
|
try
|
|
|
Fun()
|