Sfoglia il codice sorgente

Merge pull request #10928 from zmstone/0602-sync-release-51-to-master

0602 sync release 51 to master
Zaiming (Stone) Shi 2 anni fa
parent
commit
aa46c0a484
29 ha cambiato i file con 1050 aggiunte e 405 eliminazioni
  1. 30 0
      .ci/docker-compose-file/docker-compose-iotdb.yaml
  2. 1 0
      .ci/docker-compose-file/docker-compose-toxiproxy.yaml
  3. 58 0
      .ci/docker-compose-file/iotdb013/iotdb-rest.properties
  4. 6 0
      .ci/docker-compose-file/toxiproxy.json
  5. 2 2
      .github/workflows/run_emqx_app_tests.yaml
  6. 2 2
      apps/emqx/include/emqx_release.hrl
  7. 137 17
      apps/emqx/src/emqx_config.erl
  8. 1 6
      apps/emqx/src/emqx_mqtt_caps.erl
  9. 1 174
      apps/emqx/test/emqx_channel_SUITE.erl
  10. 1 1
      apps/emqx/test/emqx_client_SUITE.erl
  11. 300 0
      apps/emqx/test/emqx_config_SUITE.erl
  12. 0 1
      apps/emqx/test/emqx_connection_SUITE.erl
  13. 1 1
      apps/emqx/test/emqx_session_SUITE.erl
  14. 23 4
      apps/emqx/test/emqx_shared_sub_SUITE.erl
  15. 0 1
      apps/emqx/test/emqx_takeover_SUITE.erl
  16. 0 1
      apps/emqx/test/emqx_ws_connection_SUITE.erl
  17. 6 3
      apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl
  18. 21 7
      apps/emqx_bridge/test/emqx_bridge_testlib.erl
  19. 110 87
      apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl
  20. 309 73
      apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl
  21. 5 0
      apps/emqx_dashboard/src/emqx_dashboard.erl
  22. 14 15
      apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
  23. 3 3
      apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl
  24. 1 0
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  25. 1 3
      apps/emqx_management/src/emqx_mgmt_api_trace.erl
  26. 11 2
      apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl
  27. 1 1
      apps/emqx_prometheus/src/emqx_prometheus.app.src
  28. 3 1
      apps/emqx_prometheus/src/emqx_prometheus_schema.erl
  29. 2 0
      changes/ce/perf-10790.en.md

+ 30 - 0
.ci/docker-compose-file/docker-compose-iotdb.yaml

@@ -29,3 +29,33 @@ services:
     #     - "18080:18080"
     networks:
       - emqx_bridge
+
+  iotdb_0_13:
+    container_name: iotdb013
+    hostname: iotdb013
+    image: apache/iotdb:0.13.4-node
+    restart: always
+    environment:
+      - enable_rest_service=true
+      - cn_internal_address=iotdb013
+      - cn_internal_port=10710
+      - cn_consensus_port=10720
+      - cn_target_config_node_list=iotdb013:10710
+      - dn_rpc_address=iotdb013
+      - dn_internal_address=iotdb013
+      - dn_rpc_port=6667
+      - dn_mpp_data_exchange_port=10740
+      - dn_schema_region_consensus_port=10750
+      - dn_data_region_consensus_port=10760
+      - dn_target_config_node_list=iotdb013:10710
+    volumes:
+      - ./iotdb013/iotdb-rest.properties:/iotdb/conf/iotdb-rest.properties
+    #     - ./data:/iotdb/data
+    #     - ./logs:/iotdb/logs
+    expose:
+      - "18080"
+    # IoTDB's REST interface, uncomment for local testing
+    # ports:
+    #     - "18080:18080"
+    networks:
+      - emqx_bridge

+ 1 - 0
.ci/docker-compose-file/docker-compose-toxiproxy.yaml

@@ -46,6 +46,7 @@ services:
       # IOTDB
       - 14242:4242
       - 28080:18080
+      - 38080:38080
     command:
       - "-host=0.0.0.0"
       - "-config=/config/toxiproxy.json"

+ 58 - 0
.ci/docker-compose-file/iotdb013/iotdb-rest.properties

@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+####################
+### REST Service Configuration
+####################
+
+# Is the REST service enabled
+enable_rest_service=true
+
+# the binding port of the REST service
+# rest_service_port=18080
+
+# the default row limit to a REST query response when the rowSize parameter is not given in request
+# rest_query_default_row_size_limit=10000
+
+# the expiration time of the user login information cache (in seconds)
+# cache_expire_in_seconds=28800
+
+# maximum number of users can be stored in the user login cache.
+# cache_max_num=100
+
+# init capacity of users can be stored in the user login cache.
+# cache_init_num=10
+
+# is SSL enabled
+# enable_https=false
+
+# SSL key store path
+# key_store_path=
+
+# SSL key store password
+# key_store_pwd=
+
+# SSL trust store path
+# trust_store_path=
+
+# SSL trust store password.
+# trust_store_pwd=
+
+# SSL timeout (in seconds)
+# idle_timeout_in_seconds=50000

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -132,6 +132,12 @@
     "upstream": "iotdb:18080",
     "enabled": true
   },
