|
|
@@ -164,7 +164,7 @@ init({Id, Index, Opts}) ->
|
|
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
|
|
|
emqx_resource_metrics:inflight_set(Id, Index, 0),
|
|
|
InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
|
|
|
- {ok, InflightTID} = inflight_new(InfltWinSZ, Id, Index),
|
|
|
+ InflightTID = inflight_new(InfltWinSZ, Id, Index),
|
|
|
HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
|
|
|
St = #{
|
|
|
id => Id,
|
|
|
@@ -765,7 +765,7 @@ inflight_new(InfltWinSZ, Id, Index) ->
|
|
|
%% we use this counter because we might deal with batches as
|
|
|
%% elements.
|
|
|
inflight_append(TableId, ?SIZE_REF, 0, Id, Index),
|
|
|
- {ok, TableId}.
|
|
|
+ TableId.
|
|
|
|
|
|
inflight_get_first(InflightTID) ->
|
|
|
case ets:next(InflightTID, ?MAX_SIZE_REF) of
|