Просмотр исходного кода

Merge pull request #8722 from JimMoen/feat-influxdb-batch-async

feat: influxdb support `async`/`batch_async` query
JimMoen 3 лет назад
Родитель
Сommit
350b39edca

+ 7 - 16
apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf

@@ -24,13 +24,8 @@ emqx_resource_schema {
 
   start_timeout {
     desc {
-      en: """
-If 'start_after_created' enabled, how long time do we wait for the
-resource get started, in milliseconds.
-"""
-      zh: """
-如果选择了创建后立即启动资源,此选项用来设置等待资源启动的超时时间,单位毫秒。
-"""
+      en: """If 'start_after_created' enabled, how long time do we wait for the resource get started, in milliseconds."""
+      zh: """如果选择了创建后立即启动资源,此选项用来设置等待资源启动的超时时间,单位毫秒。"""
     }
     label {
       en: """Start Timeout"""
@@ -40,12 +35,8 @@ resource get started, in milliseconds.
 
   auto_restart_interval {
     desc {
-      en: """
-The auto restart interval after the resource is disconnected, in milliseconds.
-"""
-      zh: """
-资源断开以后,自动重连的时间间隔,单位毫秒。
-"""
+      en: """The auto restart interval after the resource is disconnected, in milliseconds."""
+      zh: """资源断开以后,自动重连的时间间隔,单位毫秒。"""
     }
     label {
       en: """Auto Restart Interval"""
@@ -89,7 +80,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
   resume_interval {
     desc {
       en: """Resume time interval when resource down."""
-      zh: """资源不可用时的重试时间"""
+      zh: """资源不可用时的重试时间"""
     }
     label {
       en: """Resume interval"""
@@ -100,7 +91,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
   async_inflight_window {
     desc {
       en: """Async query inflight window."""
-      zh: """异步请求飞行队列窗口大小"""
+      zh: """异步请求飞行队列窗口大小"""
     }
     label {
       en: """Async inflight window"""
@@ -111,7 +102,7 @@ The auto restart interval after the resource is disconnected, in milliseconds.
   batch_size {
     desc {
       en: """Maximum batch count."""
-      zh: """批量请求大小"""
+      zh: """批量请求大小"""
     }
     label {
       en: """Batch size"""

+ 10 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -115,6 +115,7 @@
     on_query/3,
     on_batch_query/3,
     on_query_async/4,
+    on_batch_query_async/4,
     on_get_status/2
 ]).
 
@@ -131,6 +132,7 @@
 %% when calling emqx_resource:on_batch_query/3
 -callback on_batch_query(resource_id(), Request :: term(), resource_state()) -> query_result().
 
+%% when calling emqx_resource:on_query_async/4
 -callback on_query_async(
     resource_id(),
     Request :: term(),
@@ -138,6 +140,14 @@
     resource_state()
 ) -> query_result().
 
+%% when calling emqx_resource:on_batch_query_async/4
+-callback on_batch_query_async(
+    resource_id(),
+    Request :: term(),
+    {ReplyFun :: function(), Args :: list()},
+    resource_state()
+) -> query_result().
+
 %% when calling emqx_resource:health_check/2
 -callback on_get_status(resource_id(), resource_state()) ->
     resource_status()

+ 13 - 17
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf

@@ -1,15 +1,13 @@
 emqx_ee_bridge_influxdb {
     local_topic {
         desc {
-            en: """
-The MQTT topic filter to be forwarded to the InfluxDB. All MQTT 'PUBLISH' messages with the topic
+            en: """The MQTT topic filter to be forwarded to the InfluxDB. All MQTT 'PUBLISH' messages with the topic
 matching the local_topic will be forwarded.</br>
 NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
 configured, then both the data got from the rule and the MQTT messages that match local_topic
 will be forwarded.
 """
-            zh: """
-发送到 'local_topic' 的消息都会转发到 InfluxDB。 </br>
+            zh: """发送到 'local_topic' 的消息都会转发到 InfluxDB。 </br>
 注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 InfluxDB。
 """
         }
@@ -20,20 +18,18 @@ will be forwarded.
     }
     write_syntax {
         desc {
-            en: """
-Conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported.
+            en: """Conf of InfluxDB line protocol to write data points. It is a text-based format that provides the measurement, tag set, field set, and timestamp of a data point, and placeholder supported.
 See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) and
 [InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
-TLDR:
+TLDR:</br>
 ```
 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
 ```
 """
-            zh: """
-使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符</br>
+            zh: """使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符</br>
 参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及
 [InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
-TLDR:
+TLDR: </br>
 ```
 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
 ```
@@ -46,8 +42,8 @@ TLDR:
     }
     config_enable {
         desc {
-            en: """Enable or disable this bridge"""
-            zh: """启用/禁用桥接"""
+            en: """Enable or disable this bridge."""
+            zh: """启用/禁用桥接"""
         }
         label {
             en: "Enable Or Disable Bridge"
@@ -56,8 +52,8 @@ TLDR:
         }
     config_direction {
         desc {
-            en: """The direction of this bridge, MUST be 'egress'"""
-            zh: """桥接的方向,必须是 egress"""
+            en: """The direction of this bridge, MUST be 'egress'."""
+            zh: """桥接的方向,必须是 egress"""
         }
         label {
             en: "Bridge Direction"
@@ -68,7 +64,7 @@ TLDR:
     desc_config {
         desc {
             en: """Configuration for an InfluxDB bridge."""
-            zh: """InfluxDB 桥接配置"""
+            zh: """InfluxDB 桥接配置"""
         }
         label: {
             en: "InfluxDB Bridge Configuration"
@@ -78,8 +74,8 @@ TLDR:
 
     desc_type {
         desc {
-            en: """The Bridge Type"""
-            zh: """Bridge 类型"""
+            en: """The Bridge Type."""
+            zh: """Bridge 类型"""
         }
         label {
             en: "Bridge Type"

+ 8 - 8
lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_influxdb.conf

@@ -42,8 +42,8 @@ emqx_ee_connector_influxdb {
     }
     protocol {
         desc {
-                en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2"""
-                zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2"""
+                en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2."""
+                zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2"""
         }
         label: {
                 en: """Protocol"""
@@ -53,7 +53,7 @@ emqx_ee_connector_influxdb {
     influxdb_udp {
         desc {
                 en: """InfluxDB's UDP protocol."""
-                zh: """InfluxDB UDP 协议"""
+                zh: """InfluxDB UDP 协议"""
         }
         label: {
                 en: """UDP Protocol"""
@@ -63,7 +63,7 @@ emqx_ee_connector_influxdb {
     influxdb_api_v1 {
         desc {
                 en: """InfluxDB's protocol. Support InfluxDB v1.8 and before."""
-                zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本"""
+                zh: """InfluxDB HTTP API 协议。支持 Influxdb v1.8 以及之前的版本"""
         }
         label: {
                 en: """HTTP API Protocol"""
@@ -73,7 +73,7 @@ emqx_ee_connector_influxdb {
     influxdb_api_v2 {
         desc {
                 en: """InfluxDB's protocol. Support InfluxDB v2.0 and after."""
-                zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本"""
+                zh: """InfluxDB HTTP API V2 协议。支持 Influxdb v2.0 以及之后的版本"""
         }
         label: {
                 en: """HTTP API V2 Protocol"""
@@ -113,7 +113,7 @@ emqx_ee_connector_influxdb {
     bucket {
         desc {
                 en: "InfluxDB bucket name."
-                zh: "InfluxDB bucket 名称"
+                zh: "InfluxDB bucket 名称"
         }
         label: {
                 en: "Bucket"
@@ -152,8 +152,8 @@ emqx_ee_connector_influxdb {
     }
     pool_size {
         desc {
-                en: """InfluxDB Pool Size"""
-                zh: """InfluxDB 连接池大小"""
+                en: """InfluxDB Pool Size. Default value is CPU threads."""
+                zh: """InfluxDB 连接池大小,默认为 CPU 线程数。"""
             }
         label {
                 en: """InfluxDB Pool Size"""

+ 32 - 15
lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl

@@ -19,6 +19,7 @@
     on_query/3,
     on_batch_query/3,
     on_query_async/4,
+    on_batch_query_async/4,
     on_get_status/2
 ]).
 
@@ -31,7 +32,7 @@
 
 %% -------------------------------------------------------------------------------------------------
 %% resource callback
-callback_mode() -> always_sync.
+callback_mode() -> async_if_possible.
 
 on_start(InstId, Config) ->
     start_client(InstId, Config).
@@ -50,17 +51,12 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
 
 %% Once a Batched Data trans to points failed.
 %% This batch query failed
-on_batch_query(InstId, BatchData, State = #{write_syntax := SyntaxLines, client := Client}) ->
-    case on_get_status(InstId, State) of
-        connected ->
-            case parse_batch_data(InstId, BatchData, SyntaxLines) of
-                {ok, Points} ->
-                    do_query(InstId, Client, Points);
-                {error, Reason} ->
-                    {error, Reason}
-            end;
-        disconnected ->
-            {resource_down, disconnected}
+on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) ->
+    case parse_batch_data(InstId, BatchData, SyntaxLines) of
+        {ok, Points} ->
+            do_query(InstId, Client, Points);
+        {error, Reason} ->
+            {error, Reason}
     end.
 
 on_query_async(
@@ -77,6 +73,24 @@ on_query_async(
             Err
     end.
 
+on_batch_query_async(
+    InstId,
+    BatchData,
+    {ReplayFun, Args},
+    State = #{write_syntax := SyntaxLines, client := Client}
+) ->
+    case on_get_status(InstId, State) of
+        connected ->
+            case parse_batch_data(InstId, BatchData, SyntaxLines) of
+                {ok, Points} ->
+                    do_async_query(InstId, Client, Points, {ReplayFun, Args});
+                {error, Reason} ->
+                    {error, Reason}
+            end;
+        disconnected ->
+            {resource_down, disconnected}
+    end.
+
 on_get_status(_InstId, #{client := Client}) ->
     case influxdb:is_alive(Client) of
         true ->
@@ -122,7 +136,7 @@ fields(basic) ->
             mk(enum([ns, us, ms, s, m, h]), #{
                 required => false, default => ms, desc => ?DESC("precision")
             })},
-        {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}
+        {pool_size, mk(pos_integer(), #{desc => ?DESC("pool_size")})}
     ];
 fields(influxdb_udp) ->
     fields(basic);
@@ -488,7 +502,8 @@ maps_config_to_data(K, V, {Data, Res}) ->
     case {NK, NV} of
         {[undefined], _} ->
             {Data, Res};
-        {_, [undefined]} ->
+        %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
+        {_, [undefined | _]} ->
             {Data, Res};
         _ ->
             {Data, Res#{NK => value_type(NV)}}
@@ -498,7 +513,9 @@ value_type([Int, <<"i">>]) when
     is_integer(Int)
 ->
     {int, Int};
-value_type([UInt, <<"u">>]) ->
+value_type([UInt, <<"u">>]) when
+    is_integer(UInt)
+->
     {uint, UInt};
 value_type([<<"t">>]) ->
     't';