Просмотр исходного кода

feat: use the new metrics to bridge APIs

Shawn 3 лет назад
Родитель
Сommit
73e19d84ee

+ 138 - 14
apps/emqx_bridge/i18n/emqx_bridge_schema.conf

@@ -78,36 +78,149 @@ emqx_bridge_schema {
                           }
                   }
 
+    metric_batched {
+                   desc {
+                         en: """Count of messages that are currently accumulated in memory waiting for sending in one batch."""
+                         zh: """当前积压在内存里,等待批量发送的消息个数"""
+                        }
+                   label: {
+                           en: "Batched"
+                           zh: "等待批量发送"
+                          }
+                  }
+
+    metric_dropped {
+                   desc {
+                         en: """Count of messages dropped."""
+                         zh: """被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped"
+                           zh: "丢弃"
+                          }
+                  }
+
+    metric_dropped_other {
+                   desc {
+                         en: """Count of messages dropped due to other reasons."""
+                         zh: """因为其他原因被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Other"
+                           zh: "其他丢弃"
+                          }
+                  }
+    metric_dropped_queue_full {
+                   desc {
+                         en: """Count of messages dropped due to the queue is full."""
+                         zh: """因为队列已满被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Queue Full"
+                           zh: "队列已满被丢弃"
+                          }
+                  }
+    metric_dropped_queue_not_enabled {
+                   desc {
+                         en: """Count of messages dropped due to the queue is not enabled."""
+                         zh: """因为队列未启用被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Queue Disabled"
+                           zh: "队列未启用被丢弃"
+                          }
+                  }
+    metric_dropped_resource_not_found {
+                   desc {
+                         en: """Count of messages dropped due to the resource is not found."""
+                         zh: """因为资源不存在被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Resource NotFound"
+                           zh: "资源不存在被丢弃"
+                          }
+                  }
+    metric_dropped_resource_stopped {
+                   desc {
+                         en: """Count of messages dropped due to the resource is stopped."""
+                         zh: """因为资源已停用被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Resource Stopped"
+                           zh: "资源停用被丢弃"
+                          }
+                  }
     metric_matched {
                    desc {
-                         en: """Count of this bridge is queried"""
-                         zh: """Bridge 执行操作的次数"""
+                         en: """Count of this bridge is matched and queried."""
+                         zh: """Bridge 被匹配到(被请求)的次数。"""
+                        }
+                   label: {
+                           en: "Matched"
+                           zh: "匹配次数"
+                          }
+                  }
+
+     metric_queued {
+                   desc {
+                         en: """Count of messages that are currently queued."""
+                         zh: """当前被缓存到磁盘队列的消息个数。"""
+                        }
+                   label: {
+                           en: "Queued"
+                           zh: "被缓存"
+                          }
+                  }
+     metric_sent {
+                   desc {
+                         en: """Count of messages that are sent by this bridge."""
+                         zh: """已经发送出去的消息个数。"""
                         }
                    label: {
-                           en: "Bridge Matched"
-                           zh: "Bridge 执行操作的次数"
+                           en: "Sent"
+                           zh: "已发送"
+                          }
+                  }
+     metric_sent_exception {
+                   desc {
+                         en: """Count of messages that were sent but exceptions occur."""
+                         zh: """发送出现异常的消息个数。"""
+                        }
+                   label: {
+                           en: "Sent Exception"
+                           zh: "发送异常"
                           }
                   }
 