+  {
+    "name": "iotdb013",
+    "listen": "0.0.0.0:38080",
+    "upstream": "iotdb013:18080",
+    "enabled": true
+  },
   {
     "name": "minio_tcp",
     "listen": "0.0.0.0:19000",

+ 2 - 2
.github/workflows/run_emqx_app_tests.yaml

@@ -71,10 +71,10 @@ jobs:
         ./rebar3 xref
         ./rebar3 dialyzer
         ./rebar3 eunit -v
-        ./rebar3 ct -v --readable=true
+        ./rebar3 ct --name 'test@127.0.0.1' -v --readable=true
         ./rebar3 proper -d test/props
     - uses: actions/upload-artifact@v3
       if: failure()
       with:
-        name: logs
+        name: logs-${{ matrix.runs-on }}
         path: apps/emqx/_build/test/logs

+ 2 - 2
apps/emqx/include/emqx_release.hrl

@@ -32,10 +32,10 @@
 %% `apps/emqx/src/bpapi/README.md'
 
 %% Community edition
--define(EMQX_RELEASE_CE, "5.1.0-alpha.1").
+-define(EMQX_RELEASE_CE, "5.1.0-alpha.2").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.1.0-alpha.1").
+-define(EMQX_RELEASE_EE, "5.1.0-alpha.2").
 
 %% the HTTP API version
 -define(EMQX_API_VERSION, "5.0").

+ 137 - 17
apps/emqx/src/emqx_config.erl

@@ -189,23 +189,11 @@ find_raw(KeyPath) ->
 
 -spec get_zone_conf(atom(), emqx_utils_maps:config_key_path()) -> term().
 get_zone_conf(Zone, KeyPath) ->
-    case find(?ZONE_CONF_PATH(Zone, KeyPath)) of
-        %% not found in zones, try to find the global config
-        {not_found, _, _} ->
-            ?MODULE:get(KeyPath);
-        {ok, Value} ->
-            Value
-    end.
+    ?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath)).
 
 -spec get_zone_conf(atom(), emqx_utils_maps:config_key_path(), term()) -> term().
 get_zone_conf(Zone, KeyPath, Default) ->
-    case find(?ZONE_CONF_PATH(Zone, KeyPath)) of
-        %% not found in zones, try to find the global config
-        {not_found, _, _} ->
-            ?MODULE:get(KeyPath, Default);
-        {ok, Value} ->
-            Value
-    end.
+    ?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath), Default).
 
 -spec put_zone_conf(atom(), emqx_utils_maps:config_key_path(), term()) -> ok.
 put_zone_conf(Zone, KeyPath, Conf) ->
@@ -230,6 +218,9 @@ find_listener_conf(Type, Listener, KeyPath) ->
 
 -spec put(map()) -> ok.
 put(Config) ->
+    put_with_order(Config).
+
+put1(Config) ->
     maps:fold(
         fun(RootName, RootValue, _) ->
             ?MODULE:put([atom(RootName)], RootValue)
@@ -245,8 +236,8 @@ erase(RootName) ->
 
 -spec put(emqx_utils_maps:config_key_path(), term()) -> ok.
 put(KeyPath, Config) ->
-    Putter = fun(Path, Map, Value) ->
-        emqx_utils_maps:deep_put(Path, Map, Value)
+    Putter = fun(_Path, Map, Value) ->
+        maybe_update_zone(KeyPath, Map, Value)
     end,
     do_put(?CONF, Putter, KeyPath, Config).
 
@@ -342,7 +333,9 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
     %% check configs against the schema
     {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf, #{}),
     save_to_app_env(AppEnvs),
-    ok = save_to_config_map(CheckedConf, RawConf).
+    ok = save_to_config_map(CheckedConf, RawConf),
+    maybe_init_default_zone(),
+    ok.
 
 %% Merge environment variable overrides on top, then merge with overrides.
 overlay_v0(SchemaMod, RawConf) when is_map(RawConf) ->
@@ -791,3 +784,130 @@ to_atom_conf_path(Path, OnFail) ->
                     V
             end
     end.
+
+%% @doc Init zones under root `zones'
+%% 1. ensure one `default' zone as it is referenced by listeners.
+%%   if default zone is unset, clone all default values from `GlobalDefaults'
+%%   if default zone is set, values are merged with `GlobalDefaults'
+%% 2. For any user defined zones, merge with `GlobalDefaults'
+%%
+%% note1, this should be called as post action after emqx_config terms (zones, and GlobalDefaults)
+%%       are written in the PV storage during emqx config loading/initialization.
+-spec maybe_init_default_zone() -> skip | ok.
+maybe_init_default_zone() ->
+    case emqx_config:get([zones], ?CONFIG_NOT_FOUND_MAGIC) of
+        ?CONFIG_NOT_FOUND_MAGIC ->
+            skip;
+        Zones0 when is_map(Zones0) ->
+            Zones =
+                case Zones0 of
+                    #{default := _DefaultZone} = Z1 ->
+                        Z1;
+                    Z2 ->
+                        Z2#{default => #{}}
+                end,
+            GLD = zone_global_defaults(),
+            NewZones = maps:map(
+                fun(_ZoneName, ZoneVal) ->
+                    merge_with_global_defaults(GLD, ZoneVal)
+                end,
+                Zones
+            ),
+            ?MODULE:put([zones], NewZones)
+    end.
+
+-spec merge_with_global_defaults(map(), map()) -> map().
+merge_with_global_defaults(GlobalDefaults, ZoneVal) ->
+    emqx_utils_maps:deep_merge(GlobalDefaults, ZoneVal).
+
+%% @doc Update zones
+%%    when 1) zone updates, return *new* zones
+%%    when 2) zone global config updates, write to PT directly.
+%% Zone global defaults are always presented in the configmap (PT) when updating zone
+-spec maybe_update_zone(runtime_config_key_path(), RootValue :: map(), Val :: term()) ->
+    NewZoneVal :: map().
+maybe_update_zone([zones | T], ZonesValue, Value) ->
+    %% note, do not write to PT, return *New value* instead
+    NewZonesValue = emqx_utils_maps:deep_put(T, ZonesValue, Value),
+    ExistingZoneNames = maps:keys(?MODULE:get([zones], #{})),
+    %% Update only new zones with global defaults
+    GLD = zone_global_defaults(),
+    maps:fold(
+        fun(ZoneName, ZoneValue, Acc) ->
+            Acc#{ZoneName := merge_with_global_defaults(GLD, ZoneValue)}
+        end,
+        NewZonesValue,
+        maps:without(ExistingZoneNames, NewZonesValue)
+    );
+maybe_update_zone([RootName | T], RootValue, Value) when is_atom(RootName) ->
+    NewRootValue = emqx_utils_maps:deep_put(T, RootValue, Value),
+    case is_zone_root(RootName) of
+        false ->
+            skip;
+        true ->
+            %% When updates on global default roots.
+            ExistingZones = ?MODULE:get([zones], #{}),
+            RootNameBin = atom_to_binary(RootName),
+            NewZones = maps:map(
+                fun(ZoneName, ZoneVal) ->
+                    BinPath = [<<"zones">>, atom_to_binary(ZoneName), RootNameBin],
+                    case
+                        %% look for user defined value from RAWCONF
+                        ?MODULE:get_raw(
+                            BinPath,
+                            ?CONFIG_NOT_FOUND_MAGIC
+                        )
+                    of
+                        ?CONFIG_NOT_FOUND_MAGIC ->
+                            ZoneVal#{RootName => NewRootValue};
+                        RawUserZoneRoot ->
+                            UserDefinedValues = rawconf_to_conf(
+                                emqx_schema, BinPath, RawUserZoneRoot
+                            ),
+                            ZoneVal#{
+                                RootName :=
+                                    emqx_utils_maps:deep_merge(
+                                        NewRootValue,
+                                        UserDefinedValues
+                                    )
+                            }
+                    end
+                end,
+                ExistingZones
+            ),
+            persistent_term:put(?PERSIS_KEY(?CONF, zones), NewZones)
+    end,
+    NewRootValue.
+
+zone_global_defaults() ->
+    maps:from_list([{K, ?MODULE:get([K])} || K <- zone_roots()]).
+
+-spec is_zone_root(atom) -> boolean().
+is_zone_root(Name) ->
+    lists:member(Name, zone_roots()).
+
+-spec zone_roots() -> [atom()].
+zone_roots() ->
+    lists:map(fun list_to_atom/1, emqx_zone_schema:roots()).
+
+%%%
+%%% @doc During init, ensure order of puts that zone is put after the other global defaults.
+%%%
+put_with_order(#{zones := _Zones} = Conf) ->
+    put1(maps:without([zones], Conf)),
+    put1(maps:with([zones], Conf));
+put_with_order(Conf) ->
+    put1(Conf).
+
+%%
+%% @doc Helper function that converts raw conf val to runtime conf val
+%%      with the types info from schema module
+-spec rawconf_to_conf(module(), RawPath :: [binary()], RawValue :: term()) -> term().
+rawconf_to_conf(SchemaModule, RawPath, RawValue) ->
+    {_, RawUserDefinedValues} =
+        check_config(
+            SchemaModule,
+            emqx_utils_maps:deep_put(RawPath, #{}, RawValue)
+        ),
+    AtomPath = to_atom_conf_path(RawPath, {raise_error, maybe_update_zone_error}),
+    emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues).

+ 1 - 6
apps/emqx/src/emqx_mqtt_caps.erl

@@ -41,8 +41,6 @@
     exclusive_subscription => boolean()
 }.
 
--define(MAX_TOPIC_LEVELS, 65535).
-
 -define(PUBCAP_KEYS, [
     max_topic_levels,
     max_qos_allowed,
@@ -154,8 +152,5 @@ get_caps(Zone) ->
 get_caps(Keys, Zone) ->
     maps:with(
         Keys,
-        maps:merge(
-            emqx_config:get([mqtt]),
-            emqx_config:get_zone_conf(Zone, [mqtt])
-        )
+        emqx_config:get_zone_conf(Zone, [mqtt])
     ).

+ 1 - 174
apps/emqx/test/emqx_channel_SUITE.erl

@@ -27,176 +27,6 @@
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
-force_gc_conf() ->
-    #{bytes => 16777216, count => 16000, enable => true}.
-
-force_shutdown_conf() ->
-    #{enable => true, max_heap_size => 4194304, max_mailbox_size => 1000}.
-
-rpc_conf() ->
-    #{
-        async_batch_size => 256,
-        authentication_timeout => 5000,
-        call_receive_timeout => 15000,
-        connect_timeout => 5000,
-        mode => async,
-        port_discovery => stateless,
-        send_timeout => 5000,
-        socket_buffer => 1048576,
-        socket_keepalive_count => 9,
-        socket_keepalive_idle => 900,
-        socket_keepalive_interval => 75,
-        socket_recbuf => 1048576,
-        socket_sndbuf => 1048576,
-        tcp_client_num => 1,
-        tcp_server_port => 5369
-    }.
-
-mqtt_conf() ->
-    #{
-        await_rel_timeout => 300000,
-        idle_timeout => 15000,
-        ignore_loop_deliver => false,
-        keepalive_backoff => 0.75,
-        max_awaiting_rel => 100,
-        max_clientid_len => 65535,
-        max_inflight => 32,
-        max_mqueue_len => 1000,
-        max_packet_size => 1048576,
-        max_qos_allowed => 2,
-        max_subscriptions => infinity,
-        max_topic_alias => 65535,
-        max_topic_levels => 128,
-        mqueue_default_priority => lowest,
-        mqueue_priorities => disabled,
-        mqueue_store_qos0 => true,
-        peer_cert_as_clientid => disabled,
-        peer_cert_as_username => disabled,
-        response_information => [],
-        retain_available => true,
-        retry_interval => 30000,
-        server_keepalive => disabled,
-        session_expiry_interval => 7200000,
-        shared_subscription => true,
-        strict_mode => false,
-        upgrade_qos => false,
-        use_username_as_clientid => false,
-        wildcard_subscription => true
-    }.
-
-listener_mqtt_tcp_conf() ->
-    #{
-        acceptors => 16,
-        zone => default,
-        access_rules => ["allow all"],
-        bind => {{0, 0, 0, 0}, 1883},
-        max_connections => 1024000,
-        mountpoint => <<>>,
-        proxy_protocol => false,
-        proxy_protocol_timeout => 3000,
-        tcp_options => #{
-            active_n => 100,
-            backlog => 1024,
-            buffer => 4096,
-            high_watermark => 1048576,
-            nodelay => false,
-            reuseaddr => true,
-            send_timeout => 15000,
-            send_timeout_close => true
-        }
-    }.
-
-listener_mqtt_ws_conf() ->
-    #{
-        acceptors => 16,
-        zone => default,
-        access_rules => ["allow all"],
-        bind => {{0, 0, 0, 0}, 8083},
-        max_connections => 1024000,
-        mountpoint => <<>>,
-        proxy_protocol => false,
-        proxy_protocol_timeout => 3000,
-        tcp_options =>
-            #{
-                active_n => 100,
-                backlog => 1024,
-                buffer => 4096,
-                high_watermark => 1048576,
-                nodelay => false,
-                reuseaddr => true,
-                send_timeout => 15000,
-                send_timeout_close => true
-            },
-        websocket =>
-            #{
-                allow_origin_absence => true,
-                check_origin_enable => false,
-                check_origins => [],
-                compress => false,
-                deflate_opts =>
-                    #{
-                        client_max_window_bits => 15,
-                        mem_level => 8,
-                        server_max_window_bits => 15
-                    },
-                fail_if_no_subprotocol => true,
-                idle_timeout => 86400000,
-                max_frame_size => infinity,
-                mqtt_path => "/mqtt",
-                mqtt_piggyback => multiple,
-                % should allow uppercase in config
-                proxy_address_header => "X-Forwarded-For",
-                proxy_port_header => "x-forwarded-port",
-                supported_subprotocols =>
-                    ["mqtt", "mqtt-v3", "mqtt-v3.1.1", "mqtt-v5"]
-            }
-    }.
-
-listeners_conf() ->
-    #{
-        tcp => #{default => listener_mqtt_tcp_conf()},
-        ws => #{default => listener_mqtt_ws_conf()}
-    }.
-
-limiter_conf() ->
-    Make = fun() ->
-        #{
-            burst => 0,
-            rate => infinity
-        }
-    end,
-
-    lists:foldl(
-        fun(Name, Acc) ->
-            Acc#{Name => Make()}
-        end,
-        #{},
-        [bytes, messages, message_routing, connection, internal]
-    ).
-
-stats_conf() ->
-    #{enable => true}.
-
-zone_conf() ->
-    #{}.
-
-basic_conf() ->
-    #{
-        force_gc => force_gc_conf(),
-        force_shutdown => force_shutdown_conf(),
-        mqtt => mqtt_conf(),
-        rpc => rpc_conf(),
-        stats => stats_conf(),
-        listeners => listeners_conf(),
-        zones => zone_conf(),
-        limiter => limiter_conf()
-    }.
-
-set_test_listener_confs() ->
-    Conf = emqx_config:get([], #{}),
-    emqx_config:put(basic_conf()),
-    Conf.
-
 %%--------------------------------------------------------------------
 %% CT Callbacks
 %%--------------------------------------------------------------------
@@ -242,14 +72,11 @@ init_per_testcase(_TestCase, Config) ->
         fun(_) -> {ok, #{is_superuser => false}} end
     ),
     ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end),
