|
@@ -818,78 +818,6 @@ t_dispatch_qos0(Config) when is_list(Config) ->
|
|
|
emqtt:stop(ConnPid2),
|
|
emqtt:stop(ConnPid2),
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
-t_redispatch_when_kicked({init, Config}) when is_list(Config) ->
|
|
|
|
|
- Config;
|
|
|
|
|
-t_redispatch_when_kicked({'end', Config}) when is_list(Config) ->
|
|
|
|
|
- snabbkaffe:stop(),
|
|
|
|
|
- ok;
|
|
|
|
|
-t_redispatch_when_kicked(_) ->
|
|
|
|
|
- ok = ensure_config(sticky, true),
|
|
|
|
|
-
|
|
|
|
|
- Group = <<"group1">>,
|
|
|
|
|
- Topic = <<"foo/bar">>,
|
|
|
|
|
- ClientId1 = <<"ClientId1">>,
|
|
|
|
|
- ClientId2 = <<"ClientId2">>,
|
|
|
|
|
- Parent = self(),
|
|
|
|
|
-
|
|
|
|
|
- %% emqx_cm:kick_session will cause the emqtt to exit abnormally
|
|
|
|
|
- %% so here need to isolate emqtt with a separate process
|
|
|
|
|
- MkSub = fun(ClientId) ->
|
|
|
|
|
- {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}, {auto_ack, false}]),
|
|
|
|
|
- {ok, _} = emqtt:connect(ConnPid),
|
|
|
|
|
- {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {
|
|
|
|
|
- <<"$share/", Group/binary, "/foo/bar">>, 1
|
|
|
|
|
- }),
|
|
|
|
|
-
|
|
|
|
|
- Loop = fun(Self) ->
|
|
|
|
|
- receive
|
|
|
|
|
- wait ->
|
|
|
|
|
- case last_message(<<"hello1">>, [ConnPid], 6000) of
|
|
|
|
|
- {true, _} ->
|
|
|
|
|
- Parent ! {got, ClientId};
|
|
|
|
|
- _ ->
|
|
|
|
|
- Parent ! nothing
|
|
|
|
|
- end,
|
|
|
|
|
- Self(Self);
|
|
|
|
|
- stop ->
|
|
|
|
|
- stop
|
|
|
|
|
- end
|
|
|
|
|
- end,
|
|
|
|
|
-
|
|
|
|
|
- Loop(Loop)
|
|
|
|
|
- end,
|
|
|
|
|
-
|
|
|
|
|
- Receive = fun() ->
|
|
|
|
|
- receive
|
|
|
|
|
- {got, _} = Got -> Got;
|
|
|
|
|
- nothing -> nothing
|
|
|
|
|
- after 6000 ->
|
|
|
|
|
- nothing
|
|
|
|
|
- end
|
|
|
|
|
- end,
|
|
|
|
|
-
|
|
|
|
|
- Subs = [erlang:spawn(fun() -> MkSub(ClientId) end) || ClientId <- [ClientId1, ClientId2]],
|
|
|
|
|
- timer:sleep(500),
|
|
|
|
|
-
|
|
|
|
|
- ok = snabbkaffe:start_trace(),
|
|
|
|
|
-
|
|
|
|
|
- Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
|
|
|
|
|
- emqx:publish(Message),
|
|
|
|
|
-
|
|
|
|
|
- [Sub ! wait || Sub <- Subs],
|
|
|
|
|
- Res = Receive(),
|
|
|
|
|
- ?assertMatch({got, _}, Res),
|
|
|
|
|
-
|
|
|
|
|
- {got, ClientId} = Res,
|
|
|
|
|
- emqx_cm:kick_session(ClientId),
|
|
|
|
|
-
|
|
|
|
|
- Trace = snabbkaffe:collect_trace(500),
|
|
|
|
|
- ?assertMatch([#{reason := kicked}], ?of_kind(ignore_redispatch_shared_messages, Trace)),
|
|
|
|
|
-
|
|
|
|
|
- [Sub ! stop || Sub <- Subs],
|
|
|
|
|
- snabbkaffe:stop(),
|
|
|
|
|
- ok.
|
|
|
|
|
-
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
%% help functions
|
|
%% help functions
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|