Просмотр исходного кода

test(webhook): ignore messages from other clients

Zaiming Shi 4 лет назад
Родитель
Сommit
186dfd04a7
1 измененных файлов с 56 добавлено и 45 удалено
  1. 56 45
      apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl

+ 56 - 45
apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl

@@ -128,12 +128,13 @@ test_full_flow(Config) ->
     after 1000 -> error(timeout)
     after 1000 -> error(timeout)
     end,
     end,
     application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
     application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
-    {ok, C} = emqtt:start_link([ {clientid, <<"simpleClient">>}
+    ClientId = iolist_to_binary(["client-", integer_to_list(erlang:system_time())]),
+    {ok, C} = emqtt:start_link([ {clientid, ClientId}
                                , {proto_ver, v5}
                                , {proto_ver, v5}
                                , {keepalive, 60}
                                , {keepalive, 60}
                                ]),
                                ]),
     try
     try
-        do_test_full_flow(C)
+        do_test_full_flow(C, ClientId)
     after
     after
         Ref = erlang:monitor(process, ServerPid),
         Ref = erlang:monitor(process, ServerPid),
         http_server:stop(ServerPid),
         http_server:stop(ServerPid),
@@ -142,27 +143,34 @@ test_full_flow(Config) ->
         end
         end
     end.
     end.
 
 
-do_test_full_flow(C) ->
+do_test_full_flow(C, ClientId) ->
     {ok, _} = emqtt:connect(C),
     {ok, _} = emqtt:connect(C),
     {ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos2),
     {ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos2),
     {ok, _} = emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2),
     {ok, _} = emqtt:publish(C, <<"TopicA">>, <<"Payload...">>, qos2),
     {ok, _, _} = emqtt:unsubscribe(C, <<"TopicA">>),
     {ok, _, _} = emqtt:unsubscribe(C, <<"TopicA">>),
     emqtt:disconnect(C),
     emqtt:disconnect(C),
-    validate_params_and_headers(undefined).
+    validate_params_and_headers(undefined, ClientId).
 
 
-validate_params_and_headers(ClientState) ->
+validate_params_and_headers(ClientState, ClientId) ->
     receive
     receive
         {http_server, {Params0, _Bool}, Headers} ->
         {http_server, {Params0, _Bool}, Headers} ->
             Params = emqx_json:decode(Params0, [return_maps]),
             Params = emqx_json:decode(Params0, [return_maps]),
-            validate_hook_resp(Params),
-            validate_hook_headers(Headers),
-            case maps:get(<<"action">>, Params) of
-                <<"session_terminated">> ->
-                    ok;
-                <<"client_connect">> ->
-                    validate_params_and_headers(connected);
-                _ ->
-                    validate_params_and_headers(ClientState) %% continue looping
+            try
+                validate_hook_resp(ClientId, Params),
+                validate_hook_headers(Headers),
+                case maps:get(<<"action">>, Params) of
+                    <<"session_terminated">> ->
+                        ok;
+                    <<"client_connect">> ->
+                        validate_params_and_headers(connected, ClientId);
+                    _ ->
+                        validate_params_and_headers(ClientState, ClientId) %% continue looping
+                end
+            catch
+                throw : {unknown_client, Other} ->
+                    ct:pal("ignored_event_from_other_client ~p~n~p~n~p",
+                           [Other, Params, Headers]),
+                    validate_params_and_headers(ClientState, ClientId) %% continue looping
             end
             end
     after
     after
         5000 ->
         5000 ->
@@ -198,66 +206,69 @@ validate_hook_headers(Headers) ->
     ?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)),
     ?assertEqual(<<"K1">>, maps:get(<<"k1">>, Headers)),
     ?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)).
     ?assertEqual(<<"K2">>, maps:get(<<"k2">>, Headers)).
 
 
-validate_hook_resp(Body = ?ACTION(<<"client_connect">>)) ->
+validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connect">>)) ->
+    assert_username_clientid(ClientId, Body),
     ?assertEqual(5,  maps:get(<<"proto_ver">>, Body)),
     ?assertEqual(5,  maps:get(<<"proto_ver">>, Body)),
     ?assertEqual(60, maps:get(<<"keepalive">>, Body)),
     ?assertEqual(60, maps:get(<<"keepalive">>, Body)),
     ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
     ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
-    assert_username_clientid(Body);
-validate_hook_resp(Body = ?ACTION(<<"client_connack">>)) ->
+    ok;
+validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connack">>)) ->
+    assert_username_clientid(ClientId, Body),
     ?assertEqual(5,  maps:get(<<"proto_ver">>, Body)),
     ?assertEqual(5,  maps:get(<<"proto_ver">>, Body)),
     ?assertEqual(60, maps:get(<<"keepalive">>, Body)),
     ?assertEqual(60, maps:get(<<"keepalive">>, Body)),
     ?assertEqual(<<"success">>, maps:get(<<"conn_ack">>, Body)),
     ?assertEqual(<<"success">>, maps:get(<<"conn_ack">>, Body)),
     ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
     ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