-    %% Set confs
-    OldConf = set_test_listener_confs(),
     emqx_common_test_helpers:start_apps([]),
-    [{config, OldConf} | Config].
+    Config.
 
 end_per_testcase(_TestCase, Config) ->
     meck:unload([emqx_access_control]),
-    emqx_config:put(?config(config, Config)),
     emqx_common_test_helpers:stop_apps([]),
     Config.
 

+ 1 - 1
apps/emqx/test/emqx_client_SUITE.erl

@@ -383,7 +383,7 @@ t_certcn_as_clientid_tlsv1_2(_) ->
     tls_certcn_as_clientid('tlsv1.2').
 
 t_peercert_preserved_before_connected(_) ->
-    ok = emqx_config:put_zone_conf(default, [mqtt], #{}),
+    ok = emqx_config:put_zone_conf(default, [mqtt, peer_cert_as_clientid], false),
     ok = emqx_hooks:add(
         'client.connect',
         {?MODULE, on_hook, ['client.connect', self()]},

+ 300 - 0
apps/emqx/test/emqx_config_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
@@ -96,3 +97,302 @@ t_unknown_rook_keys(_) ->
         end
     ),
     ok.
+
+t_init_load_emqx_schema(Config) ->
+    emqx_config:erase_all(),
+    %% Given empty config file
+    ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
+    application:set_env(emqx, config_files, [ConfFile]),
+    %% When load emqx_schema
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    %% Then default zone is injected with all global defaults
+    Default = emqx_config:get([zones, default]),
+    MQTT = emqx_config:get([mqtt]),
+    Stats = emqx_config:get([stats]),
+    FD = emqx_config:get([flapping_detect]),
+    FS = emqx_config:get([force_shutdown]),
+    CC = emqx_config:get([conn_congestion]),
+    FG = emqx_config:get([force_gc]),
+    OP = emqx_config:get([overload_protection]),
+    ?assertMatch(
+        #{
+            mqtt := MQTT,
+            stats := Stats,
+            flapping_detect := FD,
+            force_shutdown := FS,
+            conn_congestion := CC,
+            force_gc := FG,
+            overload_protection := OP
+        },
+        Default
+    ).
+
+t_init_zones_load_emqx_schema_no_default_for_none_existing(Config) ->
+    emqx_config:erase_all(),
+    %% Given empty config file
+    ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
+    application:set_env(emqx, config_files, [ConfFile]),
+    %% When emqx_schema is loaded
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    %% Then read for none existing zone should throw error
+    ?assertError(
+        {config_not_found, [zones, no_exists]},
+        emqx_config:get([zones, no_exists])
+    ).
+
+t_init_zones_load_other_schema(Config) ->
+    emqx_config:erase_all(),
+    %% Given empty config file
+    ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
+    application:set_env(emqx, config_files, [ConfFile]),
+    %% When load emqx_limiter_schema, not emqx_schema
+    %% Then load should success
+    ?assertEqual(ok, emqx_config:init_load(emqx_limiter_schema)),
+    %% Then no zones is loaded.
+    ?assertError(
+        {config_not_found, [zones]},
+        emqx_config:get([zones])
+    ),
+    %% Then no default zone is loaded.
+    ?assertError(
+        {config_not_found, [zones, default]},
+        emqx_config:get([zones, default])
+    ).
+
+t_init_zones_with_user_defined_default_zone(Config) ->
+    emqx_config:erase_all(),
+    %% Given user defined config for default zone
+    ConfFile = prepare_conf_file(
+        ?FUNCTION_NAME, <<"zones.default.mqtt.max_topic_alias=1024">>, Config
+    ),
+    application:set_env(emqx, config_files, [ConfFile]),
+    %% When schema is loaded
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+
+    %% Then user defined value is set
+    {MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, default])),
+    {ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()),
+    ?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV),
+    %% Then others are defaults
+    ?assertEqual(ExpectedOthers, Others).
+
+t_init_zones_with_user_defined_other_zone(Config) ->
+    emqx_config:erase_all(),
+    %% Given user defined config for default zone
+    ConfFile = prepare_conf_file(
+        ?FUNCTION_NAME, <<"zones.myzone.mqtt.max_topic_alias=1024">>, Config
+    ),
+    application:set_env(emqx, config_files, [ConfFile]),
+    %% When schema is loaded
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    %% Then user defined value is set and others are defaults
+
+    %% Then user defined value is set
+    {MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])),
+    {ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()),
+    ?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV),
+    %% Then others are defaults
+    ?assertEqual(ExpectedOthers, Others),
+    %% Then default zone still have the defaults
+    ?assertEqual(zone_global_defaults(), emqx_config:get([zones, default])).
+
+t_init_zones_with_cust_root_mqtt(Config) ->
+    emqx_config:erase_all(),
+    %% Given config file with mqtt user overrides
+    ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"mqtt.retry_interval=10m">>, Config),
+    application:set_env(emqx, config_files, [ConfFile]),
+    %% When emqx_schema is loaded
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    %% Then the value is reflected as internal representation in default `zone'
+    %% and other fields under mqtt are defaults.
+    GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
+    ?assertEqual(
+        GDefaultMqtt#{retry_interval := 600000},
+        emqx_config:get([zones, default, mqtt])
+    ).
+
+t_default_zone_is_updated_after_global_defaults_updated(Config) ->
+    emqx_config:erase_all(),
+    %% Given empty emqx conf
+    ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
+    application:set_env(emqx, config_files, [ConfFile]),
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    ?assertNotEqual(900000, emqx_config:get([zones, default, mqtt, retry_interval])),
+    %% When emqx_schema is loaded
+    emqx_config:put([mqtt, retry_interval], 900000),
+    %% Then the value is reflected in default `zone' and other fields under mqtt are defaults.
+    GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
+    ?assertEqual(
+        GDefaultMqtt#{retry_interval := 900000},
+        emqx_config:get([zones, default, mqtt])
+    ).
+
+t_myzone_is_updated_after_global_defaults_updated(Config) ->
+    emqx_config:erase_all(),
+    %% Given emqx conf file with user override in myzone (none default zone)
+    ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=32">>, Config),
+    application:set_env(emqx, config_files, [ConfFile]),
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    ?assertNotEqual(900000, emqx_config:get([zones, myzone, mqtt, retry_interval])),
+    %% When update another value of global default
+    emqx_config:put([mqtt, retry_interval], 900000),
+    %% Then the value is reflected in myzone and the user defined value unchanged.
+    GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
+    ?assertEqual(
+        GDefaultMqtt#{
+            retry_interval := 900000,
+            max_inflight := 32
+        },
+        emqx_config:get([zones, myzone, mqtt])
+    ),
+    %% Then the value is reflected in default zone as well.
+    ?assertEqual(
+        GDefaultMqtt#{retry_interval := 900000},
+        emqx_config:get([zones, default, mqtt])
+    ).
+
+t_zone_no_user_defined_overrides(Config) ->
+    emqx_config:erase_all(),
+    %% Given emqx conf file with user specified myzone
+    ConfFile = prepare_conf_file(
+        ?FUNCTION_NAME, <<"zones.myzone.mqtt.retry_interval=10m">>, Config
+    ),
+    application:set_env(emqx, config_files, [ConfFile]),
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    ?assertEqual(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])),
+    %% When there is an update in global default
+    emqx_config:put([mqtt, max_inflight], 2),
+    %% Then the value is reflected in both default and myzone
+    ?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])),
+    ?assertMatch(2, emqx_config:get([zones, myzone, mqtt, max_inflight])),
+    %% Then user defined value from config is not overwritten
+    ?assertMatch(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])).
+
+t_zone_no_user_defined_overrides_internal_represent(Config) ->
+    emqx_config:erase_all(),
+    %% Given emqx conf file with user specified myzone
+    ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=1">>, Config),
+    application:set_env(emqx, config_files, [ConfFile]),
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    ?assertEqual(1, emqx_config:get([zones, myzone, mqtt, max_inflight])),
+    %% When there is an update in global default
+    emqx_config:put([mqtt, max_inflight], 2),
+    %% Then the value is reflected in default `zone' but not user-defined zone
+    ?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])),
+    ?assertMatch(1, emqx_config:get([zones, myzone, mqtt, max_inflight])).
+
+t_update_global_defaults_no_updates_on_user_overrides(Config) ->
+    emqx_config:erase_all(),
+    %% Given default zone config in conf file.
+    ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.default.mqtt.max_inflight=1">>, Config),
+    application:set_env(emqx, config_files, [ConfFile]),
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    ?assertEqual(1, emqx_config:get([zones, default, mqtt, max_inflight])),
+    %% When there is an update in global default
+    emqx_config:put([mqtt, max_inflight], 20),
+    %% Then the value is not reflected in default `zone'
+    ?assertMatch(1, emqx_config:get([zones, default, mqtt, max_inflight])).
+
+t_zone_update_with_new_zone(Config) ->
+    emqx_config:erase_all(),
+    %% Given loaded an empty conf file
+    ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
+    application:set_env(emqx, config_files, [ConfFile]),
+    ?assertEqual(ok, emqx_config:init_load(emqx_schema)),
+    %% When there is an update for creating new zone config
+    ok = emqx_config:put([zones, myzone, mqtt, max_inflight], 2),
+    %% Then the value is set and other roots are created with defaults.
+    GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
+    ?assertEqual(
+        GDefaultMqtt#{max_inflight := 2},
+        emqx_config:get([zones, myzone, mqtt])
+    ).
+
+t_init_zone_with_global_defaults(_Config) ->
+    %% Given uninitialized empty config
+    emqx_config:erase_all(),
+    Zones = #{myzone => #{mqtt => #{max_inflight => 3}}},
+    %% when put zones with global default with emqx_config:put/1
+    GlobalDefaults = zone_global_defaults(),
+    AllConf = maps:put(zones, Zones, GlobalDefaults),
+    %% Then put sucess
+    ?assertEqual(ok, emqx_config:put(AllConf)),
+    %% Then GlobalDefaults are set
+    ?assertEqual(GlobalDefaults, maps:with(maps:keys(GlobalDefaults), emqx_config:get([]))),
+    %% Then my zone and default zone are set
+    {MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])),
+    {ZGDMQTT, ExpectedOthers} = maps:take(mqtt, GlobalDefaults),
+    ?assertEqual(ZGDMQTT#{max_inflight := 3}, MqttV),
+    %% Then others are defaults
+    ?assertEqual(ExpectedOthers, Others).
+
+%%%
+%%% Helpers
+%%%
+prepare_conf_file(Name, Content, CTConfig) ->
+    Filename = tc_conf_file(Name, CTConfig),
+    filelib:ensure_dir(Filename),
+    ok = file:write_file(Filename, Content),
+    Filename.
+
+tc_conf_file(TC, Config) ->
+    DataDir = ?config(data_dir, Config),
+    filename:join([DataDir, TC, 'emqx.conf']).
+
+zone_global_defaults() ->
+    #{
+        conn_congestion =>
+            #{enable_alarm => true, min_alarm_sustain_duration => 60000},
+        flapping_detect =>
+            #{ban_time => 300000, max_count => 15, window_time => disabled},
+        force_gc =>
+            #{bytes => 16777216, count => 16000, enable => true},
+        force_shutdown =>
+            #{
+                enable => true,
+                max_heap_size => 4194304,
+                max_mailbox_size => 1000
+            },
+        mqtt =>
+            #{
+                await_rel_timeout => 300000,
+                exclusive_subscription => false,
+                idle_timeout => 15000,
+                ignore_loop_deliver => false,
+                keepalive_backoff => 0.75,
+                keepalive_multiplier => 1.5,
+                max_awaiting_rel => 100,
+                max_clientid_len => 65535,
+                max_inflight => 32,
+                max_mqueue_len => 1000,
+                max_packet_size => 1048576,
+                max_qos_allowed => 2,
+                max_subscriptions => infinity,
+                max_topic_alias => 65535,
+                max_topic_levels => 128,
+                mqueue_default_priority => lowest,
+                mqueue_priorities => disabled,
+                mqueue_store_qos0 => true,
+                peer_cert_as_clientid => disabled,
+                peer_cert_as_username => disabled,
+                response_information => [],
+                retain_available => true,
+                retry_interval => 30000,
+                server_keepalive => disabled,
+                session_expiry_interval => 7200000,
+                shared_subscription => true,
+                strict_mode => false,
+                upgrade_qos => false,
+                use_username_as_clientid => false,
+                wildcard_subscription => true
+            },
+        overload_protection =>
+            #{
+                backoff_delay => 1,
+                backoff_gc => false,
+                backoff_hibernation => true,
+                backoff_new_conn => true,
+                enable => false
+            },
+        stats => #{enable => true}
+    }.