-    metric_success {
+     metric_sent_failed {
                    desc {
-                         en: """Count of query success"""
-                         zh: """Bridge 执行操作成功的次数"""
+                         en: """Count of messages that sent failed."""
+                         zh: """发送失败的消息个数。"""
                         }
                    label: {
-                           en: "Bridge Success"
-                           zh: "Bridge 执行操作成功的次数"
+                           en: "Sent Failed"
+                           zh: "发送失败"
                           }
                   }
 
-    metric_failed {
+    metric_sent_inflight {
+                   desc {
+                         en: """Count of messages that were sent asynchronously but ACKs are not received."""
+                         zh: """已异步地发送但没有收到 ACK 的消息个数。"""
+                        }
+                   label: {
+                           en: "Sent Inflight"
+                           zh: "已发送未确认"
+                          }
+                  }
+    metric_sent_success {
                    desc {
-                         en: """Count of query failed"""
-                         zh: """Bridge 执行操作失败的次数"""
+                         en: """Count of messages that sent successfully."""
+                         zh: """已经发送成功的消息个数。"""
                         }
                    label: {
-                           en: "Bridge Failed"
-                           zh: "Bridge 执行操作失败的次数"
+                           en: "Sent Success"
+                           zh: "发送成功"
                           }
                   }
 
@@ -144,6 +257,17 @@ emqx_bridge_schema {
                           }
                   }
 
+    metric_received {
+                   desc {
+                         en: """Count of messages that is received from the remote system."""
+                         zh: """从远程系统收到的消息个数。"""
+                        }
+                   label: {
+                           en: "Received"
+                           zh: "已接收"
+                          }
+                  }
+
     desc_bridges {
                    desc {
                          en: """Configuration for MQTT bridges."""

+ 89 - 0
apps/emqx_bridge/include/emqx_bridge.hrl

@@ -0,0 +1,89 @@
+-define(EMPTY_METRICS,
+    ?METRICS(
+        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+    )
+).
+
+-define(METRICS(
+    Batched,
+    Dropped,
+    DroppedOther,
+    DroppedQueueFull,
+    DroppedQueueNotEnabled,
+    DroppedResourceNotFound,
+    DroppedResourceStopped,
+    Matched,
+    Queued,
+    Sent,
+    SentExcpt,
+    SentFailed,
+    SentInflight,
+    SentSucc,
+    RATE,
+    RATE_5,
+    RATE_MAX,
+    Rcvd
+),
+    #{
+        'batched' => Batched,
+        'dropped' => Dropped,
+        'dropped.other' => DroppedOther,
+        'dropped.queue_full' => DroppedQueueFull,
+        'dropped.queue_not_enabled' => DroppedQueueNotEnabled,
+        'dropped.resource_not_found' => DroppedResourceNotFound,
+        'dropped.resource_stopped' => DroppedResourceStopped,
+        'matched' => Matched,
+        'queued' => Queued,
+        'sent' => Sent,
+        'sent.exception' => SentExcpt,
+        'sent.failed' => SentFailed,
+        'sent.inflight' => SentInflight,
+        'sent.success' => SentSucc,
+        rate => RATE,
+        rate_last5m => RATE_5,
+        rate_max => RATE_MAX,
+        received => Rcvd
+    }
+).
+
+-define(metrics(
+    Batched,
+    Dropped,
+    DroppedOther,
+    DroppedQueueFull,
+    DroppedQueueNotEnabled,
+    DroppedResourceNotFound,
+    DroppedResourceStopped,
+    Matched,
+    Queued,
+    Sent,
+    SentExcpt,
+    SentFailed,
+    SentInflight,
+    SentSucc,
+    RATE,
+    RATE_5,
+    RATE_MAX,
+    Rcvd
+),
+    #{
+        'batched' := Batched,
+        'dropped' := Dropped,
+        'dropped.other' := DroppedOther,
+        'dropped.queue_full' := DroppedQueueFull,
+        'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
+        'dropped.resource_not_found' := DroppedResourceNotFound,
+        'dropped.resource_stopped' := DroppedResourceStopped,
+        'matched' := Matched,
+        'queued' := Queued,
+        'sent' := Sent,
+        'sent.exception' := SentExcpt,
+        'sent.failed' := SentFailed,
+        'sent.inflight' := SentInflight,
+        'sent.success' := SentSucc,
+        rate := RATE,
+        rate_last5m := RATE_5,
+        rate_max := RATE_MAX,
+        received := Rcvd
+    }
+).

+ 5 - 94
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -20,6 +20,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
 
 -import(hoconsc, [mk/2, array/1, enum/1]).
 
@@ -57,96 +58,6 @@
     end
 ).
 
