Sfoglia il codice sorgente

Merge branch 'dev/ee5.0' into refactor-mqtt-bridge

JianBo He 3 anni fa
parent
commit
a5ac5b6f3a
28 ha cambiato i file con 647 aggiunte e 230 eliminazioni
  1. 1 1
      apps/emqx/src/emqx_metrics_worker.erl
  2. 138 14
      apps/emqx_bridge/i18n/emqx_bridge_schema.conf
  3. 99 0
      apps/emqx_bridge/include/emqx_bridge.hrl
  4. 77 31
      apps/emqx_bridge/src/emqx_bridge_api.erl
  5. 18 3
      apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
  6. 51 13
      apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
  7. 22 13
      apps/emqx_connector/src/emqx_connector_http.erl
  8. 28 18
      apps/emqx_connector/src/emqx_connector_mqtt.erl
  9. 7 0
      apps/emqx_connector/src/emqx_connector_mysql.erl
  10. 2 2
      apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
  11. 1 1
      apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf
  12. 7 4
      apps/emqx_resource/include/emqx_resource.hrl
  13. 3 14
      apps/emqx_resource/src/emqx_resource.erl
  14. 28 1
      apps/emqx_resource/src/emqx_resource_manager.erl
  15. 94 39
      apps/emqx_resource/src/emqx_resource_worker.erl
  16. 3 3
      apps/emqx_resource/src/emqx_resource_worker_sup.erl
  17. 6 6
      apps/emqx_resource/src/schema/emqx_resource_schema.erl
  18. 1 1
      apps/emqx_resource/test/emqx_connector_demo.erl
  19. 4 4
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  20. 7 7
      apps/emqx_rule_engine/include/rule_engine.hrl
  21. 27 13
      apps/emqx_rule_engine/src/emqx_rule_runtime.erl
  22. 10 8
      apps/emqx_rule_engine/src/emqx_rule_sqltester.erl
  23. 0 18
      lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl
  24. 1 1
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl
  25. 1 1
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl
  26. 1 1
      lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl
  27. 4 2
      lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl
  28. 6 11
      lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

+ 1 - 1
apps/emqx/src/emqx_metrics_worker.erl

@@ -173,7 +173,7 @@ get_metrics(Name, Id) ->
 inc(Name, Id, Metric) ->
     inc(Name, Id, Metric, 1).
 
--spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok.
+-spec inc(handler_name(), metric_id(), atom(), integer()) -> ok.
 inc(Name, Id, Metric, Val) ->
     counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val).
 

+ 138 - 14
apps/emqx_bridge/i18n/emqx_bridge_schema.conf

@@ -78,36 +78,149 @@ emqx_bridge_schema {
                           }
                   }
 
