Browse Source

fix: REST API version 2 not available in IoTDB 0.13 and 1.0

This commit makes sure that REST API version 1 is used when the
user has selected to use IoTDB 0.13 or 1.0

Fixes:
https://emqx.atlassian.net/browse/EMQX-9920
Kjell Winblad 2 years atrás
parent
commit
4f21bdb885

+ 17 - 6
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -57,11 +57,6 @@
     (TYPE) =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE)
     (TYPE) =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE)
 ).
 ).
 
 
-%% [FIXME] this has no place here, it's used in parse_confs/3, which should
-%% rather delegate to a behavior callback than implementing domain knowledge
-%% here (reversed dependency)
--define(INSERT_TABLET_PATH, "/rest/v2/insertTablet").
-
 -if(?EMQX_RELEASE_EDITION == ee).
 -if(?EMQX_RELEASE_EDITION == ee).
 bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
 bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
 bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
 bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
@@ -343,6 +338,11 @@ parse_confs(
             }
             }
     };
     };
 parse_confs(<<"iotdb">>, Name, Conf) ->
 parse_confs(<<"iotdb">>, Name, Conf) ->
+    %% [FIXME] this has no place here, it's used in parse_confs/3, which should
+    %% rather delegate to a behavior callback than implementing domain knowledge
+    %% here (reversed dependency)
+    InsertTabletPathV1 = <<"rest/v1/insertTablet">>,
+    InsertTabletPathV2 = <<"rest/v2/insertTablet">>,
     #{
     #{
         base_url := BaseURL,
         base_url := BaseURL,
         authentication :=
         authentication :=
@@ -352,10 +352,21 @@ parse_confs(<<"iotdb">>, Name, Conf) ->
             }
             }
     } = Conf,
     } = Conf,
     BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
     BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
+    %% This version atom correspond to the macro ?VSN_1_1_X in
+    %% emqx_bridge_iotdb.hrl. It would be better to use the macro directly, but
+    %% this cannot be done without introducing a dependency on the
+    %% emqx_iotdb_bridge app (which is an EE app).
+    DefaultIOTDBBridge = 'v1.1.x',
+    Version = maps:get(iotdb_version, Conf, DefaultIOTDBBridge),
+    InsertTabletPath =
+        case Version of
+            DefaultIOTDBBridge -> InsertTabletPathV2;
+            _ -> InsertTabletPathV1
+        end,
     WebhookConfig =
     WebhookConfig =
         Conf#{
         Conf#{
             method => <<"post">>,
             method => <<"post">>,
-            url => <<BaseURL/binary, ?INSERT_TABLET_PATH>>,
+            url => <<BaseURL/binary, InsertTabletPath/binary>>,
             headers => [
             headers => [
                 {<<"Content-type">>, <<"application/json">>},
                 {<<"Content-type">>, <<"application/json">>},
                 {<<"Authorization">>, BasicToken}
                 {<<"Authorization">>, BasicToken}

+ 2 - 1
apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl

@@ -5,7 +5,8 @@
 -ifndef(EMQX_BRIDGE_IOTDB_HRL).
 -ifndef(EMQX_BRIDGE_IOTDB_HRL).
 -define(EMQX_BRIDGE_IOTDB_HRL, true).
 -define(EMQX_BRIDGE_IOTDB_HRL, true).
 
 
--define(VSN_1_X, 'v1.x').
+-define(VSN_1_1_X, 'v1.1.x').
+-define(VSN_1_0_X, 'v1.0.x').
 -define(VSN_0_13_X, 'v0.13.x').
 -define(VSN_0_13_X, 'v0.13.x').
 
 
 -endif.
 -endif.

+ 3 - 3
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl

@@ -109,10 +109,10 @@ basic_config() ->
             )},
             )},
         {iotdb_version,
         {iotdb_version,
             mk(
             mk(
-                hoconsc:enum([?VSN_1_X, ?VSN_0_13_X]),
+                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
                 #{
                 #{
                     desc => ?DESC("config_iotdb_version"),
                     desc => ?DESC("config_iotdb_version"),
-                    default => ?VSN_1_X
+                    default => ?VSN_1_1_X
                 }
                 }
             )}
             )}
     ] ++ resource_creation_opts() ++
     ] ++ resource_creation_opts() ++
@@ -217,7 +217,7 @@ conn_bridge_example(_Method, Type) ->
         is_aligned => false,
         is_aligned => false,
         device_id => <<"my_device">>,
         device_id => <<"my_device">>,
         base_url => <<"http://iotdb.local:18080/">>,
         base_url => <<"http://iotdb.local:18080/">>,
-        iotdb_version => ?VSN_1_X,
+        iotdb_version => ?VSN_1_1_X,
         connect_timeout => <<"15s">>,
         connect_timeout => <<"15s">>,
         pool_type => <<"random">>,
         pool_type => <<"random">>,
         pool_size => 8,
         pool_size => 8,

+ 10 - 4
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl

@@ -280,7 +280,7 @@ make_iotdb_insert_request(MessageUnparsedPayload, State) ->
     Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
     Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
     IsAligned = maps:get(is_aligned, State, false),
     IsAligned = maps:get(is_aligned, State, false),
     DeviceId = device_id(Message, State),
     DeviceId = device_id(Message, State),
-    IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X),
+    IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
     Payload = make_list(maps:get(payload, Message)),
     Payload = make_list(maps:get(payload, Message)),
     PreProcessedData = preproc_data_list(Payload),
     PreProcessedData = preproc_data_list(Payload),
     DataList = proc_data(PreProcessedData, Message),
     DataList = proc_data(PreProcessedData, Message),
@@ -349,15 +349,21 @@ insert_value(1, Data, [Value | Values]) ->
 insert_value(Index, Data, [Value | Values]) ->
 insert_value(Index, Data, [Value | Values]) ->
     [[null | Value] | insert_value(Index - 1, Data, Values)].
     [[null | Value] | insert_value(Index - 1, Data, Values)].
 
 
-iotdb_field_key(is_aligned, ?VSN_1_X) ->
+iotdb_field_key(is_aligned, ?VSN_1_1_X) ->
+    <<"is_aligned">>;
+iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
     <<"is_aligned">>;
     <<"is_aligned">>;
 iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
 iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
     <<"isAligned">>;
     <<"isAligned">>;
-iotdb_field_key(device_id, ?VSN_1_X) ->
+iotdb_field_key(device_id, ?VSN_1_1_X) ->
+    <<"device">>;
+iotdb_field_key(device_id, ?VSN_1_0_X) ->
     <<"device">>;
     <<"device">>;
 iotdb_field_key(device_id, ?VSN_0_13_X) ->
 iotdb_field_key(device_id, ?VSN_0_13_X) ->
     <<"deviceId">>;
     <<"deviceId">>;
-iotdb_field_key(data_types, ?VSN_1_X) ->
+iotdb_field_key(data_types, ?VSN_1_1_X) ->
+    <<"data_types">>;
+iotdb_field_key(data_types, ?VSN_1_0_X) ->
     <<"data_types">>;
     <<"data_types">>;
 iotdb_field_key(data_types, ?VSN_0_13_X) ->
 iotdb_field_key(data_types, ?VSN_0_13_X) ->
     <<"dataTypes">>.
     <<"dataTypes">>.