Просмотр исходного кода

Merge pull request #12316 from lafirest/fix/iotdb

fix(iotdb): move the iot_version into IoTDB connector
lafirest 2 лет назад
Родитель
Сommit
1c0ca72877

+ 1 - 10
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl

@@ -89,14 +89,6 @@ fields(action_parameters) ->
                     desc => ?DESC("config_device_id")
                 }
             )},
-        {iotdb_version,
-            mk(
-                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
-                #{
-                    desc => ?DESC("config_iotdb_version"),
-                    default => ?VSN_1_1_X
-                }
-            )},
         {data,
             mk(
                 array(ref(?MODULE, action_parameters_data)),
@@ -310,8 +302,7 @@ action_values() ->
                 }
             ],
             is_aligned => false,
-            device_id => <<"my_device">>,
-            iotdb_version => ?VSN_1_1_X
+            device_id => <<"my_device">>
         }
     }.
 

+ 91 - 60
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -47,6 +47,7 @@
         connect_timeout := pos_integer(),
         pool_type := random | hash,
         pool_size := pos_integer(),
+        iotdb_version := atom(),
         request => undefined | map(),
         atom() => _
     }.
@@ -57,6 +58,7 @@
         connect_timeout := pos_integer(),
         pool_type := random | hash,
         channels := map(),
+        iotdb_version := atom(),
         request => undefined | map(),
         atom() => _
     }.
@@ -88,6 +90,7 @@ connector_example_values() ->
         name => <<"iotdb_connector">>,
         type => iotdb,
         enable => true,
+        iotdb_version => ?VSN_1_1_X,
         authentication => #{
             <<"username">> => <<"root">>,
             <<"password">> => <<"*****">>
@@ -121,6 +124,14 @@ fields("connection_fields") ->
                     desc => ?DESC(emqx_bridge_iotdb, "config_base_url")
                 }
             )},
+        {iotdb_version,
+            mk(
+                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
+                #{
+                    desc => ?DESC(emqx_bridge_iotdb, "config_iotdb_version"),
+                    default => ?VSN_1_1_X
+                }
+            )},
         {authentication,
             mk(
                 hoconsc:union([ref(?MODULE, auth_basic)]),
@@ -190,7 +201,7 @@ proplists_without(Keys, List) ->
 callback_mode() -> async_if_possible.
 
 -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
-on_start(InstanceId, Config) ->
+on_start(InstanceId, #{iotdb_version := Version} = Config) ->
     %% [FIXME] The configuration passed in here is pre-processed and transformed
     %% in emqx_bridge_resource:parse_confs/2.
     case emqx_bridge_http_connector:on_start(InstanceId, Config) of
@@ -201,7 +212,7 @@ on_start(InstanceId, Config) ->
                 request => maps:get(request, State, <<>>)
             }),
             ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
-            {ok, State#{channels => #{}}};
+            {ok, State#{iotdb_version => Version, channels => #{}}};
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_iotdb_bridge",
@@ -231,7 +242,11 @@ on_get_status(InstanceId, State) ->
     {ok, pos_integer(), [term()], term()}
     | {ok, pos_integer(), [term()]}
     | {error, term()}.
-on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = State) ->
+on_query(
+    InstanceId,
+    {ChannelId, _Message} = Req,
+    #{iotdb_version := IoTDBVsn, channels := Channels} = State
+) ->
     ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
         msg => "iotdb_bridge_on_query_called",
@@ -240,7 +255,7 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat
         state => emqx_utils:redact(State)
     }),
 