+ 0 - 1
apps/emqx/test/emqx_connection_SUITE.erl

@@ -57,7 +57,6 @@ init_per_suite(Config) ->
     ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
     ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end),
 
-    emqx_channel_SUITE:set_test_listener_confs(),
     emqx_common_test_helpers:start_apps([]),
     Config.
 

+ 1 - 1
apps/emqx/test/emqx_session_SUITE.erl

@@ -39,7 +39,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 %%--------------------------------------------------------------------
 
 init_per_suite(Config) ->
-    emqx_channel_SUITE:set_test_listener_confs(),
+    emqx_common_test_helpers:start_apps([]),
     ok = meck:new(
         [emqx_hooks, emqx_metrics, emqx_broker],
         [passthrough, no_history, no_link]

+ 23 - 4
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 -define(SUITE, ?MODULE).
 
@@ -46,12 +47,30 @@
 all() -> emqx_common_test_helpers:all(?SUITE).
 
 init_per_suite(Config) ->
-    net_kernel:start(['master@127.0.0.1', longnames]),
+    DistPid =
+        case net_kernel:nodename() of
+            ignored ->
+                %% calling `net_kernel:start' without `epmd'
+                %% running will result in a failure.
+                emqx_common_test_helpers:start_epmd(),
+                {ok, Pid} = net_kernel:start(['master@127.0.0.1', longnames]),
+                ct:pal("start epmd, node name: ~p", [node()]),
+                Pid;
+            _ ->
+                undefined
+        end,
     emqx_common_test_helpers:boot_modules(all),
     emqx_common_test_helpers:start_apps([]),
-    Config.
-
-end_per_suite(_Config) ->
+    [{dist_pid, DistPid} | Config].
+
+end_per_suite(Config) ->
+    DistPid = ?config(dist_pid, Config),
+    case DistPid of
+        Pid when is_pid(Pid) ->
+            net_kernel:stop();
+        _ ->
+            ok
+    end,
     emqx_common_test_helpers:stop_apps([]).
 
 init_per_testcase(Case, Config) ->

+ 0 - 1
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -34,7 +34,6 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
     emqx_common_test_helpers:boot_modules(all),
-    emqx_channel_SUITE:set_test_listener_confs(),
     ?check_trace(
         ?wait_async_action(
             emqx_common_test_helpers:start_apps([]),

+ 0 - 1
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -137,7 +137,6 @@ end_per_testcase(_, Config) ->
     Config.
 
 init_per_suite(Config) ->
-    emqx_channel_SUITE:set_test_listener_confs(),
     emqx_common_test_helpers:start_apps([]),
     Config.
 

+ 6 - 3
apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl

@@ -158,12 +158,15 @@ verify_claims(type) ->
 verify_claims(desc) ->
     ?DESC(?FUNCTION_NAME);
 verify_claims(default) ->
-    #{};
+    [];
 verify_claims(validator) ->
     [fun do_check_verify_claims/1];
 verify_claims(converter) ->
-    fun(VerifyClaims) ->
-        [{to_binary(K), V} || {K, V} <- maps:to_list(VerifyClaims)]
+    fun
+        (VerifyClaims) when is_map(VerifyClaims) ->
+            [{to_binary(K), V} || {K, V} <- maps:to_list(VerifyClaims)];
+        (VerifyClaims) ->
+            VerifyClaims
     end;
 verify_claims(required) ->
     false;

+ 21 - 7
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -32,7 +32,7 @@ init_per_group(TestGroup, BridgeType, Config) ->
     {ok, _} = application:ensure_all_started(emqx_connector),
     emqx_mgmt_api_test_util:init_suite(),
     UniqueNum = integer_to_binary(erlang:unique_integer([positive])),
-    MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
+    MQTTTopic = <<"mqtt/topic/abc", UniqueNum/binary>>,
     [
         {proxy_host, ProxyHost},
         {proxy_port, ProxyPort},
@@ -116,6 +116,7 @@ create_bridge(Config, Overrides) ->
     Name = ?config(bridge_name, Config),
     BridgeConfig0 = ?config(bridge_config, Config),
     BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
+    ct:pal("creating bridge with config: ~p", [BridgeConfig]),
     emqx_bridge:create(BridgeType, Name, BridgeConfig).
 
 create_bridge_api(Config) ->
@@ -203,7 +204,7 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_sync_query(Config, MakeMessageFun, IsSuccessCheck) ->
+t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
@@ -217,11 +218,13 @@ t_sync_query(Config, MakeMessageFun, IsSuccessCheck) ->
             IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)),
             ok
         end,
-        []
+        fun(Trace) ->
+            ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
+        end
     ),
     ok.
 
-t_async_query(Config, MakeMessageFun, IsSuccessCheck) ->
+t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
     ResourceId = resource_id(Config),
     ReplyFun =
         fun(Pid, Result) ->
@@ -236,10 +239,21 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck) ->
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
             Message = {send_message, MakeMessageFun()},
