Просмотр исходного кода

fix(opentsdb): Enhanced the type support for template data

firest 2 лет назад
Родитель
Сommit
28867d07e6

+ 16 - 2
apps/emqx_bridge_opents/src/emqx_bridge_opents.erl

@@ -3,6 +3,7 @@
 %%--------------------------------------------------------------------
 -module(emqx_bridge_opents).
 
+-include_lib("emqx/include/logger.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
@@ -156,12 +157,25 @@ fields(action_parameters_data) ->
                 binary(),
                 #{
                     required => true,
-                    desc => ?DESC("config_parameters_tags")
+                    desc => ?DESC("config_parameters_tags"),
+                    validator => fun(Tmpl) ->
+                        case emqx_placeholder:preproc_tmpl(Tmpl) of
+                            [{var, _}] ->
+                                true;
+                            _ ->
+                                ?SLOG(warning, #{
+                                    msg => "invalid_tags_template",
+                                    path => "opents.parameters.data.tags",
+                                    data => Tmpl
+                                }),
+                                false
+                        end
+                    end
                 }
             )},
         {value,
             mk(
-                binary(),
+                hoconsc:union([integer(), float(), binary()]),
                 #{
                     required => true,
                     desc => ?DESC("config_parameters_value")

+ 20 - 7
apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl

@@ -304,9 +304,14 @@ render_channel_message(Msg, #{data := DataList}, Acc) ->
             ValueVal =
                 case ValueTk of
                     [_] ->
+                        %% just one element, maybe is a variable or a plain text
+                        %% we should keep it as it is
                         erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts));
-                    _ ->
-                        emqx_placeholder:proc_tmpl(ValueTk, Msg)
+                    Tks when is_list(Tks) ->
+                        emqx_placeholder:proc_tmpl(ValueTk, Msg);
+                    Raw ->
+                        %% not a token list, just a raw value
+                        Raw
                 end,
             Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal},
             [
@@ -328,12 +333,20 @@ preproc_data_template([]) ->
 preproc_data_template(DataList) ->
     lists:map(
         fun(Data) ->
-            maps:map(
-                fun(_Key, Value) ->
-                    emqx_placeholder:preproc_tmpl(Value)
+            {Value, Data2} = maps:take(value, Data),
+            Template = maps:map(
+                fun(_Key, Val) ->
+                    emqx_placeholder:preproc_tmpl(Val)
                 end,
-                Data
-            )
+                Data2
+            ),
+
+            case Value of
+                Text when is_binary(Text) ->
+                    Template#{value => emqx_placeholder:preproc_tmpl(Text)};
+                Raw ->
+                    Template#{value => Raw}
+            end
         end,
         DataList
     ).

+ 87 - 1
apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl

@@ -83,7 +83,7 @@ init_per_testcase(TestCase, Config0) ->
         {bridge_config, ActionConfig}
         | Config0
     ],
-    %%    iotdb_reset(Config),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     ok = snabbkaffe:start_trace(),
     Config.
 
@@ -253,3 +253,89 @@ t_query_invalid_data(Config) ->
     ok = emqx_bridge_v2_testlib:t_sync_query(
         Config, MakeMessageFun, fun is_error_check/1, opents_bridge_on_query
     ).
+
+t_tags_validator(Config) ->
+    %% Create without data  configured
+    ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
+
+    ?assertMatch(
+        {ok, _},
+        emqx_bridge_v2_testlib:update_bridge_api(Config, #{
+            <<"parameters">> => #{
+                <<"data">> => [
+                    #{
+                        <<"metric">> => <<"${metric}">>,
+                        <<"tags">> => <<"${tags}">>,
+                        <<"value">> => <<"${payload.value}">>
+                    }
+                ]
+            }
+        })
+    ),
+
+    ?assertMatch(
+        {error, _},
+        emqx_bridge_v2_testlib:update_bridge_api(Config, #{
+            <<"parameters">> => #{
+                <<"data">> => [
+                    #{
+                        <<"metric">> => <<"${metric}">>,
+                        <<"tags">> => <<"text">>,
+                        <<"value">> => <<"${payload.value}">>
+                    }
+                ]
+            }
+        })
+    ).
+
+t_raw_int_value(Config) ->
+    raw_value_test(<<"t_raw_int_value">>, 42, Config).
+
+t_raw_float_value(Config) ->
+    raw_value_test(<<"t_raw_float_value">>, 42.5, Config).
+
+raw_value_test(Metric, RawValue, Config) ->
+    ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
+    BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 10,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+
+    ?assertMatch(
+        {ok, _},
+        emqx_bridge_v2_testlib:update_bridge_api(Config, #{
+            <<"parameters">> => #{
+                <<"data">> => [
+                    #{
+                        <<"metric">> => <<"${metric}">>,
+                        <<"tags">> => <<"${tags}">>,
+                        <<"value">> => RawValue
+                    }
+                ]
+            }
+        })
+    ),
+
+    Value = 12,
+    MakeMessageFun = fun() -> make_data(Metric, Value) end,
+
+    is_success_check(
+        emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()})
+    ),
+
+    {ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
+    QResult = emqx_utils_json:decode(IoTDBResult),
+    ?assertMatch(
+        [
+            #{
+                <<"metric">> := Metric,
+                <<"dps">> := _
+            }
+        ],
+        QResult
+    ),
+    [#{<<"dps">> := Dps}] = QResult,
+    ?assertMatch([RawValue | _], maps:values(Dps)).

+ 1 - 1
rel/i18n/emqx_bridge_opents.hocon

@@ -49,7 +49,7 @@ config_parameters_metric.label:
 """Metric"""
 
 config_parameters_tags.desc:
-"""Data Type, Placeholders in format of ${var} is supported"""
+"""Tags. Only supports with placeholder to extract tags from a variable"""
 
 config_parameters_tags.label:
 """Tags"""