Просмотр исходного кода

Merge pull request #12424 from lafirest/fix/opentsdb

feat(opentsdb): supports more flexible tags schema
lafirest 2 лет назад
Родитель
Сommit
c7f1e44513

+ 40 - 14
apps/emqx_bridge_opents/src/emqx_bridge_opents.erl

@@ -130,7 +130,7 @@ fields(action_parameters) ->
                 array(ref(?MODULE, action_parameters_data)),
                 #{
                     desc => ?DESC("action_parameters_data"),
-                    default => <<"[]">>
+                    default => []
                 }
             )}
     ];
@@ -154,22 +154,27 @@ fields(action_parameters_data) ->
             )},
         {tags,
             mk(
-                binary(),
+                hoconsc:union([array(ref(?MODULE, action_parameters_data_tags)), binary()]),
                 #{
                     required => true,
                     desc => ?DESC("config_parameters_tags"),
-                    validator => fun(Tmpl) ->
-                        case emqx_placeholder:preproc_tmpl(Tmpl) of
-                            [{var, _}] ->
-                                true;
-                            _ ->
-                                ?SLOG(warning, #{
-                                    msg => "invalid_tags_template",
-                                    path => "opents.parameters.data.tags",
-                                    data => Tmpl
-                                }),
-                                false
-                        end
+                    validator => fun
+                        (Tmpl) when is_binary(Tmpl) ->
+                            case emqx_placeholder:preproc_tmpl(Tmpl) of
+                                [{var, _}] ->
+                                    true;
+                                _ ->
+                                    ?SLOG(warning, #{
+                                        msg => "invalid_tags_template",
+                                        path => "opents.parameters.data.tags",
+                                        data => Tmpl
+                                    }),
+                                    false
+                            end;
+                        ([_ | _] = Tags) when is_list(Tags) ->
+                            true;
+                        (_) ->
+                            false
                     end
                 }
             )},
@@ -182,6 +187,25 @@ fields(action_parameters_data) ->
                 }
             )}
     ];
+fields(action_parameters_data_tags) ->
+    [
+        {tag,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("tags_tag")
+                }
+            )},
+        {value,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC("tags_value")
+                }
+            )}
+    ];
 fields("post_bridge_v2") ->
     emqx_bridge_schema:type_and_name_fields(enum([opents])) ++ fields(action_config);
 fields("put_bridge_v2") ->
@@ -197,6 +221,8 @@ desc(action_parameters) ->
     ?DESC("action_parameters");
 desc(action_parameters_data) ->
     ?DESC("action_parameters_data");
+desc(action_parameters_data_tags) ->
+    ?DESC("action_parameters_data_tags");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
     ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."];
 desc(_) ->

+ 47 - 14
apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl

@@ -294,13 +294,26 @@ render_channel_message(Msg, #{data := DataList}, Acc) ->
     lists:foldl(
         fun(#{metric := MetricTk, tags := TagsTk, value := ValueTk} = Data, InAcc) ->
             MetricVal = emqx_placeholder:proc_tmpl(MetricTk, Msg),
+
             TagsVal =
-                case emqx_placeholder:proc_tmpl(TagsTk, Msg, RawOpts) of
-                    [undefined] ->
-                        #{};
-                    [Any] ->
-                        Any
+                case TagsTk of
+                    [tags | TagTkList] ->
+                        maps:from_list([
+                            {
+                                emqx_placeholder:proc_tmpl(TagName, Msg),
+                                emqx_placeholder:proc_tmpl(TagValue, Msg)
+                            }
+                         || {TagName, TagValue} <- TagTkList
+                        ]);
+                    TagsTks ->
+                        case emqx_placeholder:proc_tmpl(TagsTks, Msg, RawOpts) of
+                            [undefined] ->
+                                #{};
+                            [Any] ->
+                                Any
+                        end
                 end,
+
             ValueVal =
                 case ValueTk of
                     [_] ->
@@ -308,7 +321,7 @@ render_channel_message(Msg, #{data := DataList}, Acc) ->
                         %% we should keep it as it is
                         erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts));
                     Tks when is_list(Tks) ->
-                        emqx_placeholder:proc_tmpl(ValueTk, Msg);
+                        emqx_placeholder:proc_tmpl(Tks, Msg);
                     Raw ->
                         %% not a token list, just a raw value
                         Raw
@@ -332,8 +345,8 @@ preproc_data_template([]) ->
     preproc_data_template(emqx_bridge_opents:default_data_template());
 preproc_data_template(DataList) ->
     lists:map(
-        fun(Data) ->
-            {Value, Data2} = maps:take(value, Data),
+        fun(#{tags := Tags, value := Value} = Data) ->
+            Data2 = maps:without([tags, value], Data),
             Template = maps:map(
                 fun(_Key, Val) ->
                     emqx_placeholder:preproc_tmpl(Val)
@@ -341,12 +354,32 @@ preproc_data_template(DataList) ->
                 Data2
             ),
 
-            case Value of
-                Text when is_binary(Text) ->
-                    Template#{value => emqx_placeholder:preproc_tmpl(Text)};
-                Raw ->
-                    Template#{value => Raw}
-            end
+            TagsTk =
+                case Tags of
+                    Tmpl when is_binary(Tmpl) ->
+                        emqx_placeholder:preproc_tmpl(Tmpl);
+                    List ->
+                        [
+                            tags
+                            | [
+                                {
+                                    emqx_placeholder:preproc_tmpl(TagName),
+                                    emqx_placeholder:preproc_tmpl(TagValue)
+                                }
+                             || #{tag := TagName, value := TagValue} <- List
+                            ]
+                        ]
+                end,
+
+            ValueTk =
+                case Value of
+                    Text when is_binary(Text) ->
+                        emqx_placeholder:preproc_tmpl(Text);
+                    Raw ->
+                        Raw
+                end,
+
+            Template#{tags => TagsTk, value => ValueTk}
         end,
         DataList
     ).

+ 90 - 0
apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl

@@ -294,6 +294,96 @@ t_raw_int_value(Config) ->
 t_raw_float_value(Config) ->
     raw_value_test(<<"t_raw_float_value">>, 42.5, Config).
 
+t_list_tags(Config) ->
+    ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
+    BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 10,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+
+    ?assertMatch(
+        {ok, _},
+        emqx_bridge_v2_testlib:update_bridge_api(Config, #{
+            <<"parameters">> => #{
+                <<"data">> => [
+                    #{
+                        <<"metric">> => <<"${metric}">>,
+                        <<"tags">> => [#{<<"tag">> => <<"host">>, <<"value">> => <<"valueA">>}],
+                        value => <<"${value}">>
+                    }
+                ]
+            }
+        })
+    ),
+
+    Metric = <<"t_list_tags">>,
+    Value = 12,
+    MakeMessageFun = fun() -> make_data(Metric, Value) end,
+
+    is_success_check(
+        emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()})
+    ),
+
+    {ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
+    QResult = emqx_utils_json:decode(IoTDBResult),
+    ?assertMatch(
+        [
+            #{
+                <<"metric">> := Metric,
+                <<"tags">> := #{<<"host">> := <<"valueA">>}
+            }
+        ],
+        QResult
+    ).
+
+t_list_tags_with_var(Config) ->
+    ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
+    ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
+    BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 10,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+
+    ?assertMatch(
+        {ok, _},
+        emqx_bridge_v2_testlib:update_bridge_api(Config, #{
+            <<"parameters">> => #{
+                <<"data">> => [
+                    #{
+                        <<"metric">> => <<"${metric}">>,
+                        <<"tags">> => [#{<<"tag">> => <<"host">>, <<"value">> => <<"${value}">>}],
+                        value => <<"${value}">>
+                    }
+                ]
+            }
+        })
+    ),
+
+    Metric = <<"t_list_tags_with_var">>,
+    Value = 12,
+    MakeMessageFun = fun() -> make_data(Metric, Value) end,
+
+    is_success_check(
+        emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()})
+    ),
+
+    {ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
+    QResult = emqx_utils_json:decode(IoTDBResult),
+    ?assertMatch(
+        [
+            #{
+                <<"metric">> := Metric,
+                <<"tags">> := #{<<"host">> := <<"12">>}
+            }
+        ],
+        QResult
+    ).
+
 raw_value_test(Metric, RawValue, Config) ->
     ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
     ResourceId = emqx_bridge_v2_testlib:resource_id(Config),

+ 19 - 1
rel/i18n/emqx_bridge_opents.hocon

@@ -49,7 +49,7 @@ config_parameters_metric.label:
 """Metric"""
 
 config_parameters_tags.desc:
-"""Tags. Only supports with placeholder to extract tags from a variable"""
+"""Tags. Only supports with placeholder to extract tags from a variable or a list of tags"""
 
 config_parameters_tags.label:
 """Tags"""
@@ -60,4 +60,22 @@ config_parameters_value.desc:
 config_parameters_value.label:
 """Value"""
 
+action_parameters_data_tags.desc:
+"""OpenTSDB data tags"""
+
+action_parameters_data_tags.label:
+"""Tags"""
+
+tags_tag.desc:
+"""The name of this tag. Placeholders in format of ${var} is supported"""
+
+tags_tag.label:
+"""Tag"""
+
+tags_value.desc:
+"""The value of this tag. Placeholders in format of ${var} is supported"""
+
+tags_value.label:
+"""Value"""
+
 }