--define(EMPTY_METRICS,
-    ?METRICS(
-        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
-    )
-).
-
--define(METRICS(
-    Batched,
-    Dropped,
-    DroppedOther,
-    DroppedQueueFull,
-    DroppedQueueNotEnabled,
-    DroppedResourceNotFound,
-    DroppedResourceStopped,
-    Matched,
-    Queued,
-    Retried,
-    Sent,
-    SentExcpt,
-    SentFailed,
-    SentInflight,
-    SentSucc,
-    RATE,
-    RATE_5,
-    RATE_MAX
-),
-    #{
-        'batched' => Batched,
-        'dropped' => Dropped,
-        'dropped.other' => DroppedOther,
-        'dropped.queue_full' => DroppedQueueFull,
-        'dropped.queue_not_enabled' => DroppedQueueNotEnabled,
-        'dropped.resource_not_found' => DroppedResourceNotFound,
-        'dropped.resource_stopped' => DroppedResourceStopped,
-        'matched' => Matched,
-        'queued' => Queued,
-        'retried' => Retried,
-        'sent' => Sent,
-        'sent.exception' => SentExcpt,
-        'sent.failed' => SentFailed,
-        'sent.inflight' => SentInflight,
-        'sent.success' => SentSucc,
-        rate => RATE,
-        rate_last5m => RATE_5,
-        rate_max => RATE_MAX
-    }
-).
-
--define(metrics(
-    Batched,
-    Dropped,
-    DroppedOther,
-    DroppedQueueFull,
-    DroppedQueueNotEnabled,
-    DroppedResourceNotFound,
-    DroppedResourceStopped,
-    Matched,
-    Queued,
-    Retried,
-    Sent,
-    SentExcpt,
-    SentFailed,
-    SentInflight,
-    SentSucc,
-    RATE,
-    RATE_5,
-    RATE_MAX
-),
-    #{
-        'batched' := Batched,
-        'dropped' := Dropped,
-        'dropped.other' := DroppedOther,
-        'dropped.queue_full' := DroppedQueueFull,
-        'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
-        'dropped.resource_not_found' := DroppedResourceNotFound,
-        'dropped.resource_stopped' := DroppedResourceStopped,
-        'matched' := Matched,
-        'queued' := Queued,
-        'retried' := Retried,
-        'sent' := Sent,
-        'sent.exception' := SentExcpt,
-        'sent.failed' := SentFailed,
-        'sent.inflight' := SentInflight,
-        'sent.success' := SentSucc,
-        rate := RATE,
-        rate_last5m := RATE_5,
-        rate_max := RATE_MAX
-    }
-).
-
 namespace() -> "bridge".
 
 api_spec() ->
@@ -770,12 +681,12 @@ format_metrics(#{
         'dropped.resource_stopped' := DroppedResourceStopped,
         'matched' := Matched,
         'queued' := Queued,
-        'retried' := Retried,
         'sent' := Sent,
         'sent.exception' := SentExcpt,
         'sent.failed' := SentFailed,
         'sent.inflight' := SentInflight,
-        'sent.success' := SentSucc
+        'sent.success' := SentSucc,
+        'received' := Rcvd
     },
     rate := #{
         matched := #{current := Rate, last5m := Rate5m, max := RateMax}
@@ -791,7 +702,6 @@ format_metrics(#{
         DroppedResourceStopped,
         Matched,
         Queued,
-        Retried,
         Sent,
         SentExcpt,
         SentFailed,
@@ -799,7 +709,8 @@ format_metrics(#{
         SentSucc,
         Rate,
         Rate5m,
-        RateMax
+        RateMax,
+        Rcvd
     ).
 
 fill_defaults(Type, RawConf) ->

+ 18 - 3
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -102,16 +102,31 @@ fields(bridges) ->
     ] ++ ee_fields_bridges();
 fields("metrics") ->
     [
+        {"batched", mk(integer(), #{desc => ?DESC("metric_batched")})},
+        {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},
+        {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})},
+        {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})},
+        {"dropped.queue_not_enabled",
+            mk(integer(), #{desc => ?DESC("metric_dropped_queue_not_enabled")})},
+        {"dropped.resource_not_found",
+            mk(integer(), #{desc => ?DESC("metric_dropped_resource_not_found")})},
+        {"dropped.resource_stopped",
+            mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})},
         {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})},
