Kaynağa Gözat

fix(iotdb): improve the schema for IotDB

firest 2 yıl önce
ebeveyn
işleme
2bf7ac00a0

+ 3 - 13
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl

@@ -113,7 +113,7 @@ fields(action_parameters_data) ->
     [
         {timestamp,
             mk(
-                binary(),
+                hoconsc:union([now, now_ms, now_ns, now_us, binary()]),
                 #{
                     desc => ?DESC("config_parameters_timestamp"),
                     default => <<"now">>
@@ -129,20 +129,10 @@ fields(action_parameters_data) ->
             )},
         {data_type,
             mk(
-                binary(),
+                hoconsc:union([text, boolean, int32, int64, float, double, binary()]),
                 #{
                     required => true,
-                    desc => ?DESC("config_parameters_data_type"),
-                    validator => fun(Type) ->
-                        lists:member(Type, [
-                            <<"TEXT">>,
-                            <<"BOOLEAN">>,
-                            <<"INT32">>,
-                            <<"INT64">>,
-                            <<"FLOAT">>,
-                            <<"DOUBLE">>
-                        ])
-                    end
+                    desc => ?DESC("config_parameters_data_type")
                 }
             )},
         {value,

+ 20 - 4
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -376,7 +376,7 @@ preproc_data(
                 maps:get(<<"timestamp">>, Data, <<"now">>)
             ),
             measurement => emqx_placeholder:preproc_tmpl(Measurement),
-            data_type => DataType,
+            data_type => emqx_placeholder:preproc_tmpl(DataType),
             value => maybe_preproc_tmpl(Value)
         }
         | Acc
@@ -409,10 +409,11 @@ proc_data(PreProcessedData, Msg) ->
             #{
                 timestamp := TimestampTkn,
                 measurement := Measurement,
-                data_type := DataType,
+                data_type := DataType0,
                 value := ValueTkn
             }
         ) ->
+            DataType = emqx_placeholder:proc_tmpl(DataType0, Msg),
             #{
                 timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
                 measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
@@ -610,6 +611,12 @@ eval_response_body(Body, Resp) ->
     end.
 
 preproc_data_template(DataList) ->
+    Atom2Bin = fun
+        (Atom, Converter) when is_atom(Atom) ->
+            Converter(Atom);
+        (Bin, _) ->
+            Bin
+    end,
     lists:map(
         fun(
             #{
@@ -620,9 +627,18 @@ preproc_data_template(DataList) ->
             }
         ) ->
             #{
-                timestamp => emqx_placeholder:preproc_tmpl(Timestamp),
+                timestamp => emqx_placeholder:preproc_tmpl(
+                    Atom2Bin(Timestamp, fun erlang:atom_to_binary/1)
+                ),
                 measurement => emqx_placeholder:preproc_tmpl(Measurement),
-                data_type => DataType,
+                data_type => emqx_placeholder:preproc_tmpl(
+                    Atom2Bin(
+                        DataType,
+                        fun(Atom) ->
+                            erlang:list_to_binary(string:uppercase(erlang:atom_to_list(Atom)))
+                        end
+                    )
+                ),
                 value => emqx_placeholder:preproc_tmpl(Value)
             }
         end,

+ 2 - 1
rel/i18n/emqx_bridge_iotdb.hocon

@@ -89,7 +89,8 @@ config_parameters_measurement.label:
 """Measurement"""
 
 config_parameters_data_type.desc:
-"""Data Type, can be:</br>
+"""Data Type, an enumerated or a string. </br>
+For string placeholders in format of ${var} is supported, the final value can be:</br>
 - TEXT
 - BOOLEAN
 - INT32