emqx_batch.erl 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. %% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
  2. %%
  3. %% Licensed under the Apache License, Version 2.0 (the "License");
  4. %% you may not use this file except in compliance with the License.
  5. %% You may obtain a copy of the License at
  6. %%
  7. %% http://www.apache.org/licenses/LICENSE-2.0
  8. %%
  9. %% Unless required by applicable law or agreed to in writing, software
  10. %% distributed under the License is distributed on an "AS IS" BASIS,
  11. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. %% See the License for the specific language governing permissions and
  13. %% limitations under the License.
  14. -module(emqx_batch).
  15. %% APIs
  16. -export([ init/1
  17. , push/2
  18. , commit/1
  19. , size/1
  20. , items/1
  21. ]).
  %% Internal state of a batch.
  %%   batch_size   - size threshold checked by push/2; 0 means "no limit".
  %%   batch_q      - queued elements, newest first (reversed when read or
  %%                  committed).
  %%   linger_ms    - delay in milliseconds handed to erlang:send_after/3
  %%                  when the first element is queued.
  %%   linger_timer - reference of the pending linger timer, or undefined
  %%                  while no timer is armed.
  %%   commit_fun   - fun called by commit/1 with the queued elements in
  %%                  insertion order.
  -record(batch, {
      batch_size :: non_neg_integer(),
      batch_q :: list(any()),
      linger_ms :: pos_integer(),
      linger_timer :: reference() | undefined,
      commit_fun :: function()
  }).

  %% Options accepted by init/1. Only commit_fun is mandatory; init/1 falls
  %% back to 1000 for batch_size and for linger_ms when they are absent.
  -type(options() :: #{
      batch_size => non_neg_integer(),
      linger_ms => pos_integer(),
      commit_fun := function()
  }).

  %% Opaque to callers: the record layout is private to this module.
  -opaque(batch() :: #batch{}).

  -export_type([options/0, batch/0]).
  37. %%------------------------------------------------------------------------------
  38. %% APIs
  39. %%------------------------------------------------------------------------------
  %% Build an empty batch from an options map.
  %%   commit_fun is required: maps:get/2 raises when the key is missing.
  %%   batch_size and linger_ms each default to 1000.
  %% The queue starts empty and linger_timer is left at its record default.
  -spec(init(options()) -> batch()).
  init(Opts) when is_map(Opts) ->
      CommitFun = maps:get(commit_fun, Opts),
      BatchSize = maps:get(batch_size, Opts, 1000),
      LingerMs = maps:get(linger_ms, Opts, 1000),
      #batch{
          batch_size = BatchSize,
          batch_q = [],
          linger_ms = LingerMs,
          commit_fun = CommitFun
      }.
  %% Add one element to the batch and return the updated batch.
  %%
  %% Clauses, in matching order:
  %%   1. Empty queue and no timer armed: queue the element and arm a linger
  %%      timer that sends `batch_linger_expired' to the calling process
  %%      after linger_ms milliseconds.
  %%   2. batch_size = 0: no size limit, the element is simply queued.
  %%   3. The queue already holds batch_size (or more) elements: the element
  %%      is queued and the whole batch is committed immediately. The
  %%      threshold is tested BEFORE the new element is added, so the commit
  %%      fun receives one element more than batch_size.
  %%   4. Otherwise the element is queued.
  %%
  %% Emptiness is detected by matching `[]' rather than calling length/1,
  %% which would traverse the queue on every push.
  -spec(push(any(), batch()) -> batch()).
  push(El, Batch = #batch{batch_q = [], linger_ms = Ms, linger_timer = undefined}) ->
      TRef = erlang:send_after(Ms, self(), batch_linger_expired),
      Batch#batch{batch_q = [El], linger_timer = TRef};
  %% no limit.
  push(El, Batch = #batch{batch_size = 0, batch_q = Q}) ->
      Batch#batch{batch_q = [El | Q]};
  push(El, Batch = #batch{batch_size = MaxSize, batch_q = Q}) when length(Q) >= MaxSize ->
      commit(Batch#batch{batch_q = [El | Q]});
  push(El, Batch = #batch{batch_q = Q}) ->
      Batch#batch{batch_q = [El | Q]}.
  %% Hand the queued elements to the commit fun, oldest first, then return
  %% the batch in its reset state. The fun's return value is ignored, and
  %% the fun is called even when the queue is empty (with `[]').
  -spec(commit(batch()) -> batch()).
  commit(#batch{batch_q = Queue, commit_fun = CommitFun} = Batch) ->
      %% The queue is stored newest-first; restore insertion order.
      Items = lists:reverse(Queue),
      _ = CommitFun(Items),
      reset(Batch).
  %% Return the batch with an empty queue and no linger timer.
  %% The stored timer value (possibly `undefined') is passed to
  %% emqx_misc:cancel_timer/1; its result is ignored.
  %% NOTE(review): assumes cancel_timer/1 accepts `undefined' - confirm.
  reset(#batch{linger_timer = Timer} = Batch) ->
      _ = emqx_misc:cancel_timer(Timer),
      Batch#batch{linger_timer = undefined, batch_q = []}.
  %% Number of elements currently queued in the batch.
  -spec(size(batch()) -> non_neg_integer()).
  size(#batch{batch_q = Queue}) ->
      erlang:length(Queue).
  %% Queued elements in insertion order (oldest first). The batch itself is
  %% not modified.
  -spec(items(batch()) -> list(any())).
  items(#batch{batch_q = Queue}) ->
      %% Elements are stored newest-first, so reverse before returning.
      lists:reverse(Queue).