-        {"success", mk(integer(), #{desc => ?DESC("metric_success")})},
-        {"failed", mk(integer(), #{desc => ?DESC("metric_failed")})},
+        {"queued", mk(integer(), #{desc => ?DESC("metric_queued")})},
+        {"sent", mk(integer(), #{desc => ?DESC("metric_sent")})},
+        {"sent.exception", mk(integer(), #{desc => ?DESC("metric_sent_exception")})},
+        {"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})},
+        {"sent.inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})},
+        {"sent.success", mk(integer(), #{desc => ?DESC("metric_sent_success")})},
         {"rate", mk(float(), #{desc => ?DESC("metric_rate")})},
         {"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})},
         {"rate_last5m",
             mk(
                 float(),
                 #{desc => ?DESC("metric_rate_last5m")}
-            )}
+            )},
+        {"received", mk(float(), #{desc => ?DESC("metric_received")})}
     ];
 fields("node_metrics") ->
     [

+ 51 - 13
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -66,15 +66,6 @@
     }
 }).
 
--define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{
-    <<"matched">> := MATCH,
-    <<"success">> := SUCC,
-    <<"failed">> := FAILED,
-    <<"rate">> := SPEED,
-    <<"rate_last5m">> := SPEED5M,
-    <<"rate_max">> := SPEEDMAX
-}).
-
 inspect(Selected, _Envs, _Args) ->
     persistent_term:put(?MODULE, #{inspect => Selected}).
 
@@ -185,6 +176,23 @@ t_mqtt_conn_bridge_ingress(_) ->
         end
     ),
 
+    %% verify the metrics of the bridge
+    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
+            <<"node_metrics">> :=
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> :=
+                            #{<<"matched">> := 0, <<"received">> := 1}
+                    }
+                ]
+        },
+        jsx:decode(BridgeStr)
+    ),
+
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
@@ -237,9 +245,15 @@ t_mqtt_conn_bridge_egress(_) ->
     {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
     ?assertMatch(
         #{
-            <<"metrics">> := ?metrics(1, 1, 0, _, _, _),
+            <<"metrics">> := #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0},
             <<"node_metrics">> :=
-                [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> :=
+                            #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0}
+                    }
+                ]
         },
         jsx:decode(BridgeStr)
     ),
@@ -337,6 +351,23 @@ t_ingress_mqtt_bridge_with_rules(_) ->
         persistent_term:get(?MODULE)
     ),
 
+    %% verify the metrics of the bridge
+    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
+            <<"node_metrics">> :=
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> :=
+                            #{<<"matched">> := 0, <<"received">> := 1}
+                    }
+                ]
+        },
+        jsx:decode(BridgeStr)
+    ),
+
     {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []).
 
@@ -433,9 +464,16 @@ t_egress_mqtt_bridge_with_rules(_) ->
     {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
     ?assertMatch(
         #{
-            <<"metrics">> := ?metrics(2, 2, 0, _, _, _),
+            <<"metrics">> := #{<<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0},
             <<"node_metrics">> :=
-                [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> := #{
+                            <<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0
+                        }
+                    }
+                ]
         },
         jsx:decode(BridgeStr)
     ),

+ 1 - 2
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -135,8 +135,7 @@ drop_bridge(Name) ->
 %% When use this bridge as a data source, ?MODULE:on_message_received will be called
 %% if the bridge received msgs from the remote broker.
 on_message_received(Msg, HookPoint, ResId) ->
-    emqx_resource:inc_matched(ResId),
-    emqx_resource:inc_success(ResId),
+    emqx_resource:inc_received(ResId),
     emqx:run_hook(HookPoint, [Msg]).
 
 %% ===================================================================

+ 3 - 14
apps/emqx_resource/src/emqx_resource.erl

@@ -110,7 +110,7 @@
     list_group_instances/1
 ]).
 