-            emqx_resource:query(ResourceId, Message, #{async_reply_fun => {ReplyFun, [self()]}}),
+            ?assertMatch(
+                {ok, {ok, _}},
+                ?wait_async_action(
+                    emqx_resource:query(ResourceId, Message, #{
+                        async_reply_fun => {ReplyFun, [self()]}
+                    }),
+                    #{?snk_kind := TracePoint, instance_id := ResourceId},
+                    5_000
+                )
+            ),
             ok
         end,
-        []
+        fun(Trace) ->
+            ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
+        end
     ),
     receive
         {result, Result} -> IsSuccessCheck(Result)
@@ -318,7 +332,7 @@ t_start_stop(Config, StopTracePoint) ->
         end,
         fun(Trace) ->
             %% one for each probe, one for real
-            ?assertMatch([_, _, _], ?of_kind(StopTracePoint, Trace)),
+            ?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)),
             ok
         end
     ),

+ 110 - 87
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl

@@ -72,7 +72,7 @@ on_start(InstanceId, Config) ->
                 instance_id => InstanceId,
                 request => maps:get(request, State, <<>>)
             }),
-            ?tp(iotdb_bridge_started, #{}),
+            ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
             {ok, maps:merge(Config, State)};
         {error, Reason} ->
             ?SLOG(error, #{
@@ -104,83 +104,108 @@ on_get_status(InstanceId, State) ->
     | {ok, pos_integer(), [term()]}
     | {error, term()}.
 on_query(InstanceId, {send_message, Message}, State) ->
+    ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
         msg => "iotdb_bridge_on_query_called",
         instance_id => InstanceId,
         send_message => Message,
         state => emqx_utils:redact(State)
     }),
-    IoTDBPayload = make_iotdb_insert_request(Message, State),
-    handle_response(
-        emqx_connector_http:on_query(
-            InstanceId, {send_message, IoTDBPayload}, State
-        )
-    ).
+    case make_iotdb_insert_request(Message, State) of
+        {ok, IoTDBPayload} ->
+            handle_response(
+                emqx_connector_http:on_query(
+                    InstanceId, {send_message, IoTDBPayload}, State
+                )
+            );
+        Error ->
+            Error
+    end.
 
 -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
-    {ok, pid()}.
+    {ok, pid()} | {error, empty_request}.
 on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
+    ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
     ?SLOG(debug, #{
         msg => "iotdb_bridge_on_query_async_called",
         instance_id => InstanceId,
         send_message => Message,
         state => emqx_utils:redact(State)
     }),
-    IoTDBPayload = make_iotdb_insert_request(Message, State),
-    ReplyFunAndArgs =
-        {
-            fun(Result) ->
-                Response = handle_response(Result),
-                emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
-            end,
-            []
-        },
-    emqx_connector_http:on_query_async(
-        InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
-    ).
+    case make_iotdb_insert_request(Message, State) of
+        {ok, IoTDBPayload} ->
+            ReplyFunAndArgs =
+                {
+                    fun(Result) ->
+                        Response = handle_response(Result),
+                        emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
+                    end,
+                    []
+                },
+            emqx_connector_http:on_query_async(
+                InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
+            );
+        Error ->
+            Error
+    end.
 
 %%--------------------------------------------------------------------
 %% Internal Functions
 %%--------------------------------------------------------------------
 
-make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) ->
-    emqx_utils_json:decode(PayloadUnparsed, [return_maps]);
-make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) ->
-    lists:map(fun make_parsed_payload/1, PayloadUnparsed);
-make_parsed_payload(
-    #{
-        measurement := Measurement,
-        data_type := DataType,
-        value := Value
-    } = Data
-) ->
-    Data#{
-        <<"measurement">> => Measurement,
-        <<"data_type">> => DataType,
-        <<"value">> => Value
-    }.
+get_payload(#{payload := Payload}) ->
+    Payload;
+get_payload(#{<<"payload">> := Payload}) ->
+    Payload.
+
+parse_payload(ParsedPayload) when is_map(ParsedPayload) ->
+    ParsedPayload;
+parse_payload(UnparsedPayload) when is_binary(UnparsedPayload) ->
+    emqx_utils_json:decode(UnparsedPayload);
+parse_payload(UnparsedPayloads) when is_list(UnparsedPayloads) ->
+    lists:map(fun parse_payload/1, UnparsedPayloads).
+
+preproc_data_list(DataList) ->
+    lists:foldl(
+        fun preproc_data/2,
+        [],
+        DataList
+    ).
 
 preproc_data(
     #{
         <<"measurement">> := Measurement,
         <<"data_type">> := DataType,
         <<"value">> := Value
-    } = Data
+    } = Data,
+    Acc
 ) ->
-    #{
-        timestamp => emqx_plugin_libs_rule:preproc_tmpl(
-            maps:get(<<"timestamp">>, Data, <<"now">>)
-        ),
-        measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
-        data_type => DataType,
-        value => emqx_plugin_libs_rule:preproc_tmpl(Value)
-    }.
-
-preproc_data_list(DataList) ->
-    lists:map(
-        fun preproc_data/1,
-        DataList
-    ).
+    [
+        #{
+            timestamp => maybe_preproc_tmpl(
+                maps:get(<<"timestamp">>, Data, <<"now">>)
+            ),
+            measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
+            data_type => DataType,
+            value => maybe_preproc_tmpl(Value)
+        }
+        | Acc
+    ];
+preproc_data(_NoMatch, Acc) ->
+    ?SLOG(
+        warning,
+        #{
+            msg => "iotdb_bridge_preproc_data_failed",
+            required_fields => ['measurement', 'data_type', 'value'],
+            received => _NoMatch
+        }
+    ),
+    Acc.
+
+maybe_preproc_tmpl(Value) when is_binary(Value) ->
+    emqx_plugin_libs_rule:preproc_tmpl(Value);
+maybe_preproc_tmpl(Value) ->
+    Value.
 
 proc_data(PreProcessedData, Msg) ->
     NowNS = erlang:system_time(nanosecond),
@@ -199,9 +224,7 @@ proc_data(PreProcessedData, Msg) ->
             }
         ) ->
             #{
-                timestamp => iot_timestamp(
-                    emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows
-                ),
+                timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
                 measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg),
                 data_type => DataType,
                 value => proc_value(DataType, ValueTkn, Msg)
@@ -210,6 +233,11 @@ proc_data(PreProcessedData, Msg) ->
         PreProcessedData
     ).
 
+iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
+    Timestamp;
+iot_timestamp(TimestampTkn, Msg, Nows) ->
+    iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows).
+
 iot_timestamp(Timestamp, #{now_ms := NowMs}) when
     Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
 ->
@@ -240,6 +268,7 @@ replace_var(Val, _Data) ->
     Val.
 
 convert_bool(B) when is_boolean(B) -> B;
+convert_bool(null) -> null;
 convert_bool(1) -> true;
 convert_bool(0) -> false;
 convert_bool(<<"1">>) -> true;
@@ -249,8 +278,7 @@ convert_bool(<<"True">>) -> true;
 convert_bool(<<"TRUE">>) -> true;
 convert_bool(<<"false">>) -> false;
 convert_bool(<<"False">>) -> false;
-convert_bool(<<"FALSE">>) -> false;
-convert_bool(undefined) -> null.
+convert_bool(<<"FALSE">>) -> false.
 
 convert_int(Int) when is_integer(Int) -> Int;
 convert_int(Float) when is_float(Float) -> floor(Float);
@@ -276,24 +304,29 @@ convert_float(Str) when is_binary(Str) ->
 convert_float(undefined) ->
     null.
 
-make_iotdb_insert_request(MessageUnparsedPayload, State) ->
-    Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
+make_iotdb_insert_request(Message, State) ->
+    Payloads = to_list(parse_payload(get_payload(Message))),
     IsAligned = maps:get(is_aligned, State, false),
-    DeviceId = device_id(Message, State),
     IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
-    Payload = make_list(maps:get(payload, Message)),
-    PreProcessedData = preproc_data_list(Payload),
-    DataList = proc_data(PreProcessedData, Message),
-    InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
-    Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
-    maps:merge(Rows, #{
-        iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
-        iotdb_field_key(device_id, IotDBVsn) => DeviceId
-    }).
-
-replace_dtypes(Rows, IotDBVsn) ->
-    {Types, Map} = maps:take(dtypes, Rows),
-    Map#{iotdb_field_key(data_types, IotDBVsn) => Types}.
+    case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of
+        {undefined, _} ->
+            {error, device_id_missing};
+        {_, []} ->
+            {error, invalid_data};
+        {DeviceId, PreProcessedData} ->
+            DataList = proc_data(PreProcessedData, Message),
+            InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
+            Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
+            {ok,
+                maps:merge(Rows, #{
+                    iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
+                    iotdb_field_key(device_id, IotDBVsn) => DeviceId
+                })}
+    end.
+
+replace_dtypes(Rows0, IotDBVsn) ->
+    {Types, Rows} = maps:take(dtypes, Rows0),
+    Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}.
 
 aggregate_rows(DataList, InitAcc) ->
     lists:foldr(
@@ -368,24 +401,14 @@ iotdb_field_key(data_types, ?VSN_1_0_X) ->
 iotdb_field_key(data_types, ?VSN_0_13_X) ->
     <<"dataTypes">>.
 
-make_list(List) when is_list(List) -> List;
-make_list(Data) -> [Data].
+to_list(List) when is_list(List) -> List;
+to_list(Data) -> [Data].
 
-device_id(Message, State) ->
+device_id(Message, Payloads, State) ->
     case maps:get(device_id, State, undefined) of
         undefined ->
-            case maps:get(payload, Message) of
-                #{<<"device_id">> := DeviceId} ->
-                    DeviceId;
-                #{device_id := DeviceId} ->
-                    DeviceId;
-                _NotFound ->
-                    Topic = maps:get(topic, Message),
-                    case re:replace(Topic, "/", ".", [global, {return, binary}]) of
-                        <<"root.", _/binary>> = Device -> Device;
-                        Device -> <<"root.", Device/binary>>
-                    end
-            end;
+            %% [FIXME] there could be conflicting device-ids in the Payloads
+            maps:get(<<"device_id">>, hd(Payloads), undefined);
         DeviceId ->
             DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId),
             emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message)

+ 309 - 73
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -6,8 +6,10 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
+-include("emqx_bridge_iotdb.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(BRIDGE_TYPE_BIN, <<"iotdb">>).
 -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_iotdb]).
@@ -18,13 +20,15 @@
 
 all() ->
     [
-        {group, plain}
+        {group, plain},
+        {group, legacy}
     ].
 
 groups() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
     [
-        {plain, AllTCs}
+        {plain, AllTCs},
+        {legacy, AllTCs}
     ].
 
 init_per_suite(Config) ->
@@ -43,7 +47,32 @@ init_per_group(plain = Type, Config0) ->
             [
                 {bridge_host, Host},
                 {bridge_port, Port},
-                {proxy_name, ProxyName}
+                {proxy_name, ProxyName},
+                {iotdb_version, ?VSN_1_1_X},
+                {iotdb_rest_prefix, <<"/rest/v2/">>}
+                | Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_iotdb);
+                _ ->
+                    {skip, no_iotdb}
+            end
+    end;
+init_per_group(legacy = Type, Config0) ->
+    Host = os:getenv("IOTDB_LEGACY_HOST", "toxiproxy.emqx.net"),
+    Port = list_to_integer(os:getenv("IOTDB_LEGACY_PORT", "38080")),
+    ProxyName = "iotdb013",
+    case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
+        true ->
+            Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0),
+            [
+                {bridge_host, Host},
+                {bridge_port, Port},
+                {proxy_name, ProxyName},
+                {iotdb_version, ?VSN_0_13_X},
+                {iotdb_rest_prefix, <<"/rest/v1/">>}
                 | Config
             ];
         false ->
@@ -58,7 +87,8 @@ init_per_group(_Group, Config) ->
     Config.
 
 end_per_group(Group, Config) when
-    Group =:= plain
+    Group =:= plain;
+    Group =:= legacy
 ->
     emqx_bridge_testlib:end_per_group(Config),
     ok;
@@ -67,7 +97,7 @@ end_per_group(_Group, _Config) ->
 
 init_per_testcase(TestCase, Config0) ->
     Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3),
-    reset_service(Config),
+    iotdb_reset(Config),
     Config.
 
 end_per_testcase(TestCase, Config) ->
@@ -76,20 +106,23 @@ end_per_testcase(TestCase, Config) ->
 %%------------------------------------------------------------------------------
 %% Helper fns
 %%------------------------------------------------------------------------------
+iotdb_server_url(Host, Port) ->
+    iolist_to_binary([
+        "http://",
+        Host,
+        ":",
+        integer_to_binary(Port)
+    ]).
 
 bridge_config(TestCase, _TestGroup, Config) ->
     UniqueNum = integer_to_binary(erlang:unique_integer()),
     Host = ?config(bridge_host, Config),
     Port = ?config(bridge_port, Config),
+    Version = ?config(iotdb_version, Config),
     Name = <<
         (atom_to_binary(TestCase))/binary, UniqueNum/binary
     >>,
-    ServerURL = iolist_to_binary([
-        "http://",
-        Host,
-        ":",
-        integer_to_binary(Port)
-    ]),
+    ServerURL = iotdb_server_url(Host, Port),
     ConfigString =
         io_lib:format(
             "bridges.iotdb.~s {\n"
@@ -99,6 +132,7 @@ bridge_config(TestCase, _TestGroup, Config) ->
             "     username = \"root\"\n"
             "     password = \"root\"\n"
             "  }\n"
+            "iotdb_version = \"~s\"\n"
             "  pool_size = 1\n"
             "  resource_opts = {\n"
             "     health_check_interval = 5000\n"
@@ -109,12 +143,54 @@ bridge_config(TestCase, _TestGroup, Config) ->
             "}\n",
             [
                 Name,
-                ServerURL
+                ServerURL,
+                Version
             ]
         ),
     {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}.
 
-reset_service(Config) ->
+make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
+    #{
+        measurement => s_to_b(Measurement),
+        data_type => s_to_b(Type),
+        value => s_to_b(Value),
+        device_id => DeviceId,
+        is_aligned => true
+    }.
+
+make_iotdb_payload(DeviceId, Measurement, Type, Value, Timestamp) ->
+    Payload = make_iotdb_payload(DeviceId, Measurement, Type, Value),
+    Payload#{timestamp => Timestamp}.
+
+s_to_b(S) when is_list(S) -> list_to_binary(S);
+s_to_b(V) -> V.
+
+make_message_fun(Topic, Payload) ->
+    fun() ->
+        MsgId = erlang:unique_integer([positive]),
+        #{
+            topic => Topic,
+            id => MsgId,
+            payload => emqx_utils_json:encode(Payload),
+            retain => true
+        }
+    end.
+
+iotdb_topic(Config) ->
+    ?config(mqtt_topic, Config).
+
+iotdb_device(Config) ->
+    Topic = iotdb_topic(Config),
+    topic_to_iotdb_device(Topic).
+
+topic_to_iotdb_device(Topic) ->
+    Device = re:replace(Topic, "/", ".", [global, {return, binary}]),
+    <<"root.", Device/binary>>.
+
+iotdb_request(Config, Path, Body) ->
+    iotdb_request(Config, Path, Body, #{}).
+
+iotdb_request(Config, Path, Body, Opts) ->
     _BridgeConfig =
         #{
             <<"base_url">> := BaseURL,
@@ -125,43 +201,40 @@ reset_service(Config) ->
         } =
         ?config(bridge_config, Config),
     ct:pal("bridge config: ~p", [_BridgeConfig]),
-    Path = <<BaseURL/binary, "/rest/v2/nonQuery">>,
+    URL = <<BaseURL/binary, Path/binary>>,
     BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
     Headers = [
         {"Content-type", "application/json"},
         {"Authorization", binary_to_list(BasicToken)}
     ],
+    emqx_mgmt_api_test_util:request_api(post, URL, "", Headers, Body, Opts).
+
+iotdb_reset(Config) ->
     Device = iotdb_device(Config),
+    iotdb_reset(Config, Device).
+
+iotdb_reset(Config, Device) ->
+    Prefix = ?config(iotdb_rest_prefix, Config),
     Body = #{sql => <<"delete from ", Device/binary, ".*">>},
-    {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}).
+    {ok, _} = iotdb_request(Config, <<Prefix/binary, "nonQuery">>, Body).
 
-make_iotdb_payload(DeviceId) ->
-    make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36").
+iotdb_query(Config, Query) ->
+    Prefix = ?config(iotdb_rest_prefix, Config),
+    Path = <<Prefix/binary, "query">>,
+    Opts = #{return_all => true},
+    Body = #{sql => Query},
+    iotdb_request(Config, Path, Body, Opts).
 
