|
@@ -137,7 +137,7 @@ pick(Topic) ->
|
|
|
init([Pool, Id]) ->
|
|
init([Pool, Id]) ->
|
|
|
rand:seed(exsplus, erlang:timestamp()),
|
|
rand:seed(exsplus, erlang:timestamp()),
|
|
|
gproc_pool:connect_worker(Pool, {Pool, Id}),
|
|
gproc_pool:connect_worker(Pool, {Pool, Id}),
|
|
|
- Batch = #batch{enabled = emqx_config:get_env(route_batch_delete, false),
|
|
|
|
|
|
|
+ Batch = #batch{enabled = emqx_config:get_env(route_batch_clean, false),
|
|
|
pending = sets:new()},
|
|
pending = sets:new()},
|
|
|
{ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}.
|
|
{ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}.
|
|
|
|
|
|
|
@@ -191,7 +191,7 @@ handle_cast(Msg, State) ->
|
|
|
{noreply, State}.
|
|
{noreply, State}.
|
|
|
|
|
|
|
|
handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) ->
|
|
handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) ->
|
|
|
- _ = del_direct_routes(Batch#batch.pending),
|
|
|
|
|
|
|
+ _ = del_direct_routes(sets:to_list(Batch#batch.pending)),
|
|
|
{noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate};
|
|
{noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate};
|
|
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
handle_info(Info, State) ->
|