|
|
@@ -19,87 +19,333 @@
|
|
|
-compile(export_all).
|
|
|
-compile(nowarn_export_all).
|
|
|
|
|
|
+-include("emqx_mqtt.hrl").
|
|
|
+-include_lib("proper/include/proper.hrl").
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
|
+-define(mock_modules,
|
|
|
+ [ emqx_metrics
|
|
|
+ , emqx_broker
|
|
|
+ , emqx_misc
|
|
|
+ , emqx_message
|
|
|
+ , emqx_hooks
|
|
|
+ , emqx_zone
|
|
|
+ , emqx_pd
|
|
|
+ ]).
|
|
|
+
|
|
|
all() -> emqx_ct:all(?MODULE).
|
|
|
|
|
|
-init_per_suite(Config) ->
|
|
|
- emqx_ct_helpers:start_apps([]),
|
|
|
- Config.
|
|
|
-
|
|
|
-end_per_suite(_Config) ->
|
|
|
- emqx_ct_helpers:stop_apps([]).
|
|
|
-
|
|
|
-t_info(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_attrs(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_stats(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_subscribe(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_unsubscribe(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_publish(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_puback(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_pubrec(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_pubrel(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_pubcomp(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_deliver(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-t_timeout(_) ->
|
|
|
- 'TODO'.
|
|
|
-
|
|
|
-ignore_loop(_Config) ->
|
|
|
- emqx_zone:set_env(external, ignore_loop_deliver, true),
|
|
|
- {ok, Client} = emqx_client:start_link(),
|
|
|
- {ok, _} = emqx_client:connect(Client),
|
|
|
- TestTopic = <<"Self">>,
|
|
|
- {ok, _, [2]} = emqx_client:subscribe(Client, TestTopic, qos2),
|
|
|
- ok = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 0),
|
|
|
- {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 1),
|
|
|
- {ok, _} = emqx_client:publish(Client, TestTopic, <<"testmsg">>, 2),
|
|
|
- ?assertEqual(0, length(emqx_client_SUITE:receive_messages(3))),
|
|
|
- ok = emqx_client:disconnect(Client),
|
|
|
- emqx_zone:set_env(external, ignore_loop_deliver, false).
|
|
|
-
|
|
|
-session_all(_) ->
|
|
|
- emqx_zone:set_env(internal, idle_timeout, 1000),
|
|
|
- ClientId = <<"ClientId">>,
|
|
|
- {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
|
|
|
- {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
|
|
|
- Message1 = emqx_message:make(<<"ClientId">>, 2, <<"topic">>, <<"hello">>),
|
|
|
- emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 2}}]),
|
|
|
- emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 1}}]),
|
|
|
- timer:sleep(200),
|
|
|
- [{<<"topic">>, _}] = emqx:subscriptions(SPid),
|
|
|
- emqx_session:publish(SPid, 1, Message1),
|
|
|
- timer:sleep(200),
|
|
|
- [{publish, 1, _}] = emqx_mock_client:get_last_message(ConnPid),
|
|
|
- Attrs = emqx_session:attrs(SPid),
|
|
|
- Info = emqx_session:info(SPid),
|
|
|
- Stats = emqx_session:stats(SPid),
|
|
|
- ClientId = proplists:get_value(client_id, Attrs),
|
|
|
- ClientId = proplists:get_value(client_id, Info),
|
|
|
- 1 = proplists:get_value(subscriptions_count, Stats),
|
|
|
- emqx_session:unsubscribe(SPid, [<<"topic">>]),
|
|
|
- timer:sleep(200),
|
|
|
- [] = emqx:subscriptions(SPid),
|
|
|
- emqx_mock_client:close_session(ConnPid).
|
|
|
+t_proper_session(_) ->
|
|
|
+ Opts = [{numtests, 1000}, {to_file, user}],
|
|
|
+ ok = emqx_logger:set_log_level(emergency),
|
|
|
+ ok = before_proper(),
|
|
|
+ ?assert(proper:quickcheck(prop_session(), Opts)),
|
|
|
+ ok = after_proper().
|
|
|
+
|
|
|
+before_proper() ->
|
|
|
+ load(?mock_modules).
|
|
|
+
|
|
|
+after_proper() ->
|
|
|
+ unload(?mock_modules),
|
|
|
+ emqx_logger:set_log_level(error).
|
|
|
+
|
|
|
+prop_session() ->
|
|
|
+ ?FORALL({Session, OpList}, {session(), session_op_list()},
|
|
|
+ begin
|
|
|
+ try
|
|
|
+ apply_ops(Session, OpList),
|
|
|
+ true
|
|
|
+ after
|
|
|
+ true
|
|
|
+ end
|
|
|
+ end).
|
|
|
+
|
|
|
+%%%%%%%%%%%%%%%
|
|
|
+%%% Helpers %%%
|
|
|
+%%%%%%%%%%%%%%%
|
|
|
+
|
|
|
+apply_ops(Session, []) ->
|
|
|
+ ?assertEqual(session, element(1, Session));
|
|
|
+apply_ops(Session, [Op | Rest]) ->
|
|
|
+ NSession = apply_op(Session, Op),
|
|
|
+ apply_ops(NSession, Rest).
|
|
|
+
|
|
|
+apply_op(Session, info) ->
|
|
|
+ Info = emqx_session:info(Session),
|
|
|
+ ?assert(is_map(Info)),
|
|
|
+ ?assertEqual(16, maps:size(Info)),
|
|
|
+ Session;
|
|
|
+apply_op(Session, attrs) ->
|
|
|
+ Attrs = emqx_session:attrs(Session),
|
|
|
+ ?assert(is_map(Attrs)),
|
|
|
+ ?assertEqual(3, maps:size(Attrs)),
|
|
|
+ Session;
|
|
|
+apply_op(Session, stats) ->
|
|
|
+ Stats = emqx_session:stats(Session),
|
|
|
+ ?assert(is_list(Stats)),
|
|
|
+ ?assertEqual(9, length(Stats)),
|
|
|
+ Session;
|
|
|
+apply_op(Session, {info, InfoArg}) ->
|
|
|
+ _Ret = emqx_session:info(InfoArg, Session),
|
|
|
+ Session;
|
|
|
+apply_op(Session, {subscribe, {Client, TopicFilter, SubOpts}}) ->
|
|
|
+ case emqx_session:subscribe(Client, TopicFilter, SubOpts, Session) of
|
|
|
+ {ok, NSession} ->
|
|
|
+ NSession;
|
|
|
+ {error, ?RC_QUOTA_EXCEEDED} ->
|
|
|
+ Session
|
|
|
+ end;
|
|
|
+apply_op(Session, {unsubscribe, {Client, TopicFilter}}) ->
|
|
|
+ case emqx_session:unsubscribe(Client, TopicFilter, Session) of
|
|
|
+ {ok, NSession} ->
|
|
|
+ NSession;
|
|
|
+ {error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
|
|
|
+ Session
|
|
|
+ end;
|
|
|
+apply_op(Session, {publish, {PacketId, Msg}}) ->
|
|
|
+ case emqx_session:publish(PacketId, Msg, Session) of
|
|
|
+ {ok, _Msg} ->
|
|
|
+ Session;
|
|
|
+ {ok, _Deliver, NSession} ->
|
|
|
+ NSession;
|
|
|
+ {error, _ErrorCode} ->
|
|
|
+ Session
|
|
|
+ end;
|
|
|
+apply_op(Session, {puback, PacketId}) ->
|
|
|
+ case emqx_session:puback(PacketId, Session) of
|
|
|
+ {ok, _Msg} ->
|
|
|
+ Session;
|
|
|
+ {ok, _Deliver, NSession} ->
|
|
|
+ NSession;
|
|
|
+ {error, _ErrorCode} ->
|
|
|
+ Session
|
|
|
+ end;
|
|
|
+apply_op(Session, {pubrec, PacketId}) ->
|
|
|
+ case emqx_session:pubrec(PacketId, Session) of
|
|
|
+ {ok, NSession} ->
|
|
|
+ NSession;
|
|
|
+ {error, _ErrorCode} ->
|
|
|
+ Session
|
|
|
+ end;
|
|
|
+apply_op(Session, {pubrel, PacketId}) ->
|
|
|
+ case emqx_session:pubrel(PacketId, Session) of
|
|
|
+ {ok, NSession} ->
|
|
|
+ NSession;
|
|
|
+ {error, _ErrorCode} ->
|
|
|
+ Session
|
|
|
+ end;
|
|
|
+apply_op(Session, {pubcomp, PacketId}) ->
|
|
|
+ case emqx_session:pubcomp(PacketId, Session) of
|
|
|
+ {ok, _Msgs} ->
|
|
|
+ Session;
|
|
|
+ {ok, _Msgs, NSession} ->
|
|
|
+ NSession;
|
|
|
+ {error, _ErrorCode} ->
|
|
|
+ Session
|
|
|
+ end;
|
|
|
+apply_op(Session, {deliver, Delivers}) ->
|
|
|
+ {ok, _Msgs, NSession} = emqx_session:deliver(Delivers, Session),
|
|
|
+ NSession;
|
|
|
+apply_op(Session, {timeout, {TRef, TimeoutMsg}}) ->
|
|
|
+ case emqx_session:timeout(TRef, TimeoutMsg, Session) of
|
|
|
+ {ok, NSession} ->
|
|
|
+ NSession;
|
|
|
+ {ok, _Msg, NSession} ->
|
|
|
+ NSession
|
|
|
+ end.
|
|
|
+
|
|
|
+%%%%%%%%%%%%%%%%%%
|
|
|
+%%% Generators %%%
|
|
|
+%%%%%%%%%%%%%%%%%%
|
|
|
+session_op_list() ->
|
|
|
+ Union = [info,
|
|
|
+ attrs,
|
|
|
+ stats,
|
|
|
+ {info, info_args()},
|
|
|
+ {subscribe, sub_args()},
|
|
|
+ {unsubscribe, unsub_args()},
|
|
|
+ {publish, publish_args()},
|
|
|
+ {puback, puback_args()},
|
|
|
+ {pubrec, pubrec_args()},
|
|
|
+ {pubrel, pubrel_args()},
|
|
|
+ {pubcomp, pubcomp_args()},
|
|
|
+ {deliver, deliver_args()},
|
|
|
+ {timeout, timeout_args()}
|
|
|
+ ],
|
|
|
+ list(?LAZY(oneof(Union))).
|
|
|
+
|
|
|
+deliver_args() ->
|
|
|
+ list({deliver, topic(), message()}).
|
|
|
+
|
|
|
+timeout_args() ->
|
|
|
+ {tref(), timeout_msg()}.
|
|
|
+
|
|
|
+info_args() ->
|
|
|
+ oneof([clean_start,
|
|
|
+ subscriptions,
|
|
|
+ max_subscriptions,
|
|
|
+ upgrade_qos,
|
|
|
+ inflight,
|
|
|
+ max_inflight,
|
|
|
+ retry_interval,
|
|
|
+ mqueue_len,
|
|
|
+ max_mqueue,
|
|
|
+ mqueue_dropped,
|
|
|
+ next_pkt_id,
|
|
|
+ awaiting_rel,
|
|
|
+ max_awaiting_rel,
|
|
|
+ await_rel_timeout,
|
|
|
+ expiry_interval,
|
|
|
+ created_at
|
|
|
+ ]).
|
|
|
+
|
|
|
+sub_args() ->
|
|
|
+ ?LET({ClientId, TopicFilter, SubOpts},
|
|
|
+ {clientid(), topic(), sub_opts()},
|
|
|
+ {#{client_id => ClientId}, TopicFilter, SubOpts}).
|
|
|
+
|
|
|
+unsub_args() ->
|
|
|
+ ?LET({ClientId, TopicFilter},
|
|
|
+ {clientid(), topic()},
|
|
|
+ {#{client_id => ClientId}, TopicFilter}).
|
|
|
+
|
|
|
+publish_args() ->
|
|
|
+ ?LET({PacketId, Message},
|
|
|
+ {packetid(), message()},
|
|
|
+ {PacketId, Message}).
|
|
|
+
|
|
|
+puback_args() ->
|
|
|
+ packetid().
|
|
|
+
|
|
|
+pubrec_args() ->
|
|
|
+ packetid().
|
|
|
+
|
|
|
+pubrel_args() ->
|
|
|
+ packetid().
|
|
|
+
|
|
|
+pubcomp_args() ->
|
|
|
+ packetid().
|
|
|
+
|
|
|
+timeout_msg() ->
|
|
|
+ oneof([retry_delivery, check_awaiting_rel]).
|
|
|
+
|
|
|
+tref() -> oneof([tref, undefined]).
|
|
|
+
|
|
|
+sub_opts() ->
|
|
|
+ ?LET({RH, RAP, NL, QOS, SHARE, SUBID},
|
|
|
+ {rh(), rap(), nl(), qos(), share(), subid()}
|
|
|
+ , make_subopts(RH, RAP, NL, QOS, SHARE, SUBID)).
|
|
|
+
|
|
|
+message() ->
|
|
|
+ ?LET({QoS, Topic, Payload},
|
|
|
+ {qos(), topic(), payload()},
|
|
|
+ emqx_message:make(proper, QoS, Topic, Payload)).
|
|
|
+
|
|
|
+subid() -> integer().
|
|
|
+
|
|
|
+rh() -> oneof([0, 1, 2]).
|
|
|
+
|
|
|
+rap() -> oneof([0, 1]).
|
|
|
+
|
|
|
+nl() -> oneof([0, 1]).
|
|
|
+
|
|
|
+qos() -> oneof([0, 1, 2]).
|
|
|
+
|
|
|
+share() -> binary().
|
|
|
+
|
|
|
+clientid() -> binary().
|
|
|
+
|
|
|
+topic() -> ?LET(No, choose(1, 10),
|
|
|
+ begin
|
|
|
+ NoBin = integer_to_binary(No),
|
|
|
+ <<"topic/", NoBin/binary>>
|
|
|
+ end).
|
|
|
+
|
|
|
+payload() -> binary().
|
|
|
+
|
|
|
+packetid() -> choose(1, 30).
|
|
|
+
|
|
|
+zone() ->
|
|
|
+ ?LET(Zone, [{max_subscriptions, max_subscription()},
|
|
|
+ {upgrade_qos, upgrade_qos()},
|
|
|
+ {retry_interval, retry_interval()},
|
|
|
+ {max_awaiting_rel, max_awaiting_rel()},
|
|
|
+ {await_rel_timeout, await_rel_timeout()}]
|
|
|
+ , maps:from_list(Zone)).
|
|
|
+
|
|
|
+max_subscription() ->
|
|
|
+ frequency([{33, 0},
|
|
|
+ {33, 1},
|
|
|
+ {34, choose(0,10)}]).
|
|
|
+
|
|
|
+upgrade_qos() -> bool().
|
|
|
+
|
|
|
+retry_interval() -> ?LET(Interval, choose(0, 20), Interval*1000).
|
|
|
+
|
|
|
+max_awaiting_rel() -> choose(0, 10).
|
|
|
+
|
|
|
+await_rel_timeout() -> ?LET(Interval, choose(0, 150), Interval*1000).
|
|
|
+
|
|
|
+max_inflight() -> choose(0, 10).
|
|
|
+
|
|
|
+expiry_interval() -> ?LET(EI, choose(1, 10), EI * 3600).
|
|
|
+
|
|
|
+option() ->
|
|
|
+ ?LET(Option, [{max_inflight, max_inflight()},
|
|
|
+ {expiry_interval, expiry_interval()}]
|
|
|
+ , maps:from_list(Option)).
|
|
|
+
|
|
|
+cleanstart() -> bool().
|
|
|
+
|
|
|
+session() ->
|
|
|
+ ?LET({CleanStart, Zone, Options},
|
|
|
+ {cleanstart(), zone(), option()},
|
|
|
+ begin
|
|
|
+ Session = emqx_session:init(CleanStart, #{zone => Zone}, Options),
|
|
|
+ emqx_session:set_pkt_id(Session, 16#ffff)
|
|
|
+ end).
|
|
|
+
|
|
|
+%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
|
+%%% Internal functions %%%
|
|
|
+%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
|
+
|
|
|
+make_subopts(RH, RAP, NL, QOS, SHARE, SubId) ->
|
|
|
+ #{rh => RH,
|
|
|
+ rap => RAP,
|
|
|
+ nl => NL,
|
|
|
+ qos => QOS,
|
|
|
+ share => SHARE,
|
|
|
+ subid => SubId}.
|
|
|
+
|
|
|
+
|
|
|
+load(Modules) ->
|
|
|
+ [mock(Module) || Module <- Modules],
|
|
|
+ ok.
|
|
|
+
|
|
|
+unload(Modules) ->
|
|
|
+ lists:foreach(fun(Module) ->
|
|
|
+ ok = meck:unload(Module)
|
|
|
+ end, Modules).
|
|
|
+
|
|
|
+mock(Module) ->
|
|
|
+ ok = meck:new(Module, [passthrough, no_history]),
|
|
|
+ do_mock(Module).
|
|
|
|
|
|
+do_mock(emqx_metrics) ->
|
|
|
+ meck:expect(emqx_metrics, inc, fun(_Anything) -> ok end);
|
|
|
+do_mock(emqx_broker) ->
|
|
|
+ meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
|
|
|
+ meck:expect(emqx_broker, set_subopts, fun(_, _) -> ok end),
|
|
|
+ meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end),
|
|
|
+ meck:expect(emqx_broker, publish, fun(_) -> ok end);
|
|
|
+do_mock(emqx_misc) ->
|
|
|
+ meck:expect(emqx_misc, start_timer, fun(_, _) -> tref end);
|
|
|
+do_mock(emqx_message) ->
|
|
|
+ meck:expect(emqx_message, set_header, fun(_Hdr, _Val, Msg) -> Msg end),
|
|
|
+ meck:expect(emqx_message, is_expired, fun(_Msg) -> (rand:uniform(16) > 8) end);
|
|
|
+do_mock(emqx_hooks) ->
|
|
|
+ meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end);
|
|
|
+do_mock(emqx_zone) ->
|
|
|
+ meck:expect(emqx_zone, get_env, fun(Env, Key, Default) -> maps:get(Key, Env, Default) end);
|
|
|
+do_mock(emqx_pd) ->
|
|
|
+ meck:expect(emqx_pd, update_counter, fun(_stats, _num) -> ok end).
|