--export([inc_metrics_funcs/1, inc_matched/1, inc_success/1, inc_failed/1]).
+-export([inc_received/1]).
 
 -optional_callbacks([
     on_query/3,
@@ -443,19 +443,8 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
 
 %% =================================================================================
 
-inc_matched(ResId) ->
-    emqx_metrics_worker:inc(?RES_METRICS, ResId, matched).
-
-inc_success(ResId) ->
-    emqx_metrics_worker:inc(?RES_METRICS, ResId, success).
-
-inc_failed(ResId) ->
-    emqx_metrics_worker:inc(?RES_METRICS, ResId, failed).
+inc_received(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, 'received').
 
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
-
-inc_metrics_funcs(ResId) ->
-    OnSucc = [{fun ?MODULE:inc_success/1, ResId}],
-    OnFailed = [{fun ?MODULE:inc_failed/1, ResId}],
-    {OnSucc, OnFailed}.

+ 2 - 2
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -134,7 +134,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
             'dropped',
             'queued',
             'batched',
-            'retried',
             'sent.success',
             'sent.failed',
             'sent.exception',
@@ -143,7 +142,8 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
             'dropped.queue_full',
             'dropped.resource_not_found',
             'dropped.resource_stopped',
-            'dropped.other'
+            'dropped.other',
+            'received'
         ],
         [matched]
     ),

+ 6 - 2
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -131,7 +131,7 @@ init({Id, Index, Opts}) ->
             false ->
                 undefined
         end,
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', replayq:count(Queue)),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', queue_count(Queue)),
     ok = inflight_new(Name),
     St = #{
         id => Id,
@@ -254,7 +254,6 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S
     case handle_query_result(Id, Result, false) of
         %% Send failed because resource down
         true ->
-            emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'),
             {keep_state, St0, {state_timeout, ResumeT, resume}};
         %% Send ok or failed but the resource is working
         false ->
@@ -569,6 +568,11 @@ assert_ok_result(R) when is_tuple(R) ->
 assert_ok_result(R) ->
     error({not_ok_result, R}).
 
+queue_count(undefined) ->
+    0;
+queue_count(Q) ->
+    replayq:count(Q).
+
 -spec name(id(), integer()) -> atom().
 name(Id, Index) ->
     Mod = atom_to_list(?MODULE),

+ 4 - 4
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -268,7 +268,7 @@ t_query_counter_async_query(_) ->
         end
     ),
     {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
-    ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C),
+    ?assertMatch(#{matched := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C),
     ok = emqx_resource:remove_local(?ID).
 
 t_query_counter_async_callback(_) ->
@@ -309,7 +309,7 @@ t_query_counter_async_callback(_) ->
         end
     ),
     {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
-    ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C),
+    ?assertMatch(#{matched := 1002, sent := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C),
     ?assertMatch(1000, ets:info(Tab0, size)),
     ?assert(
         lists:all(
@@ -419,8 +419,8 @@ t_query_counter_async_inflight(_) ->
     {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
     ct:pal("metrics: ~p", [C]),
     ?assertMatch(
-        #{matched := M, success := S, exception := E, failed := F, recoverable_error := RD} when
-            M >= Sent andalso M == S + E + F + RD,
+        #{matched := M, sent := St, 'sent.success' := Ss, dropped := D} when
+            St == Ss andalso M == St + D,
         C
     ),
     ?assert(

+ 10 - 8
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -62,14 +62,16 @@ test_rule(Sql, Select, Context, EventTopics) ->
     },
     FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)),
     try emqx_rule_runtime:apply_rule(Rule, FullContext, #{}) of
-        {ok, Data} -> {ok, flatten(Data)};
-        {error, Reason} -> {error, Reason}
+        {ok, Data} ->
+            {ok, flatten(Data)};
+        {error, Reason} ->
+            {error, Reason}
     after
         ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
     end.
 
 get_selected_data(Selected, _Envs, _Args) ->
-    Selected.
+    {ok, Selected}.
 
 is_publish_topic(<<"$events/", _/binary>>) -> false;
 is_publish_topic(<<"$bridges/", _/binary>>) -> false;
@@ -77,14 +79,14 @@ is_publish_topic(_Topic) -> true.
 
 flatten([]) ->
     [];
-flatten([D1]) ->
-    D1;
-flatten([D1 | L]) when is_list(D1) ->
-    D1 ++ flatten(L).
+flatten([{ok, D}]) ->
+    D;
+flatten([D | L]) when is_list(D) ->
+    [D0 || {ok, D0} <- D] ++ flatten(L).
 
 echo_action(Data, Envs) ->
     ?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}),
-    Data.
+    {ok, Data}.
 
 fill_default_values(Event, Context) ->
     maps:merge(envs_examp(Event), Context).