-    case try_render_message(Req, Channels) of
+    case try_render_message(Req, IoTDBVsn, Channels) of
         {ok, IoTDBPayload} ->
             handle_response(
                 emqx_bridge_http_connector:on_query(
@@ -254,7 +269,10 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat
 -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
     {ok, pid()} | {error, empty_request}.
 on_query_async(
-    InstanceId, {ChannelId, _Message} = Req, ReplyFunAndArgs0, #{channels := Channels} = State
+    InstanceId,
+    {ChannelId, _Message} = Req,
+    ReplyFunAndArgs0,
+    #{iotdb_version := IoTDBVsn, channels := Channels} = State
 ) ->
     ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
@@ -263,7 +281,7 @@ on_query_async(
         send_message => Req,
         state => emqx_utils:redact(State)
     }),
-    case try_render_message(Req, Channels) of
+    case try_render_message(Req, IoTDBVsn, Channels) of
         {ok, IoTDBPayload} ->
             ReplyFunAndArgs =
                 {
@@ -282,10 +300,10 @@ on_query_async(
 
 on_add_channel(
     InstanceId,
-    #{channels := Channels} = OldState0,
+    #{iotdb_version := Version, channels := Channels} = OldState0,
     ChannelId,
     #{
-        parameters := #{iotdb_version := Version, data := Data} = Parameter
+        parameters := #{data := Data} = Parameter
     }
 ) ->
     case maps:is_key(ChannelId, Channels) of
@@ -404,25 +422,41 @@ proc_data(PreProcessedData, Msg) ->
         now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
         now_ns => NowNS
     },
-    lists:map(
-        fun(
-            #{
-                timestamp := TimestampTkn,
-                measurement := Measurement,
-                data_type := DataType0,
-                value := ValueTkn
-            }
-        ) ->
-            DataType = emqx_placeholder:proc_tmpl(DataType0, Msg),
-            #{
-                timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
-                measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
-                data_type => DataType,
-                value => proc_value(DataType, ValueTkn, Msg)
-            }
-        end,
-        PreProcessedData
-    ).
+    proc_data(PreProcessedData, Msg, Nows, []).
+
+proc_data(
+    [
+        #{
+            timestamp := TimestampTkn,
+            measurement := Measurement,
+            data_type := DataType0,
+            value := ValueTkn
+        }
+        | T
+    ],
+    Msg,
+    Nows,
+    Acc
+) ->
+    DataType = list_to_binary(
+        string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
+    ),
+    case proc_value(DataType, ValueTkn, Msg) of
+        {ok, Value} ->
+            proc_data(T, Msg, Nows, [
+                #{
+                    timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
+                    measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
+                    data_type => DataType,
+                    value => Value
+                }
+                | Acc
+            ]);
+        Error ->
+            Error
+    end;
+proc_data([], _Msg, _Nows, Acc) ->
+    {ok, lists:reverse(Acc)}.
 
 iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
     Timestamp;
@@ -441,16 +475,19 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
     binary_to_integer(Timestamp).
 
 proc_value(<<"TEXT">>, ValueTkn, Msg) ->
-    case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
-        <<"undefined">> -> null;
-        Val -> Val
-    end;
+    {ok,
+        case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
+            <<"undefined">> -> null;
+            Val -> Val
+        end};
 proc_value(<<"BOOLEAN">>, ValueTkn, Msg) ->
-    convert_bool(replace_var(ValueTkn, Msg));
+    {ok, convert_bool(replace_var(ValueTkn, Msg))};
 proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> ->
-    convert_int(replace_var(ValueTkn, Msg));
+    {ok, convert_int(replace_var(ValueTkn, Msg))};
 proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
-    convert_float(replace_var(ValueTkn, Msg)).
+    {ok, convert_float(replace_var(ValueTkn, Msg))};
+proc_value(Type, _, _) ->
+    {error, {invalid_type, Type}}.
 
 replace_var(Tokens, Data) when is_list(Tokens) ->
     [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
@@ -495,18 +532,18 @@ convert_float(Str) when is_binary(Str) ->
 convert_float(undefined) ->
     null.
 
-make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) ->
+make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) ->
     InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
-    Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
+    Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn),
     {ok,
         maps:merge(Rows, #{
-            iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
-            iotdb_field_key(device_id, IotDBVsn) => DeviceId
+            iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned,
+            iotdb_field_key(device_id, IoTDBVsn) => DeviceId
         })}.
 
-replace_dtypes(Rows0, IotDBVsn) ->
+replace_dtypes(Rows0, IoTDBVsn) ->
     {Types, Rows} = maps:take(dtypes, Rows0),
-    Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}.
+    Rows#{iotdb_field_key(data_types, IoTDBVsn) => Types}.
 
 aggregate_rows(DataList, InitAcc) ->
     lists:foldr(
@@ -612,9 +649,9 @@ eval_response_body(Body, Resp) ->
 
 preproc_data_template(DataList) ->
     Atom2Bin = fun
-        (Atom, Converter) when is_atom(Atom) ->
-            Converter(Atom);
-        (Bin, _) ->
+        (Atom) when is_atom(Atom) ->
+            erlang:atom_to_binary(Atom);
+        (Bin) ->
             Bin
     end,
     lists:map(
@@ -627,33 +664,24 @@ preproc_data_template(DataList) ->
             }
         ) ->
             #{
