Quellcode durchsuchen

fix: update existing testcases for new emqx_resource

Shawn vor 3 Jahren
Ursprung
Commit
0377d3cf61

+ 1 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl

@@ -164,7 +164,7 @@ authenticate(
 ) ->
     Filter = emqx_authn_utils:render_deep(FilterTemplate, Credential),
     case emqx_resource:query(ResourceId, {find_one, Collection, Filter, #{}}) of
-        undefined ->
+        {ok, undefined} ->
             ignore;
         {error, Reason} ->
             ?TRACE_AUTHN_PROVIDER(error, "mongodb_query_failed", #{

+ 2 - 2
apps/emqx_authz/src/emqx_authz_mongodb.erl

@@ -92,9 +92,9 @@ authorize(
                 resource_id => ResourceID
             }),
             nomatch;
-        [] ->
+        {ok, []} ->
             nomatch;
-        Rows ->
+        {ok, Rows} ->
             Rules = [
                 emqx_authz_rule:compile({Permission, all, Action, Topics})
              || #{

+ 66 - 50
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -24,7 +24,7 @@
 -include_lib("common_test/include/ct.hrl").
 -define(CONF_DEFAULT, <<"bridges: {}">>).
 -define(BRIDGE_TYPE, <<"webhook">>).
--define(BRIDGE_NAME, <<"test_bridge">>).
+-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
 -define(URL(PORT, PATH),
     list_to_binary(
         io_lib:format(
@@ -78,8 +78,12 @@ set_special_configs(_) ->
 
 init_per_testcase(_, Config) ->
     {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
-    Config.
-end_per_testcase(_, _Config) ->
+    {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2),
+    [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config].
+end_per_testcase(_, Config) ->
+    Sock = ?config(sock, Config),
+    Acceptor = ?config(acceptor, Config),
+    stop_http_server(Sock, Acceptor),
     clear_resources(),
     ok.
 
@@ -95,31 +99,39 @@ clear_resources() ->
 %% HTTP server for testing
 %%------------------------------------------------------------------------------
 start_http_server(HandleFun) ->
+    process_flag(trap_exit, true),
     Parent = self(),
-    spawn_link(fun() ->
-        {Port, Sock} = listen_on_random_port(),
-        Parent ! {port, Port},
-        loop(Sock, HandleFun, Parent)
+    {Port, Sock} = listen_on_random_port(),
+    Acceptor = spawn_link(fun() ->
+        accept_loop(Sock, HandleFun, Parent)
     end),
-    receive
-        {port, Port} -> Port
-    after 2000 -> error({timeout, start_http_server})
-    end.
+    timer:sleep(100),
+    {Port, Sock, Acceptor}.
+
+stop_http_server(Sock, Acceptor) ->
+    exit(Acceptor, kill),
+    gen_tcp:close(Sock).
 
 listen_on_random_port() ->
     Min = 1024,
     Max = 65000,
+    rand:seed(exsplus, erlang:timestamp()),
     Port = rand:uniform(Max - Min) + Min,
-    case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}, binary]) of
+    case
+        gen_tcp:listen(Port, [
+            binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}
+        ])
+    of
         {ok, Sock} -> {Port, Sock};
         {error, eaddrinuse} -> listen_on_random_port()
     end.
 
-loop(Sock, HandleFun, Parent) ->
+accept_loop(Sock, HandleFun, Parent) ->
+    process_flag(trap_exit, true),
     {ok, Conn} = gen_tcp:accept(Sock),
-    Handler = spawn(fun() -> HandleFun(Conn, Parent) end),
+    Handler = spawn_link(fun() -> HandleFun(Conn, Parent) end),
     gen_tcp:controlling_process(Conn, Handler),
-    loop(Sock, HandleFun, Parent).
+    accept_loop(Sock, HandleFun, Parent).
 
 make_response(CodeStr, Str) ->
     B = iolist_to_binary(Str),
@@ -138,7 +150,9 @@ handle_fun_200_ok(Conn, Parent) ->
             Parent ! {http_server, received, Req},
             gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
             handle_fun_200_ok(Conn, Parent);
-        {error, closed} ->
+        {error, Reason} ->
+            ct:pal("the http handler recv error: ~p", [Reason]),
+            timer:sleep(100),
             gen_tcp:close(Conn)
     end.
 
@@ -153,24 +167,25 @@ parse_http_request(ReqStr0) ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_http_crud_apis(_) ->
-    Port = start_http_server(fun handle_fun_200_ok/2),
+t_http_crud_apis(Config) ->
+    Port = ?config(port, Config),
     %% assert we there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
     %% then we add a webhook bridge, using POST
     %% POST /bridges/ will create a bridge
     URL1 = ?URL(Port, "path1"),
+    Name = ?BRIDGE_NAME,
     {ok, 201, Bridge} = request(
         post,
         uri(["bridges"]),
-        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
     ),
 
     %ct:pal("---bridge: ~p", [Bridge]),
     #{
         <<"type">> := ?BRIDGE_TYPE,
-        <<"name">> := ?BRIDGE_NAME,
+        <<"name">> := Name,
         <<"enable">> := true,
         <<"status">> := _,
         <<"node_status">> := [_ | _],
@@ -179,7 +194,7 @@ t_http_crud_apis(_) ->
         <<"url">> := URL1
     } = jsx:decode(Bridge),
 
-    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     %% send an message to emqx and the message should be forwarded to the HTTP server
     Body = <<"my msg">>,
     emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
@@ -203,12 +218,12 @@ t_http_crud_apis(_) ->
     {ok, 200, Bridge2} = request(
         put,
         uri(["bridges", BridgeID]),
-        ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
     ),
     ?assertMatch(
         #{
             <<"type">> := ?BRIDGE_TYPE,
-            <<"name">> := ?BRIDGE_NAME,
+            <<"name">> := Name,
             <<"enable">> := true,
             <<"status">> := _,
             <<"node_status">> := [_ | _],
@@ -225,7 +240,7 @@ t_http_crud_apis(_) ->
         [
             #{
                 <<"type">> := ?BRIDGE_TYPE,
-                <<"name">> := ?BRIDGE_NAME,
+                <<"name">> := Name,
                 <<"enable">> := true,
                 <<"status">> := _,
                 <<"node_status">> := [_ | _],
@@ -242,7 +257,7 @@ t_http_crud_apis(_) ->
     ?assertMatch(
         #{
             <<"type">> := ?BRIDGE_TYPE,
-            <<"name">> := ?BRIDGE_NAME,
+            <<"name">> := Name,
             <<"enable">> := true,
             <<"status">> := _,
             <<"node_status">> := [_ | _],
@@ -275,7 +290,7 @@ t_http_crud_apis(_) ->
     {ok, 404, ErrMsg2} = request(
         put,
         uri(["bridges", BridgeID]),
-        ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
     ),
     ?assertMatch(
         #{
@@ -286,29 +301,28 @@ t_http_crud_apis(_) ->
     ),
     ok.
 
-t_start_stop_bridges(_) ->
-    lists:foreach(
-        fun(Type) ->
-            do_start_stop_bridges(Type)
-        end,
-        [node, cluster]
-    ).
+t_start_stop_bridges_node(Config) ->
+    do_start_stop_bridges(node, Config).
+
+t_start_stop_bridges_cluster(Config) ->
+    do_start_stop_bridges(cluster, Config).
 
-do_start_stop_bridges(Type) ->
+do_start_stop_bridges(Type, Config) ->
     %% assert we there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
-    Port = start_http_server(fun handle_fun_200_ok/2),
+    Port = ?config(port, Config),
     URL1 = ?URL(Port, "abc"),
+    Name = atom_to_binary(Type),
     {ok, 201, Bridge} = request(
         post,
         uri(["bridges"]),
-        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
     ),
     %ct:pal("the bridge ==== ~p", [Bridge]),
     #{
         <<"type">> := ?BRIDGE_TYPE,
-        <<"name">> := ?BRIDGE_NAME,
+        <<"name">> := Name,
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
@@ -316,11 +330,11 @@ do_start_stop_bridges(Type) ->
         <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
-    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     %% stop it
     {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
     {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
-    ?assertMatch(#{<<"status">> := <<"disconnected">>}, jsx:decode(Bridge2)),
+    ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
     %% start again
     {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
@@ -339,21 +353,22 @@ do_start_stop_bridges(Type) ->
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
 
-t_enable_disable_bridges(_) ->
+t_enable_disable_bridges(Config) ->
     %% assert we there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
-    Port = start_http_server(fun handle_fun_200_ok/2),
+    Name = ?BRIDGE_NAME,
+    Port = ?config(port, Config),
     URL1 = ?URL(Port, "abc"),
     {ok, 201, Bridge} = request(
         post,
         uri(["bridges"]),
-        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
     ),
     %ct:pal("the bridge ==== ~p", [Bridge]),
     #{
         <<"type">> := ?BRIDGE_TYPE,
-        <<"name">> := ?BRIDGE_NAME,
+        <<"name">> := Name,
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
@@ -361,11 +376,11 @@ t_enable_disable_bridges(_) ->
         <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
-    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     %% disable it
     {ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
     {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
-    ?assertMatch(#{<<"status">> := <<"disconnected">>}, jsx:decode(Bridge2)),
+    ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
     %% enable again
     {ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
     {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
@@ -391,21 +406,22 @@ t_enable_disable_bridges(_) ->
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
 
-t_reset_bridges(_) ->
+t_reset_bridges(Config) ->
     %% assert we there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
 
-    Port = start_http_server(fun handle_fun_200_ok/2),
+    Name = ?BRIDGE_NAME,
+    Port = ?config(port, Config),
     URL1 = ?URL(Port, "abc"),
     {ok, 201, Bridge} = request(
         post,
         uri(["bridges"]),
-        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
     ),
     %ct:pal("the bridge ==== ~p", [Bridge]),
     #{
         <<"type">> := ?BRIDGE_TYPE,
-        <<"name">> := ?BRIDGE_NAME,
+        <<"name">> := Name,
         <<"enable">> := true,
         <<"status">> := <<"connected">>,
         <<"node_status">> := [_ | _],
@@ -413,7 +429,7 @@ t_reset_bridges(_) ->
         <<"node_metrics">> := [_ | _],
         <<"url">> := URL1
     } = jsx:decode(Bridge),
-    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
     {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []),
 
     %% delete the bridge

+ 2 - 3
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -134,7 +134,8 @@ drop_bridge(Name) ->
 %% When use this bridge as a data source, ?MODULE:on_message_received will be called
 %% if the bridge received msgs from the remote broker.
 on_message_received(Msg, HookPoint, InstId) ->
-    _ = emqx_resource:query(InstId, {message_received, Msg}),
+    emqx_resource:inc_matched(InstId),
+    emqx_resource:inc_success(InstId),
     emqx:run_hook(HookPoint, [Msg]).
 
 %% ===================================================================
@@ -181,8 +182,6 @@ on_stop(_InstId, #{name := InstanceId}) ->
             })
     end.
 
-on_query(_InstId, {message_received, _Msg}, _State) ->
-    ok;
 on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
     ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
     emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),

+ 4 - 4
apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl

@@ -85,8 +85,8 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
         emqx_resource:get_instance(PoolName),
     ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
     % % Perform query as further check that the resource is working as expected
-    ?assertMatch([], emqx_resource:query(PoolName, test_query_find())),
-    ?assertMatch(undefined, emqx_resource:query(PoolName, test_query_find_one())),
+    ?assertMatch({ok, []}, emqx_resource:query(PoolName, test_query_find())),
+    ?assertMatch({ok, undefined}, emqx_resource:query(PoolName, test_query_find_one())),
     ?assertEqual(ok, emqx_resource:stop(PoolName)),
     % Resource will be listed still, but state will be changed and healthcheck will fail
     % as the worker no longer exists.
@@ -108,8 +108,8 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
     {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
         emqx_resource:get_instance(PoolName),
     ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
-    ?assertMatch([], emqx_resource:query(PoolName, test_query_find())),
-    ?assertMatch(undefined, emqx_resource:query(PoolName, test_query_find_one())),
+    ?assertMatch({ok, []}, emqx_resource:query(PoolName, test_query_find())),
+    ?assertMatch({ok, undefined}, emqx_resource:query(PoolName, test_query_find_one())),
     % Stop and remove the resource in one go.
     ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),

+ 1 - 1
apps/emqx_resource/src/emqx_resource.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [

+ 4 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -103,7 +103,7 @@
     list_group_instances/1
 ]).
 
--export([inc_metrics_funcs/1, inc_success/1, inc_failed/1]).
+-export([inc_metrics_funcs/1, inc_matched/1, inc_success/1, inc_failed/1]).
 
 -optional_callbacks([
     on_query/3,
@@ -393,6 +393,9 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
 
 %% =================================================================================
 
+inc_matched(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, matched).
+
 inc_success(ResId) ->
     emqx_metrics_worker:inc(?RES_METRICS, ResId, success).
 

+ 18 - 8
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -44,7 +44,7 @@
 
 -export([running/3, blocked/3]).
 
--export([queue_item_marshaller/1]).
+-export([queue_item_marshaller/1, estimate_size/1]).
 
 -define(RESUME_INTERVAL, 15000).
 
@@ -112,6 +112,7 @@ init({Id, Index, Opts}) ->
                 replayq:open(#{
                     dir => disk_queue_dir(Id, Index),
                     seg_bytes => 10000000,
+                    sizer => fun ?MODULE:estimate_size/1,
                     marshaller => fun ?MODULE:queue_item_marshaller/1
                 });
             false ->
@@ -172,6 +173,9 @@ queue_item_marshaller(?Q_ITEM(_) = I) ->
 queue_item_marshaller(Bin) when is_binary(Bin) ->
     binary_to_term(Bin).
 
+estimate_size(QItem) ->
+    size(queue_item_marshaller(QItem)).
+
 %%==============================================================================
 pick_query(Fun, Id, Key, Query) ->
     try gproc_pool:pick_worker(Id, Key) of
@@ -277,12 +281,6 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
     gen_statem:reply(From, Result),
     handle_query_result(Id, Result, BlockWorker).
 
-handle_query_result(Id, ok, BlockWorker) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
-    BlockWorker;
-handle_query_result(Id, {ok, _}, BlockWorker) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
-    BlockWorker;
 handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
     BlockWorker;
@@ -297,7 +295,12 @@ handle_query_result(Id, {error, _}, BlockWorker) ->
     BlockWorker;
 handle_query_result(Id, {resource_down, _}, _BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
-    true.
+    true;
+handle_query_result(Id, Result, BlockWorker) ->
+    %% assert
+    true = is_ok_result(Result),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
+    BlockWorker.
 
 call_query(Id, Request) ->
     do_call_query(on_query, Id, Request, 1).
@@ -339,6 +342,13 @@ maybe_expand_batch_result(Result, Batch) ->
 
 %%==============================================================================
 
+is_ok_result(ok) ->
+    true;
+is_ok_result(R) when is_tuple(R) ->
+    erlang:element(1, R) == ok;
+is_ok_result(_) ->
+    false.
+
 -spec name(id(), integer()) -> atom().
 name(Id, Index) ->
     Mod = atom_to_list(?MODULE),