emqttd_mqueue_tests.erl 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqttd_mqueue_tests).
  17. -include("emqttd.hrl").
  18. -define(Q, emqttd_mqueue).
  19. -ifdef(TEST).
  20. -include_lib("eunit/include/eunit.hrl").
  21. in_test() ->
  22. Opts = [{max_length, 5},
  23. {queue_qos0, true}],
  24. Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
  25. ?assertEqual(true, ?Q:is_empty(Q)),
  26. Q1 = ?Q:in(#mqtt_message{}, Q),
  27. ?assertEqual(1, ?Q:len(Q1)),
  28. Q2 = ?Q:in(#mqtt_message{qos = 1}, Q1),
  29. ?assertEqual(2, ?Q:len(Q2)),
  30. Q3 = ?Q:in(#mqtt_message{qos = 2}, Q2),
  31. Q4 = ?Q:in(#mqtt_message{}, Q3),
  32. Q5 = ?Q:in(#mqtt_message{}, Q4),
  33. ?assertEqual(5, ?Q:len(Q5)).
  34. in_qos0_test() ->
  35. Opts = [{max_length, 5},
  36. {queue_qos0, false}],
  37. Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
  38. Q1 = ?Q:in(#mqtt_message{}, Q),
  39. ?assertEqual(true, ?Q:is_empty(Q1)),
  40. Q2 = ?Q:in(#mqtt_message{qos = 0}, Q1),
  41. ?assertEqual(true, ?Q:is_empty(Q2)).
  42. out_test() ->
  43. Opts = [{max_length, 5},
  44. {queue_qos0, true}],
  45. Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
  46. ?assertMatch({empty, Q}, ?Q:out(Q)),
  47. Q1 = ?Q:in(#mqtt_message{}, Q),
  48. {Value, Q2} = ?Q:out(Q1),
  49. ?assertEqual(0, ?Q:len(Q2)),
  50. ?assertMatch({value, #mqtt_message{}}, Value).
  51. simple_mqueue_test() ->
  52. Opts = [{type, simple},
  53. {max_length, 3},
  54. {low_watermark, 0.2},
  55. {high_watermark, 0.6},
  56. {queue_qos0, false}],
  57. Q = ?Q:new("simple_queue", Opts, alarm_fun()),
  58. ?assertEqual(simple, ?Q:type(Q)),
  59. ?assertEqual(3, ?Q:max_len(Q)),
  60. ?assertEqual(<<"simple_queue">>, ?Q:name(Q)),
  61. ?assert(?Q:is_empty(Q)),
  62. Q1 = ?Q:in(#mqtt_message{qos = 1, payload = <<"1">>}, Q),
  63. Q2 = ?Q:in(#mqtt_message{qos = 1, payload = <<"2">>}, Q1),
  64. Q3 = ?Q:in(#mqtt_message{qos = 1, payload = <<"3">>}, Q2),
  65. Q4 = ?Q:in(#mqtt_message{qos = 1, payload = <<"4">>}, Q3),
  66. ?assertEqual(3, ?Q:len(Q4)),
  67. {{value, Msg}, Q5} = ?Q:out(Q4),
  68. ?assertMatch(<<"2">>, Msg#mqtt_message.payload),
  69. ?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)).
  70. infinity_simple_mqueue_test() ->
  71. Opts = [{type, simple},
  72. {max_length, infinity},
  73. {low_watermark, 0.2},
  74. {high_watermark, 0.6},
  75. {queue_qos0, false}],
  76. Q = ?Q:new("infinity_simple_queue", Opts, alarm_fun()),
  77. ?assert(?Q:is_empty(Q)),
  78. ?assertEqual(infinity, ?Q:max_len(Q)),
  79. Qx = lists:foldl(fun(I, AccQ) ->
  80. ?Q:in(#mqtt_message{qos = 1, payload = iolist_to_binary([I])}, AccQ)
  81. end, Q, lists:seq(1, 255)),
  82. ?assertEqual(255, ?Q:len(Qx)),
  83. ?assertEqual([{len, 255}, {max_len, infinity}, {dropped, 0}], ?Q:stats(Qx)),
  84. {{value, V}, Qy} = ?Q:out(Qx),
  85. ?assertEqual(<<1>>, V#mqtt_message.payload).
  86. priority_mqueue_test() ->
  87. Opts = [{type, priority},
  88. {priority, [{<<"t">>, 10}]},
  89. {max_length, 3},
  90. {low_watermark, 0.2},
  91. {high_watermark, 0.6},
  92. {queue_qos0, false}],
  93. Q = ?Q:new("priority_queue", Opts, alarm_fun()),
  94. ?assertEqual(priority, ?Q:type(Q)),
  95. ?assertEqual(3, ?Q:max_len(Q)),
  96. ?assertEqual(<<"priority_queue">>, ?Q:name(Q)),
  97. ?assert(?Q:is_empty(Q)),
  98. Q1 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q),
  99. Q2 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t">>}, Q1),
  100. Q3 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t2">>}, Q2),
  101. ?assertEqual(3, ?Q:len(Q3)),
  102. Q4 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q3),
  103. ?assertEqual(4, ?Q:len(Q4)),
  104. Q5 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q4),
  105. ?assertEqual(5, ?Q:len(Q5)),
  106. Q6 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q5),
  107. ?assertEqual(5, ?Q:len(Q6)),
  108. {{value, Msg}, Q7} = ?Q:out(Q6),
  109. ?assertMatch(<<"t">>, Msg#mqtt_message.topic).
  110. infinity_priority_mqueue_test() ->
  111. Opts = [{type, priority},
  112. {priority, [{<<"t1">>, 10}, {<<"t2">>, 8}]},
  113. {max_length, infinity},
  114. {queue_qos0, false}],
  115. Q = ?Q:new("infinity_priority_queue", Opts, alarm_fun()),
  116. ?assertEqual(infinity, ?Q:max_len(Q)),
  117. Qx = lists:foldl(fun(I, AccQ) ->
  118. AccQ1 =
  119. ?Q:in(#mqtt_message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ),
  120. ?Q:in(#mqtt_message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1)
  121. end, Q, lists:seq(1, 255)),
  122. ?assertEqual(510, ?Q:len(Qx)),
  123. ?assertEqual([{len, 510}, {max_len, infinity}, {dropped, 0}], ?Q:stats(Qx)).
  124. priority_mqueue2_test() ->
  125. Opts = [{type, priority},
  126. {max_length, 2},
  127. {low_watermark, 0.2},
  128. {high_watermark, 0.6},
  129. {queue_qos0, false}],
  130. Q = ?Q:new("priority_queue2_test", Opts, alarm_fun()),
  131. ?assertEqual(2, ?Q:max_len(Q)),
  132. Q1 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
  133. Q2 = ?Q:in(#mqtt_message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
  134. Q3 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
  135. Q4 = ?Q:in(#mqtt_message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
  136. ?assertEqual(4, ?Q:len(Q4)),
  137. {{value, Val}, Q5} = ?Q:out(Q4),
  138. ?debugFmt("Val: ~p~n", [Val]),
  139. ?assertEqual(3, ?Q:len(Q5)).
  140. alarm_fun() -> fun(_, _) -> alarm_fun() end.
  141. -endif.