+    metric_batched {
+                   desc {
+                         en: """Count of messages that are currently accumulated in memory waiting to be sent in one batch."""
+                         zh: """当前积压在内存里,等待批量发送的消息个数"""
+                        }
+                   label: {
+                           en: "Batched"
+                           zh: "等待批量发送"
+                          }
+                  }
+
+    metric_dropped {
+                   desc {
+                         en: """Count of messages dropped."""
+                         zh: """被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped"
+                           zh: "丢弃"
+                          }
+                  }
+
+    metric_dropped_other {
+                   desc {
+                         en: """Count of messages dropped due to other reasons."""
+                         zh: """因为其他原因被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Other"
+                           zh: "其他丢弃"
+                          }
+                  }
+    metric_dropped_queue_full {
+                   desc {
+                         en: """Count of messages dropped because the queue is full."""
+                         zh: """因为队列已满被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Queue Full"
+                           zh: "队列已满被丢弃"
+                          }
+                  }
+    metric_dropped_queue_not_enabled {
+                   desc {
+                         en: """Count of messages dropped because the queue is not enabled."""
+                         zh: """因为队列未启用被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Queue Disabled"
+                           zh: "队列未启用被丢弃"
+                          }
+                  }
+    metric_dropped_resource_not_found {
+                   desc {
+                         en: """Count of messages dropped because the resource is not found."""
+                         zh: """因为资源不存在被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Resource NotFound"
+                           zh: "资源不存在被丢弃"
+                          }
+                  }
+    metric_dropped_resource_stopped {
+                   desc {
+                         en: """Count of messages dropped because the resource is stopped."""
+                         zh: """因为资源已停用被丢弃的消息个数。"""
+                        }
+                   label: {
+                           en: "Dropped Resource Stopped"
+                           zh: "资源停用被丢弃"
+                          }
+                  }
     metric_matched {
                    desc {
-                         en: """Count of this bridge is queried"""
-                         zh: """Bridge 执行操作的次数"""
+                         en: """Number of times this bridge is matched and queried."""
+                         zh: """Bridge 被匹配到(被请求)的次数。"""
+                        }
+                   label: {
+                           en: "Matched"
+                           zh: "匹配次数"
+                          }
+                  }
+
+     metric_queued {
+                   desc {
+                         en: """Count of messages that are currently queued."""
+                         zh: """当前被缓存到磁盘队列的消息个数。"""
+                        }
+                   label: {
+                           en: "Queued"
+                           zh: "被缓存"
+                          }
+                  }
+     metric_sent {
+                   desc {
+                         en: """Count of messages that are sent by this bridge."""
+                         zh: """已经发送出去的消息个数。"""
                         }
                    label: {
-                           en: "Bridge Matched"
-                           zh: "Bridge 执行操作的次数"
+                           en: "Sent"
+                           zh: "已发送"
+                          }
+                  }
+     metric_sent_exception {
+                   desc {
+                         en: """Count of messages for which an exception occurred while sending."""
+                         zh: """发送出现异常的消息个数。"""
+                        }
+                   label: {
+                           en: "Sent Exception"
+                           zh: "发送异常"
                           }
                   }
 
-    metric_success {
+     metric_sent_failed {
                    desc {
-                         en: """Count of query success"""
-                         zh: """Bridge 执行操作成功的次数"""
+                         en: """Count of messages that failed to be sent."""
+                         zh: """发送失败的消息个数。"""
                         }
                    label: {
-                           en: "Bridge Success"
-                           zh: "Bridge 执行操作成功的次数"
+                           en: "Sent Failed"
+                           zh: "发送失败"
                           }
                   }
 
-    metric_failed {
+    metric_sent_inflight {
+                   desc {
+                         en: """Count of messages that were sent asynchronously but ACKs are not received."""
+                         zh: """已异步地发送但没有收到 ACK 的消息个数。"""
+                        }
+                   label: {
+                           en: "Sent Inflight"
+                           zh: "已发送未确认"
+                          }
+                  }
+    metric_sent_success {
                    desc {
-                         en: """Count of query failed"""
-                         zh: """Bridge 执行操作失败的次数"""
+                         en: """Count of messages that were sent successfully."""
+                         zh: """已经发送成功的消息个数。"""
                         }
                    label: {
-                           en: "Bridge Failed"
-                           zh: "Bridge 执行操作失败的次数"
+                           en: "Sent Success"
+                           zh: "发送成功"
                           }
                   }
 
@@ -144,6 +257,17 @@ emqx_bridge_schema {
                           }
                   }
 
+    metric_received {
+                   desc {
+                         en: """Count of messages that are received from the remote system."""
+                         zh: """从远程系统收到的消息个数。"""
+                        }
+                   label: {
+                           en: "Received"
+                           zh: "已接收"
+                          }
+                  }
+
     desc_bridges {
                    desc {
                          en: """Configuration for MQTT bridges."""

+ 99 - 0
apps/emqx_bridge/include/emqx_bridge.hrl

@@ -0,0 +1,99 @@
+-define(EMPTY_METRICS,
+    ?METRICS(
+        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+    )
+).
+
+-define(METRICS(
+    Batched,
+    Dropped,
+    DroppedOther,
+    DroppedQueueFull,
+    DroppedQueueNotEnabled,
+    DroppedResourceNotFound,
+    DroppedResourceStopped,
+    Matched,
+    Queued,
+    Sent,
+    SentExcpt,
+    SentFailed,
+    SentInflight,
+    SentSucc,
+    RATE,
+    RATE_5,
+    RATE_MAX,
+    Rcvd
+),
+    #{
+        'batched' => Batched,
+        'dropped' => Dropped,
+        'dropped.other' => DroppedOther,
+        'dropped.queue_full' => DroppedQueueFull,
+        'dropped.queue_not_enabled' => DroppedQueueNotEnabled,
+        'dropped.resource_not_found' => DroppedResourceNotFound,
+        'dropped.resource_stopped' => DroppedResourceStopped,
+        'matched' => Matched,
+        'queued' => Queued,
+        'sent' => Sent,
+        'sent.exception' => SentExcpt,
+        'sent.failed' => SentFailed,
+        'sent.inflight' => SentInflight,
+        'sent.success' => SentSucc,
+        rate => RATE,
+        rate_last5m => RATE_5,
+        rate_max => RATE_MAX,
+        received => Rcvd
+    }
+).
+
+-define(metrics(
+    Batched,
+    Dropped,
+    DroppedOther,
+    DroppedQueueFull,
+    DroppedQueueNotEnabled,
+    DroppedResourceNotFound,
+    DroppedResourceStopped,
+    Matched,
+    Queued,
+    Sent,
+    SentExcpt,
+    SentFailed,
+    SentInflight,
+    SentSucc,
+    RATE,
+    RATE_5,
+    RATE_MAX,
+    Rcvd
+),
+    #{
+        'batched' := Batched,
+        'dropped' := Dropped,
+        'dropped.other' := DroppedOther,
+        'dropped.queue_full' := DroppedQueueFull,
+        'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
+        'dropped.resource_not_found' := DroppedResourceNotFound,
+        'dropped.resource_stopped' := DroppedResourceStopped,
+        'matched' := Matched,
+        'queued' := Queued,
+        'sent' := Sent,
+        'sent.exception' := SentExcpt,
+        'sent.failed' := SentFailed,
+        'sent.inflight' := SentInflight,
+        'sent.success' := SentSucc,
+        rate := RATE,
+        rate_last5m := RATE_5,
+        rate_max := RATE_MAX,
+        received := Rcvd
+    }
+).
+
+-define(METRICS_EXAMPLE, #{
+    metrics => ?EMPTY_METRICS,
+    node_metrics => [
+        #{
+            node => node(),
+            metrics => ?EMPTY_METRICS
+        }
+    ]
+}).

+ 77 - 31
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -21,6 +21,7 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
 
 -import(hoconsc, [mk/2, array/1, enum/1]).
 
@@ -58,23 +59,6 @@
     end
 ).
 
--define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{
-    matched => MATCH,
-    success => SUCC,
-    failed => FAILED,
-    rate => RATE,
-    rate_last5m => RATE_5,
-    rate_max => RATE_MAX
-}).
--define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{
-    matched := MATCH,
-    success := SUCC,
-    failed := FAILED,
-    rate := RATE,
-    rate_last5m := RATE_5,
-    rate_max := RATE_MAX
-}).
-
 namespace() -> "bridge".
 
 api_spec() ->
@@ -194,11 +178,11 @@ method_example(_Type, put) ->
 
 maybe_with_metrics_example(TypeNameExam, get) ->
     TypeNameExam#{
-        metrics => ?METRICS(0, 0, 0, 0, 0, 0),
+        metrics => ?EMPTY_METRICS,
         node_metrics => [
             #{
                 node => node(),
-                metrics => ?METRICS(0, 0, 0, 0, 0, 0)
+                metrics => ?EMPTY_METRICS
             }
         ]
     };
@@ -218,7 +202,16 @@ info_example_basic(webhook) ->
         ssl => #{enable => false},
         local_topic => <<"emqx_webhook/#">>,
         method => post,
-        body => <<"${payload}">>
+        body => <<"${payload}">>,
+        resource_opts => #{
+            worker_pool_size => 1,
+            health_check_interval => 15000,
+            auto_restart_interval => 15000,
+            query_mode => sync,
+            async_inflight_window => 100,
+            enable_queue => true,
+            max_queue_bytes => 1024 * 1024 * 1024
+        }
     };
 info_example_basic(mqtt) ->
     (mqtt_main_example())#{
@@ -627,19 +620,37 @@ collect_metrics(Bridges) ->
     [maps:with([node, metrics], B) || B <- Bridges].
 
 aggregate_metrics(AllMetrics) ->
-    InitMetrics = ?METRICS(0, 0, 0, 0, 0, 0),
+    InitMetrics = ?EMPTY_METRICS,
     lists:foldl(
         fun(
-            #{metrics := ?metrics(Match1, Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
-            ?metrics(Match0, Succ0, Failed0, Rate0, Rate5m0, RateMax0)
+            #{
+                metrics := ?metrics(
+                    M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17, M18
+                )
+            },
+            ?metrics(
+                N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17, N18
+            )
         ) ->
             ?METRICS(
-                Match1 + Match0,
-                Succ1 + Succ0,
-                Failed1 + Failed0,
-                Rate1 + Rate0,
-                Rate5m1 + Rate5m0,
-                RateMax1 + RateMax0
+                M1 + N1,
+                M2 + N2,
+                M3 + N3,
+                M4 + N4,
+                M5 + N5,
+                M6 + N6,
+                M7 + N7,
+                M8 + N8,
+                M9 + N9,
+                M10 + N10,
+                M11 + N11,
+                M12 + N12,
+                M13 + N13,
+                M14 + N14,
+                M15 + N15,
+                M16 + N16,
+                M17 + N17,
+                M18 + N18
             )
         end,
         InitMetrics,
@@ -668,12 +679,47 @@ format_resp(
     }.
 
 format_metrics(#{
-    counters := #{failed := Failed, exception := Ex, matched := Match, success := Succ},
+    counters := #{
+        'batched' := Batched,
+        'dropped' := Dropped,
+        'dropped.other' := DroppedOther,
+        'dropped.queue_full' := DroppedQueueFull,
+        'dropped.queue_not_enabled' := DroppedQueueNotEnabled,
+        'dropped.resource_not_found' := DroppedResourceNotFound,
+        'dropped.resource_stopped' := DroppedResourceStopped,
+        'matched' := Matched,
+        'queued' := Queued,
+        'sent' := Sent,
+        'sent.exception' := SentExcpt,
+        'sent.failed' := SentFailed,
+        'sent.inflight' := SentInflight,
+        'sent.success' := SentSucc,
+        'received' := Rcvd
+    },
     rate := #{
         matched := #{current := Rate, last5m := Rate5m, max := RateMax}
     }
 }) ->
-    ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
+    ?METRICS(
+        Batched,
+        Dropped,
+        DroppedOther,
+        DroppedQueueFull,
+        DroppedQueueNotEnabled,
+        DroppedResourceNotFound,
+        DroppedResourceStopped,
+        Matched,
+        Queued,
+        Sent,
+        SentExcpt,
+        SentFailed,
+        SentInflight,
+        SentSucc,
+        Rate,
+        Rate5m,
+        RateMax,
+        Rcvd
+    ).
 
 fill_defaults(Type, RawConf) ->
     PackedConf = pack_bridge_conf(Type, RawConf),

+ 18 - 3
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -102,16 +102,31 @@ fields(bridges) ->
     ] ++ ee_fields_bridges();
 fields("metrics") ->
     [
+        {"batched", mk(integer(), #{desc => ?DESC("metric_batched")})},
+        {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},
+        {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})},
+        {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})},
+        {"dropped.queue_not_enabled",
+            mk(integer(), #{desc => ?DESC("metric_dropped_queue_not_enabled")})},
+        {"dropped.resource_not_found",
+            mk(integer(), #{desc => ?DESC("metric_dropped_resource_not_found")})},
+        {"dropped.resource_stopped",
+            mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})},
         {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})},
-        {"success", mk(integer(), #{desc => ?DESC("metric_success")})},
-        {"failed", mk(integer(), #{desc => ?DESC("metric_failed")})},
+        {"queued", mk(integer(), #{desc => ?DESC("metric_queued")})},
+        {"sent", mk(integer(), #{desc => ?DESC("metric_sent")})},
+        {"sent.exception", mk(integer(), #{desc => ?DESC("metric_sent_exception")})},
+        {"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})},
+        {"sent.inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})},
+        {"sent.success", mk(integer(), #{desc => ?DESC("metric_sent_success")})},
         {"rate", mk(float(), #{desc => ?DESC("metric_rate")})},
         {"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})},
         {"rate_last5m",
             mk(
                 float(),
                 #{desc => ?DESC("metric_rate_last5m")}
-            )}
+            )},
+        {"received", mk(float(), #{desc => ?DESC("metric_received")})}
     ];
 fields("node_metrics") ->
     [

+ 51 - 13
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -66,15 +66,6 @@
     }
 }).
 
--define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{
-    <<"matched">> := MATCH,
-    <<"success">> := SUCC,
-    <<"failed">> := FAILED,
-    <<"rate">> := SPEED,
-    <<"rate_last5m">> := SPEED5M,
-    <<"rate_max">> := SPEEDMAX
-}).
-
 inspect(Selected, _Envs, _Args) ->
     persistent_term:put(?MODULE, #{inspect => Selected}).
 
@@ -185,6 +176,23 @@ t_mqtt_conn_bridge_ingress(_) ->
         end
     ),
 
+    %% verify the metrics of the bridge
+    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
+            <<"node_metrics">> :=
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> :=
+                            #{<<"matched">> := 0, <<"received">> := 1}
+                    }
+                ]
+        },
+        jsx:decode(BridgeStr)
+    ),
+
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
@@ -237,9 +245,15 @@ t_mqtt_conn_bridge_egress(_) ->
     {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
     ?assertMatch(
         #{
-            <<"metrics">> := ?metrics(1, 1, 0, _, _, _),
+            <<"metrics">> := #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0},
             <<"node_metrics">> :=
-                [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> :=
+                            #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0}
+                    }
+                ]
         },
         jsx:decode(BridgeStr)
     ),
@@ -337,6 +351,23 @@ t_ingress_mqtt_bridge_with_rules(_) ->
         persistent_term:get(?MODULE)
     ),
 
+    %% verify the metrics of the bridge
+    {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
+    ?assertMatch(
+        #{
+            <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
+            <<"node_metrics">> :=
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> :=
+                            #{<<"matched">> := 0, <<"received">> := 1}
+                    }
+                ]
+        },
+        jsx:decode(BridgeStr)
+    ),
+
     {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
     {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []).
 
@@ -433,9 +464,16 @@ t_egress_mqtt_bridge_with_rules(_) ->
     {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
     ?assertMatch(
         #{
-            <<"metrics">> := ?metrics(2, 2, 0, _, _, _),
+            <<"metrics">> := #{<<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0},
             <<"node_metrics">> :=
-                [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
+                [
+                    #{
+                        <<"node">> := _,
+                        <<"metrics">> := #{
+                            <<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0
+                        }
+                    }
+                ]
         },
         jsx:decode(BridgeStr)
     ),

+ 22 - 13
apps/emqx_connector/src/emqx_connector_http.erl

@@ -275,7 +275,7 @@ on_query(
     ),
     NRequest = formalize_request(Method, BasePath, Request),
     case
-        Result = ehttpc:request(
+        ehttpc:request(
             case KeyOrNum of
                 undefined -> PoolName;
                 _ -> {PoolName, KeyOrNum}
@@ -286,33 +286,42 @@ on_query(
             Retry
         )
     of
-        {error, Reason} ->
+        {error, econnrefused} ->
+            ?SLOG(warning, #{
+                msg => "http_connector_do_request_failed",
+                reason => econnrefused,
+                connector => InstId
+            }),
+            {recoverable_error, econnrefused};
+        {error, Reason} = Result ->
             ?SLOG(error, #{
                 msg => "http_connector_do_request_failed",
                 request => NRequest,
                 reason => Reason,
                 connector => InstId
-            });
-        {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
-            ok;
-        {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
-            ok;
-        {ok, StatusCode, _} ->
+            }),
+            Result;
+        {ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
+            Result;
+        {ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
+            Result;
+        {ok, StatusCode, Headers} ->
             ?SLOG(error, #{
                 msg => "http connector do request, received error response",
                 request => NRequest,
                 connector => InstId,
                 status_code => StatusCode
-            });
-        {ok, StatusCode, _, _} ->
+            }),
+            {error, #{status_code => StatusCode, headers => Headers}};
+        {ok, StatusCode, Headers, Body} ->
             ?SLOG(error, #{
                 msg => "http connector do request, received error response",
                 request => NRequest,
                 connector => InstId,
                 status_code => StatusCode
-            })
-    end,
-    Result.
+            }),
+            {error, #{status_code => StatusCode, headers => Headers, body => Body}}
+    end.
 
 on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
     case maps:get(request, State, undefined) of

+ 28 - 18
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -136,8 +136,7 @@ drop_bridge(Name) ->
 %% When this bridge is used as a data source, ?MODULE:on_message_received will be called
 %% if the bridge receives messages from the remote broker.
 on_message_received(Msg, HookPoint, ResId) ->
-    emqx_resource:inc_matched(ResId),
-    emqx_resource:inc_success(ResId),
+    emqx_resource:inc_received(ResId),
     emqx:run_hook(HookPoint, [Msg]).
 
 %% ===================================================================
@@ -236,20 +235,20 @@ make_forward_confs(undefined) ->
 make_forward_confs(FrowardConf) ->
     FrowardConf.
 
-basic_config(#{
-    server := Server,
-    reconnect_interval := ReconnIntv,
-    proto_ver := ProtoVer,
-    bridge_mode := BridgeMode,
-    username := User,
-    password := Password,
-    clean_start := CleanStart,
-    keepalive := KeepAlive,
-    retry_interval := RetryIntv,
-    max_inflight := MaxInflight,
-    ssl := #{enable := EnableSsl} = Ssl
-}) ->
+basic_config(
     #{
+        server := Server,
+        reconnect_interval := ReconnIntv,
+        proto_ver := ProtoVer,
+        bridge_mode := BridgeMode,
+        clean_start := CleanStart,
+        keepalive := KeepAlive,
+        retry_interval := RetryIntv,
+        max_inflight := MaxInflight,
+        ssl := #{enable := EnableSsl} = Ssl
+    } = Conf
+) ->
+    BaiscConf = #{
         %% connection opts
         server => Server,
         %% 30s
@@ -263,8 +262,6 @@ basic_config(#{
         %% non-standard mqtt connection packets will be filtered out by LB.
         %% So let's disable bridge_mode.
         bridge_mode => BridgeMode,
-        username => User,
-        password => Password,
         clean_start => CleanStart,
         keepalive => ms_to_s(KeepAlive),
         retry_interval => RetryIntv,
@@ -272,7 +269,20 @@ basic_config(#{
         ssl => EnableSsl,
         ssl_opts => maps:to_list(maps:remove(enable, Ssl)),
         if_record_metrics => true
-    }.
+    },
+    maybe_put_fields([username, password], Conf, BaiscConf).
+
+maybe_put_fields(Fields, Conf, Acc0) ->
+    lists:foldl(
+        fun(Key, Acc) ->
+            case maps:find(Key, Conf) of
+                error -> Acc;
+                {ok, Val} -> Acc#{Key => Val}
+            end
+        end,
+        Acc0,
+        Fields
+    ).
 
 ms_to_s(Ms) ->
     erlang:ceil(Ms / 1000).

+ 7 - 0
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -414,6 +414,13 @@ on_sql_query(
                 LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
             ),
             Error;
+        {error, {1053, <<"08S01">>, Reason}} ->
+            %% MySQL error 1053 (SQLSTATE 08S01): server shutdown in progress
+            ?SLOG(
+                error,
+                LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
+            ),
+            {recoverable_error, Reason};
         {error, Reason} ->
             ?SLOG(
                 error,

+ 2 - 2
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl

@@ -101,7 +101,7 @@ fields("server_configs") ->
             mk(
                 binary(),
                 #{
-                    default => "emqx",
+                    default => undefined,
                     desc => ?DESC("username")
                 }
             )},
@@ -109,7 +109,7 @@ fields("server_configs") ->
             mk(
                 binary(),
                 #{
-                    default => "emqx",
+                    default => undefined,
                     format => <<"password">>,
                     desc => ?DESC("password")
                 }

+ 1 - 1
apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf

@@ -143,7 +143,7 @@ emqx_resource_schema {
     }
   }
 
-  queue_max_bytes {
+  max_queue_bytes {
     desc {
       en: """Maximum queue storage."""
       zh: """消息队列的最大长度。"""

+ 7 - 4
apps/emqx_resource/include/emqx_resource.hrl

@@ -68,7 +68,7 @@
     batch_size => pos_integer(),
     batch_time => pos_integer(),
     enable_queue => boolean(),
-    queue_max_bytes => pos_integer(),
+    max_queue_bytes => pos_integer(),
     query_mode => query_mode(),
     resume_interval => pos_integer(),
     async_inflight_window => pos_integer()
@@ -77,12 +77,15 @@
     ok
     | {ok, term()}
     | {error, term()}
-    | {resource_down, term()}.
+    | {recoverable_error, term()}.
 
 -define(WORKER_POOL_SIZE, 16).
 
--define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024).
--define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>).
+-define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024).
+-define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>).
+
+-define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024 * 1024).
+-define(DEFAULT_QUEUE_SIZE_RAW, <<"100GB">>).
 
 %% count
 -define(DEFAULT_BATCH_SIZE, 100).

+ 3 - 14
apps/emqx_resource/src/emqx_resource.erl

@@ -110,7 +110,7 @@
     list_group_instances/1
 ]).
 
--export([inc_metrics_funcs/1, inc_matched/1, inc_success/1, inc_failed/1]).
+-export([inc_received/1]).
 
 -optional_callbacks([
     on_query/3,
@@ -443,19 +443,8 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
 
 %% =================================================================================
 
-inc_matched(ResId) ->
-    emqx_metrics_worker:inc(?RES_METRICS, ResId, matched).
-
-inc_success(ResId) ->
-    emqx_metrics_worker:inc(?RES_METRICS, ResId, success).
-
-inc_failed(ResId) ->
-    emqx_metrics_worker:inc(?RES_METRICS, ResId, failed).
+inc_received(ResId) ->
+    emqx_metrics_worker:inc(?RES_METRICS, ResId, 'received').
 
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
-
-inc_metrics_funcs(ResId) ->
-    OnSucc = [{fun ?MODULE:inc_success/1, ResId}],
-    OnFailed = [{fun ?MODULE:inc_failed/1, ResId}],
-    {OnSucc, OnFailed}.

+ 28 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -128,7 +128,23 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
     ok = emqx_metrics_worker:create_metrics(
         ?RES_METRICS,
         ResId,
-        [matched, success, failed, exception, resource_down],
+        [
+            'matched',
+            'sent',
+            'dropped',
+            'queued',
+            'batched',
+            'sent.success',
+            'sent.failed',
+            'sent.exception',
+            'sent.inflight',
+            'dropped.queue_not_enabled',
+            'dropped.queue_full',
+            'dropped.resource_not_found',
+            'dropped.resource_stopped',
+            'dropped.other',
+            'received'
+        ],
         [matched]
     ),
     ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
@@ -539,6 +555,7 @@ with_health_check(Data, Func) ->
     HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
     {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
     _ = maybe_alarm(Status, ResId),
+    ok = maybe_resume_resource_workers(Status),
     UpdatedData = Data#data{
         state = NewState, status = Status, error = Err
     },
@@ -559,6 +576,16 @@ maybe_alarm(_Status, ResId) ->
         <<"resource down: ", ResId/binary>>
     ).
 
+maybe_resume_resource_workers(connected) ->
+    lists:foreach(
+        fun({_, Pid, _, _}) ->
+            emqx_resource_worker:resume(Pid)
+        end,
+        supervisor:which_children(emqx_resource_worker_sup)
+    );
+maybe_resume_resource_workers(_) ->
+    ok.
+
 maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) ->
     ok;
 maybe_clear_alarm(ResId) ->

+ 94 - 39
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -77,23 +77,27 @@ start_link(Id, Index, Opts) ->
 sync_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
     Timeout = maps:get(timeout, Opts, infinity),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
 
 -spec async_query(id(), request(), query_opts()) -> Result :: term().
 async_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     pick_cast(Id, PickKey, {query, Request, Opts}).
 
 %% simple query the resource without batching and queuing messages.
 -spec simple_sync_query(id(), request()) -> Result :: term().
 simple_sync_query(Id, Request) ->
     Result = call_query(sync, Id, ?QUERY(self(), Request), #{}),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false),
     Result.
 
 -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
 simple_async_query(Id, Request, ReplyFun) ->
     Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'matched'),
     _ = handle_query_result(Id, Result, false),
     Result.
 
@@ -119,13 +123,15 @@ init({Id, Index, Opts}) ->
             true ->
                 replayq:open(#{
                     dir => disk_queue_dir(Id, Index),
-                    seg_bytes => maps:get(queue_max_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
+                    seg_bytes => maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
+                    max_total_bytes => maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
                     sizer => fun ?MODULE:estimate_size/1,
                     marshaller => fun ?MODULE:queue_item_marshaller/1
                 });
             false ->
                 undefined
         end,
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', queue_count(Queue)),
     ok = inflight_new(Name),
     St = #{
         id => Id,
@@ -149,8 +155,10 @@ running(cast, resume, _St) ->
     keep_state_and_data;
 running(cast, block, St) ->
     {next_state, block, St};
-running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
-    Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
+running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
+    is_list(Batch)
+->
+    Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
     {next_state, block, St#{queue := Q1}};
 running({call, From}, {query, Request, _Opts}, St) ->
     query_or_acc(From, Request, St);
@@ -169,8 +177,10 @@ blocked(enter, _, #{resume_interval := ResumeT} = _St) ->
     {keep_state_and_data, {state_timeout, ResumeT, resume}};
 blocked(cast, block, _St) ->
     keep_state_and_data;
-blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
-    Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
+blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{id := Id, queue := Q} = St) when
+    is_list(Batch)
+->
+    Q1 = maybe_append_queue(Id, Q, [?Q_ITEM(Query) || Query <- Batch]),
     {keep_state, St#{queue := Q1}};
 blocked(cast, resume, St) ->
     do_resume(St);
@@ -179,12 +189,12 @@ blocked(state_timeout, resume, St) ->
 blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) ->
     Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
     _ = reply_caller(Id, ?REPLY(From, Request, Error)),
-    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}};
+    {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request))])}};
 blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) ->
     ReplayFun = maps:get(async_reply_fun, Opts, undefined),
     Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
     _ = reply_caller(Id, ?REPLY(ReplayFun, Request, Error)),
-    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}.
+    {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}.
 
 terminate(_Reason, #{id := Id, index := Index}) ->
     gproc_pool:disconnect_worker(Id, {Id, Index}).
@@ -206,10 +216,10 @@ estimate_size(QItem) ->
         Pid when is_pid(Pid) ->
             EXPR;
         _ ->
-            ?RESOURCE_ERROR(not_created, "resource not created")
+            ?RESOURCE_ERROR(worker_not_created, "resource not created")
     catch
         error:badarg ->
-            ?RESOURCE_ERROR(not_created, "resource not created");
+            ?RESOURCE_ERROR(worker_not_created, "resource not created");
         exit:{timeout, _} ->
             ?RESOURCE_ERROR(timeout, "call resource timeout")
     end
@@ -255,18 +265,20 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S
                         inflight_drop(Name, Ref),
                         St0;
                     _ ->
-                        St0#{queue => drop_head(Q)}
+                        St0#{queue => drop_head(Id, Q)}
                 end,
             {keep_state, St, {state_timeout, 0, resume}}
     end.
 
-drop_head(Q) ->
+drop_head(Id, Q) ->
     {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
     ok = replayq:ack(Q1, AckRef),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -1),
     Q1.
 
-query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left} = St0) ->
+query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
     Acc1 = [?QUERY(From, Request) | Acc],
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched'),
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
         true -> flush(St);
@@ -277,18 +289,15 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St)
         inflight_name => maps:get(name, St),
         inflight_window => maps:get(async_inflight_window, St)
     },
-    case send_query(From, Request, Id, QueryOpts) of
+    Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts),
+    case reply_caller(Id, ?REPLY(From, Request, Result)) of
         true ->
             Query = ?QUERY(From, Request),
-            {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}};
+            {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}};
         false ->
             {keep_state, St}
     end.
 
-send_query(From, Request, Id, QueryOpts) ->
-    Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts),
-    reply_caller(Id, ?REPLY(From, Request, Result)).
-
 flush(#{acc := []} = St) ->
     {keep_state, St};
 flush(
@@ -303,18 +312,39 @@ flush(
         inflight_name => maps:get(name, St),
         inflight_window => maps:get(async_inflight_window, St)
     },
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'batched', -length(Batch)),
     Result = call_query(configured, Id, Batch, QueryOpts),
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, Result, Batch) of
         true ->
-            Q1 = maybe_append_queue(Q0, [?Q_ITEM(Query) || Query <- Batch]),
+            Q1 = maybe_append_queue(Id, Q0, [?Q_ITEM(Query) || Query <- Batch]),
             {next_state, blocked, St1#{queue := Q1}};
         false ->
             {keep_state, St1}
     end.
 
-maybe_append_queue(undefined, _Items) -> undefined;
-maybe_append_queue(Q, Items) -> replayq:append(Q, Items).
+maybe_append_queue(Id, undefined, Items) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped', length(Items)),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled', length(Items)),
+    undefined; % queue disabled: nothing is stored, every item counts as dropped
+maybe_append_queue(Id, Q, Items) ->
+    Q2 =
+        case replayq:overflow(Q) of
+            Overflow when Overflow =< 0 ->
+                Q; % no overflow reported: append without evicting
+            Overflow -> % overflow reported: pop and ack items to make room
+                PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
+                {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
+                ok = replayq:ack(Q1, QAckRef),
+                Dropped = length(Items2), % counters are per item, not per call
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped', Dropped),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full', Dropped),
+                ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
+                Q1
+        end,
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', length(Items)),
+    replayq:append(Q2, Items).
 
 batch_reply_caller(Id, BatchResult, Batch) ->
     lists:foldl(
@@ -344,29 +374,41 @@ reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
     handle_query_result(Id, Result, BlockWorker).
 
 handle_query_result(Id, ?RESOURCE_ERROR_M(exception, _), BlockWorker) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.exception'),
     BlockWorker;
 handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _) when
     NotWorking == not_connected; NotWorking == blocked
 ->
     true;
-handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) ->
+handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, _), BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_not_found'),
+    BlockWorker;
+handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, _), BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.resource_stopped'),
+    BlockWorker;
+handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) ->
+    ?SLOG(error, #{msg => other_resource_error, reason => Reason}),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.other'),
     BlockWorker;
 handle_query_result(Id, {error, _}, BlockWorker) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
     BlockWorker;
-handle_query_result(Id, {resource_down, _}, _BlockWorker) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
+handle_query_result(Id, {recoverable_error, _}, _BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1),
     true;
 handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
     true;
-handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) ->
-    true;
+handle_query_result(Id, {async_return, {error, _}}, BlockWorker) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'),
+    BlockWorker;
 handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
     BlockWorker;
 handle_query_result(Id, Result, BlockWorker) ->
     assert_ok_result(Result),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, success),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.success'),
     BlockWorker.
 
 call_query(QM0, Id, Query, QueryOpts) ->
@@ -390,8 +432,8 @@ call_query(QM0, Id, Query, QueryOpts) ->
 -define(APPLY_RESOURCE(EXPR, REQ),
     try
         %% if the callback module (connector) wants to return an error that
-        %% makes the current resource goes into the `error` state, it should
-        %% return `{resource_down, Reason}`
+        %% makes the current resource go into the `blocked` state, it should
+        %% return `{recoverable_error, Reason}`
         EXPR
     catch
         ERR:REASON:STACKTRACE ->
@@ -406,7 +448,7 @@ call_query(QM0, Id, Query, QueryOpts) ->
 
 apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
     ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request);
 apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
@@ -418,7 +460,8 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
                 ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
                 {async_return, inflight_full};
             false ->
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
                 ReplyFun = fun ?MODULE:reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = [self(), Id, Name, Ref, Query],
@@ -431,7 +474,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) ->
 apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) ->
     ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     Requests = [Request || ?QUERY(_From, Request) <- Batch],
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
+    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),
     ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch);
 apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
@@ -443,7 +486,8 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
                 ?tp(inflight_full, #{id => Id, wind_size => WinSize}),
                 {async_return, inflight_full};
             false ->
-                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)),
+                ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight'),
                 ReplyFun = fun ?MODULE:batch_reply_after_query/6,
                 Ref = make_message_ref(),
                 Args = {ReplyFun, [self(), Id, Name, Ref, Batch]},
@@ -457,14 +501,20 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) ->
 
 reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) ->
     case reply_caller(Id, ?REPLY(From, Request, Result)) of
-        true -> ?MODULE:block(Pid);
-        false -> inflight_drop(Name, Ref)
+        true ->
+            ?MODULE:block(Pid);
+        false ->
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -1),
+            inflight_drop(Name, Ref)
     end.
 
 batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) ->
     case batch_reply_caller(Id, Result, Batch) of
-        true -> ?MODULE:block(Pid);
-        false -> inflight_drop(Name, Ref)
+        true ->
+            ?MODULE:block(Pid);
+        false ->
+            emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.inflight', -length(Batch)),
+            inflight_drop(Name, Ref)
     end.
 %%==============================================================================
 %% the inflight queue for async query
@@ -518,6 +568,11 @@ assert_ok_result(R) when is_tuple(R) ->
 assert_ok_result(R) ->
     error({not_ok_result, R}).
 
+queue_count(undefined) ->
+    0;
+queue_count(Q) ->
+    replayq:count(Q).
+
 -spec name(id(), integer()) -> atom().
 name(Id, Index) ->
     Mod = atom_to_list(?MODULE),

+ 3 - 3
apps/emqx_resource/src/emqx_resource_worker_sup.erl

@@ -107,7 +107,7 @@ ensure_worker_started(ResId, Idx, Opts) ->
         type => worker,
         modules => [Mod]
     },
-    case supervisor:start_child(emqx_resource_sup, Spec) of
+    case supervisor:start_child(?SERVER, Spec) of
         {ok, _Pid} -> ok;
         {error, {already_started, _}} -> ok;
         {error, already_present} -> ok;
@@ -116,9 +116,9 @@ ensure_worker_started(ResId, Idx, Opts) ->
 
 ensure_worker_removed(ResId, Idx) ->
     ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx),
-    case supervisor:terminate_child(emqx_resource_sup, ChildId) of
+    case supervisor:terminate_child(?SERVER, ChildId) of
         ok ->
-            Res = supervisor:delete_child(emqx_resource_sup, ChildId),
+            Res = supervisor:delete_child(?SERVER, ChildId),
             _ = gproc_pool:remove_worker(ResId, {ResId, Idx}),
             Res;
         {error, not_found} ->

+ 6 - 6
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -53,7 +53,7 @@ fields("creation_opts") ->
         {batch_size, fun batch_size/1},
         {batch_time, fun batch_time/1},
         {enable_queue, fun enable_queue/1},
-        {max_queue_bytes, fun queue_max_bytes/1}
+        {max_queue_bytes, fun max_queue_bytes/1}
     ].
 
 worker_pool_size(type) -> pos_integer();
@@ -110,11 +110,11 @@ batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW;
 batch_time(required) -> false;
 batch_time(_) -> undefined.
 
-queue_max_bytes(type) -> emqx_schema:bytesize();
-queue_max_bytes(desc) -> ?DESC("queue_max_bytes");
-queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
-queue_max_bytes(required) -> false;
-queue_max_bytes(_) -> undefined.
+max_queue_bytes(type) -> emqx_schema:bytesize();
+max_queue_bytes(desc) -> ?DESC("max_queue_bytes");
+max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
+max_queue_bytes(required) -> false;
+max_queue_bytes(_) -> undefined.
 
 desc("creation_opts") ->
     ?DESC("creation_opts").

+ 1 - 1
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -96,7 +96,7 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
     Pid ! {From, {inc, N}},
     receive
         {ReqRef, ok} -> ok;
-        {ReqRef, incorrect_status} -> {resource_down, incorrect_status}
+        {ReqRef, incorrect_status} -> {recoverable_error, incorrect_status}
     after 1000 ->
         {error, timeout}
     end;

+ 4 - 4
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -268,7 +268,7 @@ t_query_counter_async_query(_) ->
         end
     ),
     {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
-    ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C),
+    ?assertMatch(#{matched := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C),
     ok = emqx_resource:remove_local(?ID).
 
 t_query_counter_async_callback(_) ->
@@ -309,7 +309,7 @@ t_query_counter_async_callback(_) ->
         end
     ),
     {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
-    ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C),
+    ?assertMatch(#{matched := 1002, sent := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C),
     ?assertMatch(1000, ets:info(Tab0, size)),
     ?assert(
         lists:all(
@@ -419,8 +419,8 @@ t_query_counter_async_inflight(_) ->
     {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
     ct:pal("metrics: ~p", [C]),
     ?assertMatch(
-        #{matched := M, success := S, exception := E, failed := F, resource_down := RD} when
-            M >= Sent andalso M == S + E + F + RD,
+        #{matched := M, sent := St, 'sent.success' := Ss, dropped := D} when
+            St == Ss andalso M == St + D,
         C
     ),
     ?assert(

+ 7 - 7
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -88,18 +88,18 @@
 %% Logical operators
 -define(is_logical(Op), (Op =:= 'and' orelse Op =:= 'or')).
 
--define(RAISE(_EXP_, _ERROR_),
-    ?RAISE(_EXP_, _ = do_nothing, _ERROR_)
+-define(RAISE(EXP, ERROR),
+    ?RAISE(EXP, _ = do_nothing, ERROR)
 ).
 
--define(RAISE(_EXP_, _EXP_ON_FAIL_, _ERROR_),
+-define(RAISE(EXP, EXP_ON_FAIL, ERROR),
     fun() ->
         try
-            (_EXP_)
+            (EXP)
         catch
-            _EXCLASS_:_EXCPTION_:_ST_ ->
-                _EXP_ON_FAIL_,
-                throw(_ERROR_)
+            EXCLASS:EXCPTION:ST ->
+                EXP_ON_FAIL,
+                throw(ERROR)
         end
     end()
 ).

+ 27 - 13
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -42,6 +42,10 @@
 -type alias() :: atom().
 -type collection() :: {alias(), [term()]}.
 
+-elvis([
+    {elvis_style, invalid_dynamic_call, #{ignore => [emqx_rule_runtime]}}
+]).
+
 -define(ephemeral_alias(TYPE, NAME),
     iolist_to_binary(io_lib:format("_v_~ts_~p_~p", [TYPE, NAME, erlang:system_time()]))
 ).
@@ -130,13 +134,13 @@ do_apply_rule(
 ) ->
     {Selected, Collection} = ?RAISE(
         select_and_collect(Fields, Columns),
-        {select_and_collect_error, {_EXCLASS_, _EXCPTION_, _ST_}}
+        {select_and_collect_error, {EXCLASS, EXCPTION, ST}}
     ),
     ColumnsAndSelected = maps:merge(Columns, Selected),
     case
         ?RAISE(
             match_conditions(Conditions, ColumnsAndSelected),
-            {match_conditions_error, {_EXCLASS_, _EXCPTION_, _ST_}}
+            {match_conditions_error, {EXCLASS, EXCPTION, ST}}
         )
     of
         true ->
@@ -166,12 +170,12 @@ do_apply_rule(
 ) ->
     Selected = ?RAISE(
         select_and_transform(Fields, Columns),
-        {select_and_transform_error, {_EXCLASS_, _EXCPTION_, _ST_}}
+        {select_and_transform_error, {EXCLASS, EXCPTION, ST}}
     ),
     case
         ?RAISE(
             match_conditions(Conditions, maps:merge(Columns, Selected)),
-            {match_conditions_error, {_EXCLASS_, _EXCPTION_, _ST_}}
+            {match_conditions_error, {EXCLASS, EXCPTION, ST}}
         )
     of
         true ->
@@ -245,7 +249,7 @@ filter_collection(Columns, InCase, DoEach, {CollKey, CollVal}) ->
             case
                 ?RAISE(
                     match_conditions(InCase, ColumnsAndItem),
-                    {match_incase_error, {_EXCLASS_, _EXCPTION_, _ST_}}
+                    {match_incase_error, {EXCLASS, EXCPTION, ST}}
                 )
             of
                 true when DoEach == [] -> {true, ColumnsAndItem};
@@ -253,7 +257,7 @@ filter_collection(Columns, InCase, DoEach, {CollKey, CollVal}) ->
                     {true,
                         ?RAISE(
                             select_and_transform(DoEach, ColumnsAndItem),
-                            {doeach_error, {_EXCLASS_, _EXCPTION_, _ST_}}
+                            {doeach_error, {EXCLASS, EXCPTION, ST}}
                         )};
                 false ->
                     false
@@ -271,7 +275,7 @@ match_conditions({'not', Var}, Data) ->
     case eval(Var, Data) of
         Bool when is_boolean(Bool) ->
             not Bool;
-        _other ->
+        _Other ->
             false
     end;
 match_conditions({in, Var, {list, Vals}}, Data) ->
@@ -506,12 +510,22 @@ nested_put(Alias, Val, Columns0) ->
 -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
 inc_action_metrics(ok, RuleId) ->
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
-inc_action_metrics({ok, _}, RuleId) ->
-    emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
-inc_action_metrics({resource_down, _}, RuleId) ->
+inc_action_metrics({recoverable_error, _}, RuleId) ->
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
 inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
-inc_action_metrics(_, RuleId) ->
-    emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
-    emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown').
+inc_action_metrics(R, RuleId) ->
+    case is_ok_result(R) of
+        false ->
+            emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
+            emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
+        true ->
+            emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
+    end.
+
+is_ok_result(ok) ->
+    true;
+is_ok_result(R) when is_tuple(R) ->
+    ok == erlang:element(1, R); % tuples tagged `ok`, e.g. {ok, Value}
+is_ok_result(_) ->
+    false. % any other term (map, list, other atom) is not a success

+ 10 - 8
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -62,14 +62,16 @@ test_rule(Sql, Select, Context, EventTopics) ->
     },
     FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)),
     try emqx_rule_runtime:apply_rule(Rule, FullContext, #{}) of
-        {ok, Data} -> {ok, flatten(Data)};
-        {error, Reason} -> {error, Reason}
+        {ok, Data} ->
+            {ok, flatten(Data)};
+        {error, Reason} ->
+            {error, Reason}
     after
         ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
     end.
 
 get_selected_data(Selected, _Envs, _Args) ->
-    Selected.
+    {ok, Selected}.
 
 is_publish_topic(<<"$events/", _/binary>>) -> false;
 is_publish_topic(<<"$bridges/", _/binary>>) -> false;
@@ -77,14 +79,14 @@ is_publish_topic(_Topic) -> true.
 
 flatten([]) ->
     [];
-flatten([D1]) ->
-    D1;
-flatten([D1 | L]) when is_list(D1) ->
-    D1 ++ flatten(L).
+flatten([{ok, D}]) ->
+    D;
+flatten([D | L]) when is_list(D) ->
+    [D0 || {ok, D0} <- D] ++ flatten(L).
 
 echo_action(Data, Envs) ->
     ?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}),
-    Data.
+    {ok, Data}.
 
 fill_default_values(Event, Context) ->
     maps:merge(envs_examp(Event), Context).

+ 0 - 18
lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl

@@ -1,18 +0,0 @@
--define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{
-    matched => MATCH,
-    success => SUCC,
-    failed => FAILED,
-    rate => RATE,
-    rate_last5m => RATE_5,
-    rate_max => RATE_MAX
-}).
-
--define(METRICS_EXAMPLE, #{
-    metrics => ?METRICS(0, 0, 0, 0, 0, 0),
-    node_metrics => [
-        #{
-            node => node(),
-            metrics => ?METRICS(0, 0, 0, 0, 0, 0)
-        }
-    ]
-}).

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl

@@ -5,7 +5,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
--include("emqx_ee_bridge.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl

@@ -3,7 +3,7 @@
 %%--------------------------------------------------------------------
 -module(emqx_ee_bridge_influxdb).
 
--include("emqx_ee_bridge.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
 -include_lib("emqx_connector/include/emqx_connector.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").

+ 1 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl

@@ -5,7 +5,7 @@
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
--include("emqx_ee_bridge.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).

+ 4 - 2
lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstreamdb.erl

@@ -135,13 +135,15 @@ start_client(InstId, Config) ->
         do_start_client(InstId, Config)
     catch
         E:R:S ->
-            ?SLOG(error, #{
+            Error = #{
                 msg => "start hstreamdb connector error",
                 connector => InstId,
                 error => E,
                 reason => R,
                 stack => S
-            })
+            },
+            ?SLOG(error, Error),
+            {error, Error}
     end.
 
 do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->

+ 6 - 11
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -85,18 +85,13 @@ on_batch_query_async(
     InstId,
     BatchData,
     {ReplayFun, Args},
-    State = #{write_syntax := SyntaxLines, client := Client}
+    #{write_syntax := SyntaxLines, client := Client}
 ) ->
-    case on_get_status(InstId, State) of
-        connected ->
-            case parse_batch_data(InstId, BatchData, SyntaxLines) of
-                {ok, Points} ->
-                    do_async_query(InstId, Client, Points, {ReplayFun, Args});
-                {error, Reason} ->
-                    {error, Reason}
-            end;
-        disconnected ->
-            {resource_down, disconnected}
+    case parse_batch_data(InstId, BatchData, SyntaxLines) of
+        {ok, Points} ->
+            do_async_query(InstId, Client, Points, {ReplayFun, Args});
+        {error, Reason} ->
+            {error, Reason}
     end.
 
 on_get_status(_InstId, #{client := Client}) ->