-    assert_username_clientid(Body);
-validate_hook_resp(Body = ?ACTION(<<"client_connected">>)) ->
+    ok;
+validate_hook_resp(ClientId, Body = ?ACTION(<<"client_connected">>)) ->
+    assert_username_clientid(ClientId, Body),
     _ = maps:get(<<"connected_at">>, Body),
     _ = maps:get(<<"connected_at">>, Body),
     ?assertEqual(5,  maps:get(<<"proto_ver">>, Body)),
     ?assertEqual(5,  maps:get(<<"proto_ver">>, Body)),
     ?assertEqual(60, maps:get(<<"keepalive">>, Body)),
     ?assertEqual(60, maps:get(<<"keepalive">>, Body)),
     ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
     ?assertEqual(<<"127.0.0.1">>, maps:get(<<"ipaddress">>, Body)),
-    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
-    assert_username_clientid(Body);
-validate_hook_resp(Body = ?ACTION(<<"client_disconnected">>)) ->
+    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
+validate_hook_resp(ClientId, Body = ?ACTION(<<"client_disconnected">>)) ->
+    assert_username_clientid(ClientId, Body),
     ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
     ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
-    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
-    assert_username_clientid(Body);
-validate_hook_resp(Body = ?ACTION(<<"client_subscribe">>)) ->
+    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
+validate_hook_resp(ClientId, Body = ?ACTION(<<"client_subscribe">>)) ->
+    assert_username_clientid(ClientId, Body),
     _ = maps:get(<<"opts">>, Body),
     _ = maps:get(<<"opts">>, Body),
     ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
     ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
-    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
-    assert_username_clientid(Body);
-validate_hook_resp(Body = ?ACTION(<<"client_unsubscribe">>)) ->
+    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
+validate_hook_resp(ClientId, Body = ?ACTION(<<"client_unsubscribe">>)) ->
+    assert_username_clientid(ClientId, Body),
     _ = maps:get(<<"opts">>, Body),
     _ = maps:get(<<"opts">>, Body),
     ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
     ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
-    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
-    assert_username_clientid(Body);
-validate_hook_resp(Body = ?ACTION(<<"session_subscribed">>)) ->
+    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
+validate_hook_resp(ClientId, Body = ?ACTION(<<"session_subscribed">>)) ->
+    assert_username_clientid(ClientId, Body),
     _ = maps:get(<<"opts">>, Body),
     _ = maps:get(<<"opts">>, Body),
     ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
     ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
-    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
-    assert_username_clientid(Body);
-validate_hook_resp(Body = ?ACTION(<<"session_unsubscribed">>)) ->
+    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
+validate_hook_resp(ClientId, Body = ?ACTION(<<"session_unsubscribed">>)) ->
+    assert_username_clientid(ClientId, Body),
     ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
     ?assertEqual(<<"TopicA">>, maps:get(<<"topic">>, Body)),
-    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
-    assert_username_clientid(Body);
-validate_hook_resp(Body = ?ACTION(<<"session_terminated">>)) ->
+    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
+validate_hook_resp(ClientId, Body = ?ACTION(<<"session_terminated">>)) ->
+    assert_username_clientid(ClientId, Body),
     ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
     ?assertEqual(<<"normal">>, maps:get(<<"reason">>, Body)),
-    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
-    assert_username_clientid(Body);
-validate_hook_resp(Body = ?ACTION(<<"message_publish">>)) ->
+    ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body));
+validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_publish">>)) ->
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
     assert_messages_attrs(Body);
     assert_messages_attrs(Body);
-validate_hook_resp(Body = ?ACTION(<<"message_delivered">>)) ->
+validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_delivered">>)) ->
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
     assert_messages_attrs(Body);
     assert_messages_attrs(Body);
-validate_hook_resp(Body = ?ACTION(<<"message_acked">>)) ->
+validate_hook_resp(_ClientId, Body = ?ACTION(<<"message_acked">>)) ->
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
     ?assertEqual(<<"test@127.0.0.1">>, maps:get(<<"node">>, Body)),
     assert_messages_attrs(Body).
     assert_messages_attrs(Body).
 
 
-assert_username_clientid(#{<<"clientid">> := ClientId, <<"username">> := Username}) ->
-    ?assertEqual(<<"simpleClient">>, ClientId),
-    ?assertEqual(null, Username).
+assert_username_clientid(ClientId, #{<<"clientid">> := ClientId, <<"username">> := Username}) ->
+    ?assertEqual(null, Username);
+assert_username_clientid(_ClientId, #{<<"clientid">> := Other}) ->
+    throw({unknown_client, Other}).
 
 
 assert_messages_attrs(#{ <<"ts">> := _
 assert_messages_attrs(#{ <<"ts">> := _
                        , <<"qos">> := _
                        , <<"qos">> := _