|
@@ -103,13 +103,13 @@ end_per_testcase(_TestCase, _Config) ->
|
|
|
%% Helper fns
|
|
%% Helper fns
|
|
|
%%-------------------------------------------------------------------------------------
|
|
%%-------------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
-check_send_message_with_action(Topic, ActionName, ConnectorName) ->
|
|
|
|
|
|
|
+check_send_message_with_action(Topic, ActionName, ConnectorName, Expect) ->
|
|
|
send_message(Topic),
|
|
send_message(Topic),
|
|
|
%% ######################################
|
|
%% ######################################
|
|
|
%% Check if message is sent to es
|
|
%% Check if message is sent to es
|
|
|
%% ######################################
|
|
%% ######################################
|
|
|
timer:sleep(500),
|
|
timer:sleep(500),
|
|
|
- check_action_metrics(ActionName, ConnectorName).
|
|
|
|
|
|
|
+ check_action_metrics(ActionName, ConnectorName, Expect).
|
|
|
|
|
|
|
|
send_message(Topic) ->
|
|
send_message(Topic) ->
|
|
|
Now = emqx_utils_calendar:now_to_rfc3339(microsecond),
|
|
Now = emqx_utils_calendar:now_to_rfc3339(microsecond),
|
|
@@ -123,7 +123,7 @@ send_message(Topic) ->
|
|
|
ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]),
|
|
ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]),
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
-check_action_metrics(ActionName, ConnectorName) ->
|
|
|
|
|
|
|
+check_action_metrics(ActionName, ConnectorName, Expect) ->
|
|
|
ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName),
|
|
ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName),
|
|
|
Metrics =
|
|
Metrics =
|
|
|
#{
|
|
#{
|
|
@@ -134,13 +134,7 @@ check_action_metrics(ActionName, ConnectorName) ->
|
|
|
dropped => emqx_resource_metrics:dropped_get(ActionId)
|
|
dropped => emqx_resource_metrics:dropped_get(ActionId)
|
|
|
},
|
|
},
|
|
|
?assertEqual(
|
|
?assertEqual(
|
|
|
- #{
|
|
|
|
|
- match => 1,
|
|
|
|
|
- success => 1,
|
|
|
|
|
- dropped => 0,
|
|
|
|
|
- failed => 0,
|
|
|
|
|
- queuing => 0
|
|
|
|
|
- },
|
|
|
|
|
|
|
+ Expect,
|
|
|
Metrics,
|
|
Metrics,
|
|
|
{ActionName, ConnectorName, ActionId}
|
|
{ActionName, ConnectorName, ActionId}
|
|
|
).
|
|
).
|
|
@@ -248,7 +242,7 @@ t_create_remove_list(Config) ->
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
%% Test sending a message to a bridge V2
|
|
%% Test sending a message to a bridge V2
|
|
|
-t_send_message(Config) ->
|
|
|
|
|
|
|
+t_create_message(Config) ->
|
|
|
ConnectorConfig = connector_config(Config),
|
|
ConnectorConfig = connector_config(Config),
|
|
|
{ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig),
|
|
{ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig),
|
|
|
ActionConfig = action(<<"test_connector2">>),
|
|
ActionConfig = action(<<"test_connector2">>),
|
|
@@ -261,7 +255,8 @@ t_send_message(Config) ->
|
|
|
},
|
|
},
|
|
|
{ok, _} = emqx_rule_engine:create_rule(Rule),
|
|
{ok, _} = emqx_rule_engine:create_rule(Rule),
|
|
|
%% Use the action to send a message
|
|
%% Use the action to send a message
|
|
|
- check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2),
|
|
|
|
|
|
|
+ Expect = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0},
|
|
|
|
|
+ check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2, Expect),
|
|
|
%% Create a few more bridges with the same connector and test them
|
|
%% Create a few more bridges with the same connector and test them
|
|
|
ActionNames1 =
|
|
ActionNames1 =
|
|
|
lists:foldl(
|
|
lists:foldl(
|
|
@@ -278,7 +273,7 @@ t_send_message(Config) ->
|
|
|
},
|
|
},
|
|
|
{ok, _} = emqx_rule_engine:create_rule(Rule1),
|
|
{ok, _} = emqx_rule_engine:create_rule(Rule1),
|
|
|
Topic = <<"es/", Seq/binary>>,
|
|
Topic = <<"es/", Seq/binary>>,
|
|
|
- check_send_message_with_action(Topic, ActionName, test_connector2),
|
|
|
|
|
|
|
+ check_send_message_with_action(Topic, ActionName, test_connector2, Expect),
|
|
|
[ActionName | Acc]
|
|
[ActionName | Acc]
|
|
|
end,
|
|
end,
|
|
|
[],
|
|
[],
|
|
@@ -293,6 +288,74 @@ t_send_message(Config) ->
|
|
|
ActionNames
|
|
ActionNames
|
|
|
),
|
|
),
|
|
|
emqx_connector:remove(?TYPE, test_connector2),
|
|
emqx_connector:remove(?TYPE, test_connector2),
|
|
|
|
|
+ lists:foreach(
|
|
|
|
|
+ fun(#{id := Id}) ->
|
|
|
|
|
+ emqx_rule_engine:delete_rule(Id)
|
|
|
|
|
+ end,
|
|
|
|
|
+ emqx_rule_engine:get_rules()
|
|
|
|
|
+ ),
|
|
|
|
|
+ ok.
|
|
|
|
|
+
|
|
|
|
|
+t_update_message(Config) ->
|
|
|
|
|
+ ConnectorConfig = connector_config(Config),
|
|
|
|
|
+ {ok, _} = emqx_connector:create(?TYPE, update_connector, ConnectorConfig),
|
|
|
|
|
+ ActionConfig0 = action(<<"update_connector">>),
|
|
|
|
|
+ DocId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
|
|
|
+ ActionConfig1 = ActionConfig0#{
|
|
|
|
|
+ <<"parameters">> => #{
|
|
|
|
|
+ <<"index">> => <<"${payload.index}">>,
|
|
|
|
|
+ <<"id">> => DocId,
|
|
|
|
|
+ <<"max_retries">> => 0,
|
|
|
|
|
+ <<"action">> => <<"update">>,
|
|
|
|
|
+ <<"doc">> => <<"${payload.doc}">>
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ {ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig1),
|
|
|
|
|
+ Rule = #{
|
|
|
|
|
+ id => <<"rule:t_es_1">>,
|
|
|
|
|
+ sql => <<"SELECT\n *\nFROM\n \"es/#\"">>,
|
|
|
|
|
+ actions => [<<"elasticsearch:update_action">>],
|
|
|
|
|
+ description => <<"sink doc to elasticsearch">>
|
|
|
|
|
+ },
|
|
|
|
|
+ {ok, _} = emqx_rule_engine:create_rule(Rule),
|
|
|
|
|
+ %% failed to update a nonexistent doc
|
|
|
|
|
+ Expect0 = #{match => 1, success => 0, dropped => 0, failed => 1, queuing => 0},
|
|
|
|
|
+ check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect0),
|
|
|
|
|
+ %% doc_as_upsert to insert a new doc
|
|
|
|
|
+ ActionConfig2 = ActionConfig1#{
|
|
|
|
|
+ <<"parameters">> => #{
|
|
|
|
|
+ <<"index">> => <<"${payload.index}">>,
|
|
|
|
|
+ <<"id">> => DocId,
|
|
|
|
|
+ <<"action">> => <<"update">>,
|
|
|
|
|
+ <<"doc">> => <<"${payload.doc}">>,
|
|
|
|
|
+ <<"doc_as_upsert">> => true,
|
|
|
|
|
+ <<"max_retries">> => 0
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ {ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig2),
|
|
|
|
|
+ Expect1 = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0},
|
|
|
|
|
+ check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect1),
|
|
|
|
|
+ %% update without doc, use msg as default
|
|
|
|
|
+ ActionConfig3 = ActionConfig1#{
|
|
|
|
|
+ <<"parameters">> => #{
|
|
|
|
|
+ <<"index">> => <<"${payload.index}">>,
|
|
|
|
|
+ <<"id">> => DocId,
|
|
|
|
|
+ <<"action">> => <<"update">>,
|
|
|
|
|
+ <<"max_retries">> => 0
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ {ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig3),
|
|
|
|
|
+ Expect2 = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0},
|
|
|
|
|
+ check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect2),
|
|
|
|
|
+ %% Clean
|
|
|
|
|
+ ok = emqx_bridge_v2:remove(?TYPE, update_action),
|
|
|
|
|
+ emqx_connector:remove(?TYPE, update_connector),
|
|
|
|
|
+ lists:foreach(
|
|
|
|
|
+ fun(#{id := Id}) ->
|
|
|
|
|
+ emqx_rule_engine:delete_rule(Id)
|
|
|
|
|
+ end,
|
|
|
|
|
+ emqx_rule_engine:get_rules()
|
|
|
|
|
+ ),
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
%% Test that we can get the status of the bridge V2
|
|
%% Test that we can get the status of the bridge V2
|