-make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
-    #{
-        measurement => Measurement,
-        data_type => Type,
-        value => Value,
-        device_id => DeviceId,
-        is_aligned => false
-    }.
+is_success_check({ok, 200, _, Body}) ->
+    ?assert(is_code(200, emqx_utils_json:decode(Body))).
 
-make_message_fun(Topic, Payload) ->
-    fun() ->
-        MsgId = erlang:unique_integer([positive]),
-        #{
-            topic => Topic,
-            id => MsgId,
-            payload => Payload,
-            retain => true
-        }
-    end.
+is_code(Code, #{<<"code">> := Code}) -> true;
+is_code(_, _) -> false.
 
-iotdb_device(Config) ->
-    MQTTTopic = ?config(mqtt_topic, Config),
-    Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]),
-    <<"root.", Device/binary>>.
+is_error_check(Reason) ->
+    fun(Result) ->
+        ?assertEqual({error, Reason}, Result)
+    end.
 
 %%------------------------------------------------------------------------------
 %% Testcases
@@ -169,55 +242,164 @@ iotdb_device(Config) ->
 
 t_sync_query_simple(Config) ->
     DeviceId = iotdb_device(Config),
-    Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"),
-    MakeMessageFun = make_message_fun(DeviceId, Payload),
-    IsSuccessCheck =
-        fun(Result) ->
-            ?assertEqual(ok, element(1, Result))
-        end,
-    emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck).
+    Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    ok = emqx_bridge_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
+    ),
+    Query = <<"select temp from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
+    ?assertMatch(
+        #{<<"values">> := [[36]]},
+        emqx_utils_json:decode(IoTDBResult)
+    ).
 
 t_async_query(Config) ->
     DeviceId = iotdb_device(Config),
-    Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"),
-    MakeMessageFun = make_message_fun(DeviceId, Payload),
-    IsSuccessCheck =
-        fun(Result) ->
-            ?assertEqual(ok, element(1, Result))
-        end,
-    emqx_bridge_testlib:t_async_query(Config, MakeMessageFun, IsSuccessCheck).
+    Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    ok = emqx_bridge_testlib:t_async_query(
+        Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async
+    ),
+    Query = <<"select temp from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
+    ?assertMatch(
+        #{<<"values">> := [[36]]},
+        emqx_utils_json:decode(IoTDBResult)
+    ).
 
 t_sync_query_aggregated(Config) ->
     DeviceId = iotdb_device(Config),
     Payload = [
-        make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"),
-        (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "37"))#{timestamp => <<"mow_us">>},
-        (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{timestamp => <<"mow_ns">>},
-        make_iotdb_payload(DeviceId, "charged", <<"BOOLEAN">>, "1"),
-        make_iotdb_payload(DeviceId, "stoked", <<"BOOLEAN">>, "true"),
-        make_iotdb_payload(DeviceId, "enriched", <<"BOOLEAN">>, <<"TRUE">>),
-        make_iotdb_payload(DeviceId, "drained", <<"BOOLEAN">>, "0"),
-        make_iotdb_payload(DeviceId, "dazzled", <<"BOOLEAN">>, "false"),
-        make_iotdb_payload(DeviceId, "unplugged", <<"BOOLEAN">>, <<"FALSE">>),
-        make_iotdb_payload(DeviceId, "weight", <<"FLOAT">>, "87.3"),
-        make_iotdb_payload(DeviceId, "foo", <<"TEXT">>, <<"bar">>)
+        make_iotdb_payload(DeviceId, "temp", "INT32", "36", 1685112026290),
+        make_iotdb_payload(DeviceId, "temp", "INT32", 37, 1685112026291),
+        make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, 1685112026292),
+        make_iotdb_payload(DeviceId, "temp", "INT32", "39", <<"1685112026293">>),
+        make_iotdb_payload(DeviceId, "temp", "INT64", "36", 1685112026294),
+        make_iotdb_payload(DeviceId, "temp", "INT64", 36, 1685112026295),
+        make_iotdb_payload(DeviceId, "temp", "INT64", 36.7, 1685112026296),
+        %% implicit 'now()' timestamp
+        make_iotdb_payload(DeviceId, "temp", "INT32", "40"),
+        %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB
+        (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>},
+        (make_iotdb_payload(DeviceId, "temp", "INT32", "42"))#{timestamp => <<"now_ns">>},
+
+        make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", 1685112026290),
+        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, 1685112026291),
+        make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, 1685112026292),
+        make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", 1685112026293),
+        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, 1685112026294),
+        make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, 1685112026295),
+
+        make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", 1685112026300),
+        make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, 1685112026300),
+        make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, 1685112026300),
+        make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", 1685112026300),
+        make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", 1685112026300),
+        make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", 1685112026300),
+        make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", 1685112026300),
+        make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, 1685112026300),
+        make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, 1685112026300),
+        make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", 1685112026300),
+        make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", 1685112026300),
+        make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", 1685112026300),
+        make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, 1685112026300),
+
+        make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300)
     ],
-    MakeMessageFun = make_message_fun(DeviceId, Payload),
-    IsSuccessCheck =
-        fun(Result) ->
-            ?assertEqual(ok, element(1, Result))
-        end,
-    emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck).
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
+    ok = emqx_bridge_testlib:t_sync_query(
+        Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
+    ),
+
+    %% check temp
+    QueryTemp = <<"select temp from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp),
+    ?assertMatch(
+        #{<<"values">> := [[36, 37, 38, 39, 36, 36, 36, 40, 41, 42]]},
+        emqx_utils_json:decode(ResultTemp)
+    ),
+    %% check weight
+    QueryWeight = <<"select weight from ", DeviceId/binary>>,
+    {ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight),
+    ?assertMatch(
+        #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]},
+        emqx_utils_json:decode(ResultWeight)
+    ),
+    %% check rest ts = 1685112026300
+    QueryRest = <<"select * from ", DeviceId/binary, " where time = 1685112026300">>,
+    {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest),
+    #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode(
+        ResultRest
+    ),
+    Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)),
+    Exp = #{
+        exp(DeviceId, "charged") => true,
+        exp(DeviceId, "floated") => true,
+        exp(DeviceId, "started") => true,
+        exp(DeviceId, "stoked") => true,
+        exp(DeviceId, "enriched") => true,
+        exp(DeviceId, "gutted") => true,
+        exp(DeviceId, "drained") => false,
+        exp(DeviceId, "toasted") => false,
+        exp(DeviceId, "uncharted") => false,
+        exp(DeviceId, "dazzled") => false,
+        exp(DeviceId, "unplugged") => false,
+        exp(DeviceId, "unraveled") => false,
+        exp(DeviceId, "undecided") => null,
+        exp(DeviceId, "foo") => <<"bar">>,
+        exp(DeviceId, "temp") => null,
+        exp(DeviceId, "weight") => null
+    },
+    ?assertEqual(Exp, Results),
+
+    ok.
+
+exp(Dev, M0) ->
+    M = s_to_b(M0),
+    <<Dev/binary, ".", M/binary>>.
 
 t_sync_query_fail(Config) ->
     DeviceId = iotdb_device(Config),
-    Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "Anton"),
-    MakeMessageFun = make_message_fun(DeviceId, Payload),
+    Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"),
+    MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
     IsSuccessCheck =
         fun(Result) ->
             ?assertEqual(error, element(1, Result))
         end,
-    emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck).
+    emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query).
+
+t_sync_device_id_missing(Config) ->
+    emqx_bridge_testlib:t_sync_query(
+        Config,
+        make_message_fun(iotdb_topic(Config), #{foo => bar}),
+        is_error_check(device_id_missing),
+        iotdb_bridge_on_query
+    ).
+
+t_sync_invalid_data(Config) ->
+    emqx_bridge_testlib:t_sync_query(
+        Config,
+        make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
+        is_error_check(invalid_data),
+        iotdb_bridge_on_query
+    ).
+
+t_async_device_id_missing(Config) ->
+    emqx_bridge_testlib:t_async_query(
+        Config,
+        make_message_fun(iotdb_topic(Config), #{foo => bar}),
+        is_error_check(device_id_missing),
+        iotdb_bridge_on_query_async
+    ).
+
+t_async_invalid_data(Config) ->
+    emqx_bridge_testlib:t_async_query(
+        Config,
+        make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
+        is_error_check(invalid_data),
+        iotdb_bridge_on_query_async
+    ).
 
 t_create_via_http(Config) ->
     emqx_bridge_testlib:t_create_via_http(Config).
@@ -227,3 +409,57 @@ t_start_stop(Config) ->
 
 t_on_get_status(Config) ->
     emqx_bridge_testlib:t_on_get_status(Config).
+
+t_device_id(Config) ->
+    ResourceId = emqx_bridge_testlib:resource_id(Config),
+    %% Create without device_id configured
+    ?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)),
+    ?retry(
+        _Sleep = 1_000,
+        _Attempts = 20,
+        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    ConfiguredDevice = <<"root.someOtherDevice234">>,
+    DeviceId = <<"root.deviceFooBar123">>,
+    Topic = <<"some/random/topic">>,
+    iotdb_reset(Config, DeviceId),
+    iotdb_reset(Config, ConfiguredDevice),
+    Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
+    MessageF1 = make_message_fun(Topic, Payload1),
+    is_success_check(
+        emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
+    ),
+    {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
+    ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]),
+    #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1),
+    ?assertNot(is_empty(Values1_1)),
+
+    iotdb_reset(Config, DeviceId),
+    iotdb_reset(Config, ConfiguredDevice),
+
+    %% reconfigure bridge with device_id
+    {ok, _} =
+        emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}),
+
+    is_success_check(
+        emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
+    ),
+
+    %% even though we had a device_id in the message it's not being used
+    {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
+    #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1),
+    ?assert(is_empty(Values2_1)),
+    {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(
+        Config, <<"select * from ", ConfiguredDevice/binary>>
+    ),
+    #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2),
+    ?assertNot(is_empty(Values2_2)),
+
+    iotdb_reset(Config, DeviceId),
+    iotdb_reset(Config, ConfiguredDevice),
+    ok.
+
+is_empty(null) -> true;
+is_empty([]) -> true;
+is_empty([[]]) -> true;
+is_empty(_) -> false.

+ 5 - 0
apps/emqx_dashboard/src/emqx_dashboard.erl

@@ -59,6 +59,11 @@ start_listeners(Listeners) ->
                     scheme => basic,
                     description =>
                         <<"Authorize with [API Keys](https://www.emqx.io/docs/en/v5.0/admin/api.html#api-keys)">>
+                },
+                'bearerAuth' => #{
+                    type => http,
+                    scheme => bearer,
+                    description => <<"Authorize with Bearer Token">>
                 }
             }
         }