-                timestamp => emqx_placeholder:preproc_tmpl(
-                    Atom2Bin(Timestamp, fun erlang:atom_to_binary/1)
-                ),
+                timestamp => emqx_placeholder:preproc_tmpl(Atom2Bin(Timestamp)),
                 measurement => emqx_placeholder:preproc_tmpl(Measurement),
-                data_type => emqx_placeholder:preproc_tmpl(
-                    Atom2Bin(
-                        DataType,
-                        fun(Atom) ->
-                            erlang:list_to_binary(string:uppercase(erlang:atom_to_list(Atom)))
-                        end
-                    )
-                ),
+                data_type => emqx_placeholder:preproc_tmpl(Atom2Bin(DataType)),
                 value => emqx_placeholder:preproc_tmpl(Value)
             }
         end,
         DataList
     ).
 
-try_render_message({ChannelId, Msg}, Channels) ->
+try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) ->
     case maps:find(ChannelId, Channels) of
         {ok, Channel} ->
-            render_channel_message(Channel, Msg);
+            render_channel_message(Channel, IoTDBVsn, Msg);
         _ ->
             {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
     end.
 
-render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) ->
+render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) ->
     Payloads = to_list(parse_payload(get_payload(Message))),
     case device_id(Message, Payloads, Channel) of
         undefined ->
@@ -663,9 +691,12 @@ render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = C
                 [] ->
                     {error, invalid_data};
                 DataTemplate ->
-                    DataList = proc_data(DataTemplate, Message),
-
-                    make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn)
+                    case proc_data(DataTemplate, Message) of
+                        {ok, DataList} ->
+                            make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn);
+                        Error ->
+                            Error
+                    end
             end
     end.
 

+ 28 - 5
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -255,7 +255,6 @@ is_error_check(Reason) ->
     end.
 
 action_config(Name, Config) ->
-    Version = ?config(iotdb_version, Config),
     Type = ?config(bridge_type, Config),
     ConfigString =
         io_lib:format(
@@ -263,15 +262,13 @@ action_config(Name, Config) ->
             "  enable = true\n"
             "  connector = \"~s\"\n"
             "  parameters = {\n"
-            "     iotdb_version = \"~s\"\n"
             "     data = []\n"
             "  }\n"
             "}\n",
             [
                 Type,
                 Name,
-                Name,
-                Version
+                Name
             ]
         ),
     ct:pal("ActionConfig:~ts~n", [ConfigString]),
@@ -281,12 +278,14 @@ connector_config(Name, Config) ->
     Host = ?config(bridge_host, Config),
     Port = ?config(bridge_port, Config),
     Type = ?config(bridge_type, Config),
+    Version = ?config(iotdb_version, Config),
     ServerURL = iotdb_server_url(Host, Port),
     ConfigString =
         io_lib:format(
             "connectors.~s.~s {\n"
             "  enable = true\n"
             "  base_url = \"~s\"\n"
+            "  iotdb_version = \"~s\"\n"
             "  authentication = {\n"
             "     username = \"root\"\n"
             "     password = \"root\"\n"
@@ -295,7 +294,8 @@ connector_config(Name, Config) ->
             [
                 Type,
                 Name,
-                ServerURL
+                ServerURL,
+                Version
             ]
         ),
     ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
@@ -646,6 +646,29 @@ t_template(Config) ->
     iotdb_reset(Config, TemplateDeviceId),
     ok.
 
+t_sync_query_case(Config) ->
+    DeviceId = iotdb_device(Config),
+    Payload = make_iotdb_payload(DeviceId, "temp", "InT32", "36"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
+    ),
+    Query = <<"select temp from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
+    ?assertMatch(
+        #{<<"values">> := [[36]]},
+        emqx_utils_json:decode(IoTDBResult)
+    ).
+
+t_sync_query_invalid_type(Config) ->
+    DeviceId = iotdb_device(Config),
+    Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    IsInvalidType = fun(Result) -> ?assertMatch({error, {invalid_type, _}}, Result) end,
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
+    ).
+
 is_empty(null) -> true;
 is_empty([]) -> true;
 is_empty([[]]) -> true;