|
|
@@ -43,7 +43,8 @@ all() ->
|
|
|
{group, metrics},
|
|
|
{group, metrics_simple},
|
|
|
{group, metrics_fail},
|
|
|
- {group, metrics_fail_simple}
|
|
|
+ {group, metrics_fail_simple},
|
|
|
+ {group, tracing}
|
|
|
].
|
|
|
|
|
|
suite() ->
|
|
|
@@ -142,6 +143,9 @@ groups() ->
|
|
|
{metrics_fail_simple, [], [
|
|
|
t_rule_metrics_sync_fail,
|
|
|
t_rule_metrics_async_fail
|
|
|
+ ]},
|
|
|
+ {tracing, [], [
|
|
|
+ t_trace_rule_id
|
|
|
]}
|
|
|
].
|
|
|
|
|
|
@@ -154,13 +158,13 @@ init_per_suite(Config) ->
|
|
|
emqx_rule_funcs_demo:module_info(),
|
|
|
application:load(emqx_conf),
|
|
|
ok = emqx_common_test_helpers:start_apps(
|
|
|
- [emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge],
|
|
|
+ [emqx, emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge],
|
|
|
fun set_special_configs/1
|
|
|
),
|
|
|
Config.
|
|
|
|
|
|
end_per_suite(_Config) ->
|
|
|
- emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
|
|
|
+ emqx_common_test_helpers:stop_apps([emqx, emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge]),
|
|
|
ok.
|
|
|
|
|
|
set_special_configs(emqx_auth) ->
|
|
|
@@ -3632,6 +3636,111 @@ create_bridge(Type, Name, Config) ->
|
|
|
{ok, _Bridge} = emqx_bridge:create(Type, Name, Config),
|
|
|
emqx_bridge_resource:bridge_id(Type, Name).
|
|
|
|
|
|
+create_rule(Name, SQL) ->
|
|
|
+ Rule = emqx_rule_engine_SUITE:make_simple_rule(Name, SQL),
|
|
|
+ {ok, _} = emqx_rule_engine:create_rule(Rule).
|
|
|
+
|
|
|
+emqtt_client_config() ->
|
|
|
+ [
|
|
|
+ {host, "localhost"},
|
|
|
+ {clientid, <<"client">>},
|
|
|
+ {username, <<"testuser">>},
|
|
|
+ {password, <<"pass">>}
|
|
|
+ ].
|
|
|
+
|
|
|
+filesync(Name, Type) ->
|
|
|
+ ct:sleep(50),
|
|
|
+ filesync(Name, Type, 5).
|
|
|
+
|
|
|
+%% sometime the handler process is not started yet.
|
|
|
+filesync(Name, Type, 0) ->
|
|
|
+ ct:fail("Handler process not started ~p ~p", [Name, Type]);
|
|
|
+filesync(Name0, Type, Retry) ->
|
|
|
+ Name =
|
|
|
+ case is_binary(Name0) of
|
|
|
+ true -> Name0;
|
|
|
+ false -> list_to_binary(Name0)
|
|
|
+ end,
|
|
|
+ try
|
|
|
+ Handler = binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>),
|
|
|
+ ok = logger_disk_log_h:filesync(Handler)
|
|
|
+ catch
|
|
|
+ E:R ->
|
|
|
+ ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]),
|
|
|
+ ct:sleep(100),
|
|
|
+ filesync(Name, Type, Retry - 1)
|
|
|
+ end.
|
|
|
+
|
|
|
+t_trace_rule_id(_Config) ->
|
|
|
+ %% Start MQTT Client
|
|
|
+ emqx_trace_SUITE:reload(),
|
|
|
+ {ok, T} = emqtt:start_link(emqtt_client_config()),
|
|
|
+ emqtt:connect(T),
|
|
|
+ %% Create rules
|
|
|
+ create_rule(
|
|
|
+ <<"test_rule_id_1">>,
|
|
|
+ <<"select 1 as rule_number from \"rule_1_topic\"">>
|
|
|
+ ),
|
|
|
+ create_rule(
|
|
|
+ <<"test_rule_id_2">>,
|
|
|
+ <<"select 2 as rule_number from \"rule_2_topic\"">>
|
|
|
+ ),
|
|
|
+ %% Start tracing
|
|
|
+ ok = emqx_trace_handler:install(
|
|
|
+ "CLI-RULE-1", ruleid, <<"test_rule_id_1">>, all, "tmp/rule_trace_1.log"
|
|
|
+ ),
|
|
|
+ ok = emqx_trace_handler:install(
|
|
|
+ "CLI-RULE-2", ruleid, <<"test_rule_id_2">>, all, "tmp/rule_trace_2.log"
|
|
|
+ ),
|
|
|
+ emqx_trace:check(),
|
|
|
+ ok = filesync("CLI-RULE-1", ruleid),
|
|
|
+ ok = filesync("CLI-RULE-2", ruleid),
|
|
|
+
|
|
|
+ %% Verify the tracing file exits
|
|
|
+ ?assert(filelib:is_regular("tmp/rule_trace_1.log")),
|
|
|
+ ?assert(filelib:is_regular("tmp/rule_trace_2.log")),
|
|
|
+
|
|
|
+ %% Get current traces
|
|
|
+ ?assertMatch(
|
|
|
+ [
|
|
|
+ #{
|
|
|
+ type := ruleid,
|
|
|
+ filter := <<"test_rule_id_1">>,
|
|
|
+ level := debug,
|
|
|
+ dst := "tmp/rule_trace_1.log",
|
|
|
+ name := <<"CLI-RULE-1">>
|
|
|
+ },
|
|
|
+ #{
|
|
|
+ type := ruleid,
|
|
|
+ filter := <<"test_rule_id_2">>,
|
|
|
+ name := <<"CLI-RULE-2">>,
|
|
|
+ level := debug,
|
|
|
+ dst := "tmp/rule_trace_2.log"
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ emqx_trace_handler:running()
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% Trigger rule
|
|
|
+ emqtt:publish(T, <<"rule_1_topic">>, <<"my_traced_message">>),
|
|
|
+ ?retry(
|
|
|
+ 100,
|
|
|
+ 5,
|
|
|
+ begin
|
|
|
+ ok = filesync("CLI-RULE-1", ruleid),
|
|
|
+ {ok, Bin} = file:read_file("tmp/rule_trace_1.log"),
|
|
|
+ ?assertNotEqual(nomatch, binary:match(Bin, [<<"my_traced_message">>]))
|
|
|
+ end
|
|
|
+ ),
|
|
|
+ ok = filesync("CLI-RULE-2", ruleid),
|
|
|
+ ?assert(filelib:file_size("tmp/rule_trace_2.log") =:= 0),
|
|
|
+
|
|
|
+ %% Stop tracing
|
|
|
+ ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-1">>),
|
|
|
+ ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-2">>),
|
|
|
+ ?assertEqual([], emqx_trace_handler:running()),
|
|
|
+ emqtt:disconnect(T).
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Internal helpers
|
|
|
%%------------------------------------------------------------------------------
|