+ 14 - 15
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -536,6 +536,7 @@ init_prop(Keys, Init, Type) ->
 
 format_prop(deprecated, Value) when is_boolean(Value) -> Value;
 format_prop(deprecated, _) -> true;
+format_prop(default, []) -> [];
 format_prop(_, Schema) -> to_bin(Schema).
 
 trans_required(Spec, true, _) -> Spec#{required => true};
@@ -567,18 +568,7 @@ trans_description(Spec, Hocon, Options) ->
             Spec;
         Desc ->
             Desc1 = binary:replace(Desc, [<<"\n">>], <<"<br/>">>, [global]),
-            maybe_add_summary_from_label(Spec#{description => Desc1}, Hocon, Options)
-    end.
-
-maybe_add_summary_from_label(Spec, Hocon, Options) ->
-    Label =
-        case desc_struct(Hocon) of
-            ?DESC(_, _) = Struct -> get_i18n(<<"label">>, Struct, undefined, Options);
-            _ -> undefined
-        end,
-    case Label of
-        undefined -> Spec;
-        _ -> Spec#{summary => Label}
+            Spec#{description => Desc1}
     end.
 
 get_i18n(Tag, ?DESC(Namespace, Id), Default, Options) ->
@@ -970,13 +960,13 @@ to_bin(List) when is_list(List) ->
 to_bin(Boolean) when is_boolean(Boolean) -> Boolean;
 to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
 to_bin({Type, Args}) ->
-    unicode:characters_to_binary(io_lib:format("~ts(~p)", [Type, Args]));
+    unicode:characters_to_binary(io_lib:format("~ts-~p", [Type, Args]));
 to_bin(X) ->
     X.
 
 parse_object(PropList = [_ | _], Module, Options) when is_list(PropList) ->
     {Props, Required, Refs} = parse_object_loop(PropList, Module, Options),
-    Object = #{<<"type">> => object, <<"properties">> => Props},
+    Object = #{<<"type">> => object, <<"properties">> => fix_empty_props(Props)},
     case Required of
         [] -> {Object, Refs};
         _ -> {maps:put(required, Required, Object), Refs}
@@ -1012,7 +1002,10 @@ parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs
             HoconType = hocon_schema:field_schema(Hocon, type),
             Init0 = init_prop([default | ?DEFAULT_FIELDS], #{}, Hocon),
             SchemaToSpec = schema_converter(Options),
-            Init = trans_desc(Init0, Hocon, SchemaToSpec, NameBin, Options),
+            Init = maps:remove(
+                summary,
+                trans_desc(Init0, Hocon, SchemaToSpec, NameBin, Options)
+            ),
             {Prop, Refs1} = SchemaToSpec(HoconType, Module),
             NewRequiredAcc =
                 case is_required(Hocon) of
@@ -1039,9 +1032,15 @@ parse_object_loop([{Name, Hocon} | Rest], Module, Options, Props, Required, Refs
 %% return true if the field has 'importance' set to 'hidden'
 is_hidden(Hocon) ->
     hocon_schema:is_hidden(Hocon, #{include_importance_up_from => ?IMPORTANCE_LOW}).
+
 is_required(Hocon) ->
     hocon_schema:field_schema(Hocon, required) =:= true.
 
+fix_empty_props([]) ->
+    #{};
+fix_empty_props(Props) ->
+    Props.
+
 content(ApiSpec) ->
     content(ApiSpec, undefined).
 

+ 3 - 3
apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl

@@ -55,7 +55,7 @@ t_start_no_session(_Config) ->
     Opts = #{
         clientinfo => #{
             clientid => ?CLIENT_ID,
-            zone => internal
+            zone => default
         },
         conninfo => #{
             clientid => ?CLIENT_ID,
@@ -76,7 +76,7 @@ t_start_no_expire(_Config) ->
     Opts = #{
         clientinfo => #{
             clientid => ?CLIENT_ID,
-            zone => internal
+            zone => default
         },
         conninfo => #{
             clientid => ?CLIENT_ID,
@@ -97,7 +97,7 @@ t_start_infinite_expire(_Config) ->
     Opts = #{
         clientinfo => #{
             clientid => ?CLIENT_ID,
-            zone => internal
+            zone => default
         },
         conninfo => #{
             clientid => ?CLIENT_ID,

+ 1 - 0
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -369,6 +369,7 @@ schema("/clients/:clientid/keepalive") ->
         put => #{
             description => ?DESC(set_keepalive_seconds),
             tags => ?TAGS,
+            hidden => true,
             parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
             'requestBody' => hoconsc:mk(hoconsc:ref(?MODULE, keepalive)),
             responses => #{

+ 1 - 3
apps/emqx_management/src/emqx_mgmt_api_trace.erl

@@ -344,9 +344,7 @@ fields(bytes) ->
                     description => ?DESC(max_response_bytes),
                     in => query,
                     required => false,
-                    default => 1000,
-                    minimum => 0,
-                    maximum => ?MAX_SINT32
+                    default => 1000
                 }
             )}
     ];

+ 11 - 2
apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl

@@ -137,9 +137,14 @@ t_global_zone(_Config) ->
     ),
     ?assertEqual(lists:usort(ZonesKeys), lists:usort(maps:keys(Zones))),
     ?assertEqual(
-        emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed]),
+        emqx_config:get_zone_conf(default, [mqtt, max_qos_allowed]),
         emqx_utils_maps:deep_get([<<"mqtt">>, <<"max_qos_allowed">>], Zones)
     ),
+    ?assertError(
+        {config_not_found, [zones, no_default, mqtt, max_qos_allowed]},
+        emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed])
+    ),
+
     NewZones1 = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 1),
     NewZones2 = emqx_utils_maps:deep_remove([<<"mqtt">>, <<"peer_cert_as_clientid">>], NewZones1),
     {ok, #{<<"mqtt">> := Res}} = update_global_zone(NewZones2),
@@ -151,7 +156,11 @@ t_global_zone(_Config) ->
         },
         Res
     ),
-    ?assertEqual(1, emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed])),
+    ?assertEqual(1, emqx_config:get_zone_conf(default, [mqtt, max_qos_allowed])),
+    ?assertError(
+        {config_not_found, [zones, no_default, mqtt, max_qos_allowed]},
+        emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed])
+    ),
     %% Make sure the override config is updated, and remove the default value.
     ?assertMatch(#{<<"max_qos_allowed">> := 1}, read_conf(<<"mqtt">>)),
 

+ 1 - 1
apps/emqx_prometheus/src/emqx_prometheus.app.src

@@ -2,7 +2,7 @@
 {application, emqx_prometheus, [
     {description, "Prometheus for EMQX"},
     % strict semver, bump manually!
-    {vsn, "5.0.11"},
+    {vsn, "5.0.12"},
     {modules, []},
     {registered, [emqx_prometheus_sup]},
     {applications, [kernel, stdlib, prometheus, emqx, emqx_management]},

+ 3 - 1
apps/emqx_prometheus/src/emqx_prometheus_schema.erl

@@ -59,7 +59,7 @@ fields("prometheus") ->
             ?HOCON(
                 list({string(), string()}),
                 #{
-                    default => #{},
+                    default => [],
                     required => false,
                     converter => fun ?MODULE:convert_headers/1,
                     desc => ?DESC(headers)
@@ -149,6 +149,8 @@ fields("prometheus") ->
 desc("prometheus") -> ?DESC(prometheus);
 desc(_) -> undefined.
 
+convert_headers(<<>>) ->
+    [];
 convert_headers(Headers) when is_map(Headers) ->
     maps:fold(
         fun(K, V, Acc) ->

+ 2 - 0
changes/ce/perf-10790.en.md

@@ -0,0 +1,2 @@
+Reducing overhead of reading configs per zone.
+