فهرست منبع

Merge branch 'master' into denny-update-local-override-conf

zhongwencool 3 سال پیش
والد
کامیت
0e633f87b3
69فایلهای تغییر یافته به همراه945 افزوده شده و 381 حذف شده
  1. 1 20
      .github/workflows/run_test_cases.yaml
  2. 14 0
      CHANGES-5.0.md
  3. 2 3
      Makefile
  4. 4 0
      README-CN.md
  5. 69 79
      README-RU.md
  6. 25 0
      README.md
  7. 1 1
      apps/emqx/include/emqx_release.hrl
  8. 1 0
      apps/emqx/priv/bpapi.versions
  9. 5 2
      apps/emqx/src/bpapi/emqx_bpapi.erl
  10. 2 0
      apps/emqx/src/bpapi/emqx_bpapi_trans.erl
  11. 1 1
      apps/emqx/src/emqx.app.src
  12. 21 3
      apps/emqx/src/emqx_access_control.erl
  13. 10 4
      apps/emqx/src/emqx_channel.erl
  14. 5 0
      apps/emqx/src/emqx_rpc.erl
  15. 1 1
      apps/emqx/src/emqx_schema.erl
  16. 25 12
      apps/emqx/src/emqx_shared_sub.erl
  17. 2 1
      apps/emqx/src/emqx_tls_lib.erl
  18. 50 0
      apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl
  19. 1 1
      apps/emqx/test/emqx_bpapi_SUITE.erl
  20. 9 3
      apps/emqx/test/emqx_common_test_helpers.erl
  21. 1 0
      apps/emqx/test/emqx_proper_types.erl
  22. 53 0
      apps/emqx/test/emqx_shared_sub_SUITE.erl
  23. 4 0
      apps/emqx_authn/docker-ct
  24. 1 1
      apps/emqx_authn/src/emqx_authn.app.src
  25. 6 1
      apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
  26. 27 14
      apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl
  27. 117 0
      apps/emqx_authn/test/emqx_authn_SUITE.erl
  28. 13 1
      apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl
  29. 4 0
      apps/emqx_authz/docker-ct
  30. 1 1
      apps/emqx_authz/src/emqx_authz.app.src
  31. 7 18
      apps/emqx_authz/src/emqx_authz.erl
  32. 1 1
      apps/emqx_bridge/src/emqx_bridge.app.src
  33. 2 1
      apps/emqx_bridge/src/emqx_bridge_api.erl
  34. 1 2
      apps/emqx_conf/etc/emqx_conf.conf
  35. 29 16
      apps/emqx_conf/src/emqx_cluster_rpc.erl
  36. 1 1
      apps/emqx_conf/src/emqx_conf.app.src
  37. 1 9
      apps/emqx_conf/src/emqx_conf_schema.erl
  38. 4 0
      apps/emqx_connector/docker-ct
  39. 1 1
      apps/emqx_connector/src/emqx_connector.app.src
  40. 76 14
      apps/emqx_connector/src/emqx_connector_ssl.erl
  41. 1 3
      apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
  42. 1 1
      apps/emqx_dashboard/src/emqx_dashboard.app.src
  43. 1 1
      apps/emqx_dashboard/src/emqx_dashboard_middleware.erl
  44. 1 1
      apps/emqx_exhook/src/emqx_exhook.app.src
  45. 2 2
      apps/emqx_exhook/src/emqx_exhook_handler.erl
  46. 29 3
      apps/emqx_exhook/test/emqx_exhook_SUITE.erl
  47. 12 5
      apps/emqx_exhook/test/props/prop_exhook_hooks.erl
  48. 6 1
      apps/emqx_gateway/src/coap/emqx_coap_channel.erl
  49. 25 30
      apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl
  50. 1 1
      apps/emqx_gateway/src/emqx_gateway.app.src
  51. 54 26
      apps/emqx_gateway/test/emqx_coap_SUITE.erl
  52. 2 2
      apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl
  53. 1 1
      apps/emqx_management/src/emqx_management.app.src
  54. 0 32
      apps/emqx_management/src/emqx_management_schema.erl
  55. 23 15
      apps/emqx_management/src/emqx_mgmt_api_status.erl
  56. 7 3
      apps/emqx_plugin_libs/src/emqx_placeholder.erl
  57. 1 1
      apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src
  58. 15 0
      apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl
  59. 11 6
      bin/emqx
  60. 1 0
      build
  61. 2 2
      deploy/charts/emqx/Chart.yaml
  62. 6 0
      mix.exs
  63. 4 5
      rebar.config.erl
  64. 8 6
      rel/emqx_vars
  65. 3 2
      scripts/apps-version-check.sh
  66. 112 0
      scripts/ct/run.sh
  67. 0 4
      scripts/docker-ct-apps
  68. 16 15
      scripts/find-apps.sh
  69. 1 1
      scripts/get-dashboard.sh

+ 1 - 20
.github/workflows/run_test_cases.yaml

@@ -121,27 +121,8 @@ jobs:
             PGSQL_TAG: 13
             REDIS_TAG: 6
           run: |
-            docker-compose \
-                -f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \
-                -f .ci/docker-compose-file/docker-compose-mongo-single-tls.yaml \
-                -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
-                -f .ci/docker-compose-file/docker-compose-mysql-tls.yaml \
-                -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
-                -f .ci/docker-compose-file/docker-compose-pgsql-tls.yaml \
-                -f .ci/docker-compose-file/docker-compose-redis-single-tcp.yaml \
-                -f .ci/docker-compose-file/docker-compose-redis-single-tls.yaml \
-                -f .ci/docker-compose-file/docker-compose-redis-sentinel-tcp.yaml \
-                -f .ci/docker-compose-file/docker-compose-redis-sentinel-tls.yaml \
-                -f .ci/docker-compose-file/docker-compose.yaml \
-                up -d --build
-
-          # produces <app-name>.coverdata
-        - name: run common test
-          working-directory: source
-          run: |
-            docker exec -i ${{ matrix.otp_release }} bash -c "git config --global --add safe.directory \"$GITHUB_WORKSPACE\" && make ${{ matrix.app_name }}-ct"
+            ./scripts/ct/run.sh --app ${{ matrix.app_name }}
         - uses: actions/upload-artifact@v1
-          if: matrix.otp_release == 'erlang24'
           with:
             name: coverdata
             path: source/_build/test/cover

+ 14 - 0
CHANGES-5.0.md

@@ -1,9 +1,21 @@
 # 5.0.8
 
+## Bug fixes
+
+* Fix exhook `client.authorize` never being execauted. [#8780](https://github.com/emqx/emqx/pull/8780)
+* Fix JWT plugin don't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867)
+* Avoid publishing will message when client fails to auhtenticate. [#8887](https://github.com/emqx/emqx/pull/8887)
+* Speed up dispatching of shared subscription messages in a cluster [#8893](https://github.com/emqx/emqx/pull/8893)
+* Fix the extra / prefix when CoAP gateway parsing client topics. [#8658](https://github.com/emqx/emqx/pull/8658)
+* Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857)
+
 ## Enhancements
 
+* Print a warning message when boot with the default (insecure) Erlang cookie. [#8905](https://github.com/emqx/emqx/pull/8905)
 * Change the `/gateway` API path to plural form. [#8823](https://github.com/emqx/emqx/pull/8823)
 * Don't allow updating config items when they already exist in `local-override.conf`. [#8851](https://github.com/emqx/emqx/pull/8851)
+* Remove `node.etc_dir` from emqx.conf, because it is never used.
+  Also allow user to customize the logging directory [#8892](https://github.com/emqx/emqx/pull/8892)
 
 # 5.0.7
 
@@ -12,6 +24,7 @@
 * Remove `will_msg` (not used) field from the client API. [#8721](https://github.com/emqx/emqx/pull/8721)
 * Fix `$queue` topic name error in management API return. [#8728](https://github.com/emqx/emqx/pull/8728)
 * Fix race condition which may cause `client.connected` and `client.disconnected` out of order. [#8625](https://github.com/emqx/emqx/pull/8625)
+* Fix quic listener default idle timeout's type. [#8826](https://github.com/emqx/emqx/pull/8826)
 
 ## Enhancements
 
@@ -33,6 +46,7 @@
 
 ## Enhancements
 
+* Add `bootstrap_users_file` configuration to add default Dashboard username list, which is only added when EMQX is first started.
 * The license is now copied to all nodes in the cluster when it's reloaded. [#8598](https://github.com/emqx/emqx/pull/8598)
 * Added a HTTP API to manage licenses. [#8610](https://github.com/emqx/emqx/pull/8610)
 * Updated `/nodes` API node_status from `Running/Stopped` to `running/stopped`. [#8642](https://github.com/emqx/emqx/pull/8642)

+ 2 - 3
Makefile

@@ -6,7 +6,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-d
 export EMQX_DEFAULT_RUNNER = debian:11-slim
 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
 export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
-export EMQX_DASHBOARD_VERSION ?= v1.0.7
+export EMQX_DASHBOARD_VERSION ?= v1.0.8
 export EMQX_EE_DASHBOARD_VERSION ?= e1.0.0
 export EMQX_REL_FORM ?= tgz
 export QUICER_DOWNLOAD_FROM_RELEASE = 1
@@ -80,7 +80,6 @@ static_checks:
 
 APPS=$(shell $(SCRIPTS)/find-apps.sh)
 
-## app/name-ct targets are intended for local tests hence cover is not enabled
 .PHONY: $(APPS:%=%-ct)
 define gen-app-ct-target
 $1-ct: $(REBAR)
@@ -132,7 +131,7 @@ $(REL_PROFILES:%=%): $(COMMON_DEPS)
 clean: $(PROFILES:%=clean-%)
 $(PROFILES:%=clean-%):
 	@if [ -d _build/$(@:clean-%=%) ]; then \
-		rm rebar.lock \
+		rm -f rebar.lock; \
 		rm -rf _build/$(@:clean-%=%)/rel; \
 		$(FIND) _build/$(@:clean-%=%) -name '*.beam' -o -name '*.so' -o -name '*.app' -o -name '*.appup' -o -name '*.o' -o -name '*.d' -type f | xargs rm -f; \
 		$(FIND) _build/$(@:clean-%=%) -type l -delete; \

+ 4 - 0
README-CN.md

@@ -40,6 +40,10 @@ docker run -d --name emqx-ee -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084
 
 接下来请参考 [入门指南](https://www.emqx.io/docs/zh/v5.0/getting-started/getting-started.html#启动-emqx) 开启您的 EMQX 之旅。
 
+#### 在 Kubernetes 上运行 EMQX 集群
+
+请参考 [EMQX Operator 文档](https://github.com/emqx/emqx-operator/blob/main/docs/zh_CN/getting-started/getting-started.md)。
+
 #### 更多安装方式
 
 您可以从 [www.emqx.io/zh/downloads](https://www.emqx.io/zh/downloads) 下载不同格式的 EMQX 安装包进行手动安装。

تفاوت فایلی نمایش داده نمی شود زیرا این فایل بسیار بزرگ است
+ 69 - 79
README-RU.md


+ 25 - 0
README.md

@@ -41,6 +41,10 @@ docker run -d --name emqx-ee -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084
 
 Next, please follow the [getting started guide](https://www.emqx.io/docs/en/v5.0/getting-started/getting-started.html#start-emqx) to tour the EMQX features.
 
+#### Run EMQX cluster on kubernetes
+
+For details: [EMQX Operator](https://github.com/emqx/emqx-operator/blob/main/docs/en_US/getting-started/getting-started.md).
+
 #### More installation options
 
 If you prefer to install and manage EMQX yourself, you can download the latest version from [www.emqx.io/downloads](https://www.emqx.io/downloads).
@@ -106,6 +110,27 @@ make
 _build/emqx/rel/emqx/bin/emqx console
 ```
 
+### Building on Apple silicon (M1, M2)
+
+Homebrew on Apple silicon [changed default location of it's home directory](https://github.com/Homebrew/brew/issues/9177) from `/usr/local` to `/opt/homebrew` and as a result a few things broke in the process.
+
+Concerning EMQX, when you install `unixodbc` package (one of the dependencies) via Homebrew, and build Erlang/OTP  with [kerl](https://github.com/kerl/kerl), kerl will not be able to find `unixodbc`.
+
+Here is how to solve it:
+
+```bash
+brew install unixodbc kerl
+sudo ln -s $(realpath $(brew --prefix unixodbc)) /usr/local/odbc
+export CC="/usr/bin/gcc -I$(brew --prefix unixodbc)/include"
+export LDFLAGS="-L$(brew --prefix unixodbc)/lib"
+kerl build 24.3
+mkdir ~/.kerl/installations
+kerl install 24.3 ~/.kerl/installations/24.3
+. ~/.kerl/installations/24.3/activate
+```
+
+Then you can proceed with `make`.
+
 ## License
 
 See [LICENSE](./LICENSE).

+ 1 - 1
apps/emqx/include/emqx_release.hrl

@@ -32,7 +32,7 @@
 %% `apps/emqx/src/bpapi/README.md'
 
 %% Community edition
--define(EMQX_RELEASE_CE, "5.0.6").
+-define(EMQX_RELEASE_CE, "5.0.7").
 
 %% Enterprise edition
 -define(EMQX_RELEASE_EE, "5.0.0-alpha.1").

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -26,6 +26,7 @@
 {emqx_resource,1}.
 {emqx_retainer,1}.
 {emqx_rule_engine,1}.
+{emqx_shared_sub,1}.
 {emqx_slow_subs,1}.
 {emqx_statsd,1}.
 {emqx_telemetry,1}.

+ 5 - 2
apps/emqx/src/bpapi/emqx_bpapi.erl

@@ -69,9 +69,12 @@ start() ->
     announce(emqx).
 
 %% @doc Get maximum version of the backplane API supported by the node
--spec supported_version(node(), api()) -> api_version().
+-spec supported_version(node(), api()) -> api_version() | undefined.
 supported_version(Node, API) ->
-    ets:lookup_element(?TAB, {Node, API}, #?TAB.version).
+    case ets:lookup(?TAB, {Node, API}) of
+        [#?TAB{version = V}] -> V;
+        [] -> undefined
+    end.
 
 %% @doc Get maximum version of the backplane API supported by the
 %% entire cluster

+ 2 - 0
apps/emqx/src/bpapi/emqx_bpapi_trans.erl

@@ -153,6 +153,8 @@ extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Node, M, F, A]) ->
     {call_or_cast(CallOrCast), M, F, A};
 extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Tag, _Node, M, F, A]) ->
     {call_or_cast(CallOrCast), M, F, A};
+extract_mfa(?BACKEND(emqx_rpc, call), [_Tag, _Node, M, F, A, _Timeout]) ->
+    {call_or_cast(call), M, F, A};
 %% (e)rpc:
 extract_mfa(?BACKEND(rpc, multicall), [M, F, A]) ->
     {call_or_cast(multicall), M, F, A};

+ 1 - 1
apps/emqx/src/emqx.app.src

@@ -3,7 +3,7 @@
     {id, "emqx"},
     {description, "EMQX Core"},
     % strict semver, bump manually!
-    {vsn, "5.0.7"},
+    {vsn, "5.0.8"},
     {modules, []},
     {registered, []},
     {applications, [

+ 21 - 3
apps/emqx/src/emqx_access_control.erl

@@ -17,6 +17,7 @@
 -module(emqx_access_control).
 
 -include("emqx.hrl").
+-include("logger.hrl").
 
 -export([
     authenticate/1,
@@ -70,9 +71,26 @@ check_authorization_cache(ClientInfo, PubSub, Topic) ->
 
 do_authorize(ClientInfo, PubSub, Topic) ->
     NoMatch = emqx:get_config([authorization, no_match], allow),
-    case run_hooks('client.authorize', [ClientInfo, PubSub, Topic], NoMatch) of
-        allow -> allow;
-        _Other -> deny
+    Default = #{result => NoMatch, from => default},
+    case run_hooks('client.authorize', [ClientInfo, PubSub, Topic], Default) of
+        AuthzResult = #{result := Result} when Result == allow; Result == deny ->
+            From = maps:get(from, AuthzResult, unknown),
+            emqx:run_hook(
+                'client.check_authz_complete',
+                [ClientInfo, PubSub, Topic, Result, From]
+            ),
+            Result;
+        Other ->
+            ?SLOG(error, #{
+                msg => "unknown_authorization_return_format",
+                expected_example => "#{result => allow, from => default}",
+                got => Other
+            }),
+            emqx:run_hook(
+                'client.check_authz_complete',
+                [ClientInfo, PubSub, Topic, deny, unknown_return_format]
+            ),
+            deny
     end.
 
 -compile({inline, [run_hooks/3]}).

+ 10 - 4
apps/emqx/src/emqx_channel.erl

@@ -22,6 +22,8 @@
 -include("logger.hrl").
 -include("types.hrl").
 
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
 -ifdef(TEST).
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -1423,7 +1425,8 @@ interval(will_timer, #channel{will_msg = WillMsg}) ->
 %%--------------------------------------------------------------------
 
 -spec terminate(any(), channel()) -> ok.
-terminate(_, #channel{conn_state = idle}) ->
+terminate(_, #channel{conn_state = idle} = _Channel) ->
+    ?tp(channel_terminated, #{channel => _Channel}),
     ok;
 terminate(normal, Channel) ->
     run_terminate_hook(normal, Channel);
@@ -1431,7 +1434,8 @@ terminate({shutdown, kicked}, Channel) ->
     run_terminate_hook(kicked, Channel);
 terminate({shutdown, Reason}, Channel) when
     Reason =:= discarded;
-    Reason =:= takenover
+    Reason =:= takenover;
+    Reason =:= not_authorized
 ->
     run_terminate_hook(Reason, Channel);
 terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
@@ -1452,9 +1456,11 @@ persist_if_session(#channel{session = Session} = Channel) ->
             ok
     end.
 
-run_terminate_hook(_Reason, #channel{session = undefined}) ->
+run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) ->
+    ?tp(channel_terminated, #{channel => _Channel}),
     ok;
-run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
+run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session} = _Channel) ->
+    ?tp(channel_terminated, #{channel => _Channel}),
     emqx_session:terminate(ClientInfo, Reason, Session).
 
 %%--------------------------------------------------------------------

+ 5 - 0
apps/emqx/src/emqx_rpc.erl

@@ -22,6 +22,7 @@
 -export([
     call/4,
     call/5,
+    call/6,
     cast/4,
     cast/5,
     multicall/4,
@@ -78,6 +79,10 @@ call(Node, Mod, Fun, Args) ->
 call(Key, Node, Mod, Fun, Args) ->
     filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args)).
 
+-spec call(term(), node(), module(), atom(), list(), timeout()) -> call_result().
+call(Key, Node, Mod, Fun, Args, Timeout) ->
+    filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args, Timeout)).
+
 -spec multicall([node()], module(), atom(), list()) -> multicall_result().
 multicall(Nodes, Mod, Fun, Args) ->
     gen_rpc:multicall(rpc_nodes(Nodes), Mod, Fun, Args).

+ 1 - 1
apps/emqx/src/emqx_schema.erl

@@ -869,7 +869,7 @@ fields("mqtt_quic_listener") ->
             sc(
                 duration_ms(),
                 #{
-                    default => "0",
+                    default => 0,
                     desc => ?DESC(fields_mqtt_quic_listener_idle_timeout)
                 }
             )},

+ 25 - 12
apps/emqx/src/emqx_shared_sub.erl

@@ -38,7 +38,8 @@
 
 -export([
     dispatch/3,
-    dispatch/4
+    dispatch/4,
+    do_dispatch_with_ack/4
 ]).
 
 -export([
@@ -170,30 +171,31 @@ do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
 %% return either 'ok' (when everything is fine) or 'error'
 do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
     %% For QoS 0 message, send it as regular dispatch
-    SubPid ! {deliver, Topic, Msg},
-    ok;
+    send(SubPid, Topic, {deliver, Topic, Msg});
 do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
     %% Retry implies all subscribers nack:ed, send again without ack
-    SubPid ! {deliver, Topic, Msg},
-    ok;
+    send(SubPid, Topic, {deliver, Topic, Msg});
 do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
     case ack_enabled() of
         true ->
-            dispatch_with_ack(SubPid, Group, Topic, Msg);
+            %% FIXME: replace with `emqx_shared_sub_proto:dispatch_with_ack' in 5.2
+            do_dispatch_with_ack(SubPid, Group, Topic, Msg);
         false ->
-            SubPid ! {deliver, Topic, Msg},
-            ok
+            send(SubPid, Topic, {deliver, Topic, Msg})
     end.
 
-dispatch_with_ack(SubPid, Group, Topic, Msg) ->
+-spec do_dispatch_with_ack(pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message()) ->
+    ok | {error, _}.
+do_dispatch_with_ack(SubPid, Group, Topic, Msg) ->
     %% For QoS 1/2 message, expect an ack
     Ref = erlang:monitor(process, SubPid),
     Sender = self(),
-    SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
+    %% FIXME: replace with regular send in 5.2
+    send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}),
     Timeout =
         case Msg#message.qos of
-            ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
-            ?QOS_2 -> infinity
+            ?QOS_2 -> infinity;
+            _ -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS)
         end,
     try
         receive
@@ -412,6 +414,17 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+send(Pid, Topic, Msg) ->
+    Node = node(Pid),
+    _ =
+        case Node =:= node() of
+            true ->
+                Pid ! Msg;
+            false ->
+                emqx_shared_sub_proto_v1:send(Node, Pid, Topic, Msg)
+        end,
+    ok.
+
 maybe_insert_round_robin_count({Group, _Topic} = GroupTopic) ->
     strategy(Group) =:= round_robin_per_group andalso
         ets:insert(?SHARED_SUBS_ROUND_ROBIN_COUNTER, {GroupTopic, 0}),

+ 2 - 1
apps/emqx/src/emqx_tls_lib.erl

@@ -145,7 +145,8 @@ all_ciphers_set_cached() ->
     case persistent_term:get(?FUNCTION_NAME, false) of
         false ->
             S = sets:from_list(all_ciphers()),
-            persistent_term:put(?FUNCTION_NAME, S);
+            persistent_term:put(?FUNCTION_NAME, S),
+            S;
         Set ->
             Set
     end.

+ 50 - 0
apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl

@@ -0,0 +1,50 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_shared_sub_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    send/4,
+    dispatch_with_ack/5
+]).
+
+-include("bpapi.hrl").
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+introduced_in() ->
+    "5.0.8".
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec send(node(), pid(), emqx_types:topic(), term()) -> true.
+send(Node, Pid, Topic, Msg) ->
+    emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg]).
+
+-spec dispatch_with_ack(
+    pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message(), timeout()
+) ->
+    ok | {error, _}.
+dispatch_with_ack(Pid, Group, Topic, Msg, Timeout) ->
+    emqx_rpc:call(
+        Topic, node(Pid), emqx_shared_sub, do_dispatch_with_ack, [Pid, Group, Topic, Msg], Timeout
+    ).

+ 1 - 1
apps/emqx/test/emqx_bpapi_SUITE.erl

@@ -40,7 +40,7 @@ end_per_suite(_Config) ->
 t_max_supported_version(_Config) ->
     ?assertMatch(3, emqx_bpapi:supported_version('fake-node2@localhost', api2)),
     ?assertMatch(2, emqx_bpapi:supported_version(api2)),
-    ?assertError(_, emqx_bpapi:supported_version('fake-node2@localhost', nonexistent_api)),
+    ?assertMatch(undefined, emqx_bpapi:supported_version('fake-node2@localhost', nonexistent_api)),
     ?assertError(_, emqx_bpapi:supported_version(nonexistent_api)).
 
 t_announce(Config) ->

+ 9 - 3
apps/emqx/test/emqx_common_test_helpers.erl

@@ -181,11 +181,15 @@ start_app(App, Handler) ->
 app_conf_file(emqx_conf) -> "emqx.conf.all";
 app_conf_file(App) -> atom_to_list(App) ++ ".conf".
 
-%% TODO: get rid of cuttlefish
 app_schema(App) ->
     Mod = list_to_atom(atom_to_list(App) ++ "_schema"),
-    true = is_list(Mod:roots()),
-    Mod.
+    try
+        true = is_list(Mod:roots()),
+        Mod
+    catch
+        error:undef ->
+            no_schema
+    end.
 
 mustache_vars(App) ->
     [
@@ -221,6 +225,8 @@ render_config_file(ConfigFile, Vars0) ->
     ok = file:write_file(NewName, Targ),
     NewName.
 
+read_schema_configs(no_schema, _ConfigFile) ->
+    ok;
 read_schema_configs(Schema, ConfigFile) ->
     NewConfig = generate_config(Schema, ConfigFile),
     lists:foreach(

+ 1 - 0
apps/emqx/test/emqx_proper_types.erl

@@ -69,6 +69,7 @@ conninfo() ->
         {conn_props, properties()},
         {connected, boolean()},
         {connected_at, timestamp()},
+        {disconnected_at, timestamp()},
         {keepalive, range(0, 16#ffff)},
         {receive_maximum, non_neg_integer()},
         {expiry_interval, non_neg_integer()}

+ 53 - 0
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -567,6 +567,59 @@ t_local(_) ->
     ?assertNotEqual(UsedSubPid1, UsedSubPid2),
     ok.
 
+t_remote(_) ->
+    %% This testcase verifies dispatching of shared messages to the remote nodes via backplane API.
+    %%
+    %% In this testcase we start two EMQX nodes: local and remote.
+    %% A subscriber connects to the remote node.
+    %% A publisher connects to the local node and sends three messages with different QoS.
+    %% The test verifies that the remote side received all three messages.
+    ok = ensure_config(sticky, true),
+    GroupConfig = #{
+        <<"local_group">> => local,
+        <<"round_robin_group">> => round_robin,
+        <<"sticky_group">> => sticky
+    },
+
+    Node = start_slave('remote_shared_sub_testtesttest', 21999),
+    ok = ensure_group_config(GroupConfig),
+    ok = ensure_group_config(Node, GroupConfig),
+
+    Topic = <<"foo/bar">>,
+    ClientIdLocal = <<"ClientId1">>,
+    ClientIdRemote = <<"ClientId2">>,
+
+    {ok, ConnPidLocal} = emqtt:start_link([{clientid, ClientIdLocal}]),
+    {ok, ConnPidRemote} = emqtt:start_link([{clientid, ClientIdRemote}, {port, 21999}]),
+
+    try
+        {ok, ClientPidLocal} = emqtt:connect(ConnPidLocal),
+        {ok, ClientPidRemote} = emqtt:connect(ConnPidRemote),
+
+        emqtt:subscribe(ConnPidRemote, {<<"$share/remote_group/", Topic/binary>>, 0}),
+
+        ct:sleep(100),
+
+        Message1 = emqx_message:make(ClientPidLocal, 0, Topic, <<"hello1">>),
+        Message2 = emqx_message:make(ClientPidLocal, 1, Topic, <<"hello2">>),
+        Message3 = emqx_message:make(ClientPidLocal, 2, Topic, <<"hello3">>),
+
+        emqx:publish(Message1),
+        {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPidRemote]),
+
+        emqx:publish(Message2),
+        {true, UsedSubPid1} = last_message(<<"hello2">>, [ConnPidRemote]),
+
+        emqx:publish(Message3),
+        {true, UsedSubPid1} = last_message(<<"hello3">>, [ConnPidRemote]),
+
+        ok
+    after
+        emqtt:stop(ConnPidLocal),
+        emqtt:stop(ConnPidRemote),
+        stop_slave(Node)
+    end.
+
 t_local_fallback(_) ->
     ok = ensure_group_config(#{
         <<"local_group">> => local,

+ 4 - 0
apps/emqx_authn/docker-ct

@@ -0,0 +1,4 @@
+mongo
+redis
+mysql
+pgsql

+ 1 - 1
apps/emqx_authn/src/emqx_authn.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_authn, [
     {description, "EMQX Authentication"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {modules, []},
     {registered, [emqx_authn_sup, emqx_authn_registry]},
     {applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]},

+ 6 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_http.erl

@@ -361,7 +361,12 @@ handle_response(Headers, Body) ->
                 _ ->
                     ignore
             end;
-        {error, _Reason} ->
+        {error, Reason} ->
+            ?TRACE_AUTHN_PROVIDER(
+                error,
+                "parse_http_response_failed",
+                #{content_type => ContentType, body => Body, reason => Reason}
+            ),
             ignore
     end.
 

+ 27 - 14
apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl

@@ -383,7 +383,7 @@ do_verify(JWT, [JWK | More], VerifyClaims) ->
     try jose_jws:verify(JWK, JWT) of
         {true, Payload, _JWT} ->
             Claims0 = emqx_json:decode(Payload, [return_maps]),
-            Claims = try_convert_to_int(Claims0, [<<"exp">>, <<"iat">>, <<"nbf">>]),
+            Claims = try_convert_to_num(Claims0, [<<"exp">>, <<"iat">>, <<"nbf">>]),
             case verify_claims(Claims, VerifyClaims) of
                 ok ->
                     {ok, Claims};
@@ -403,37 +403,37 @@ verify_claims(Claims, VerifyClaims0) ->
     VerifyClaims =
         [
             {<<"exp">>, fun(ExpireTime) ->
-                is_integer(ExpireTime) andalso Now < ExpireTime
+                is_number(ExpireTime) andalso Now < ExpireTime
             end},
             {<<"iat">>, fun(IssueAt) ->
-                is_integer(IssueAt) andalso IssueAt =< Now
+                is_number(IssueAt) andalso IssueAt =< Now
             end},
             {<<"nbf">>, fun(NotBefore) ->
-                is_integer(NotBefore) andalso NotBefore =< Now
+                is_number(NotBefore) andalso NotBefore =< Now
             end}
         ] ++ VerifyClaims0,
     do_verify_claims(Claims, VerifyClaims).
 
-try_convert_to_int(Claims, [Name | Names]) ->
+try_convert_to_num(Claims, [Name | Names]) ->
     case Claims of
         #{Name := Value} ->
             case Value of
-                Int when is_integer(Int) ->
-                    try_convert_to_int(Claims#{Name => Int}, Names);
+                Int when is_number(Int) ->
+                    try_convert_to_num(Claims#{Name => Int}, Names);
                 Bin when is_binary(Bin) ->
-                    case string:to_integer(Bin) of
-                        {Int, <<>>} ->
-                            try_convert_to_int(Claims#{Name => Int}, Names);
+                    case binary_to_number(Bin) of
+                        {ok, Num} ->
+                            try_convert_to_num(Claims#{Name => Num}, Names);
                         _ ->
-                            try_convert_to_int(Claims, Names)
+                            try_convert_to_num(Claims, Names)
                     end;
                 _ ->
-                    try_convert_to_int(Claims, Names)
+                    try_convert_to_num(Claims, Names)
             end;
         _ ->
-            try_convert_to_int(Claims, Names)
+            try_convert_to_num(Claims, Names)
     end;
-try_convert_to_int(Claims, []) ->
+try_convert_to_num(Claims, []) ->
     Claims.
 
 do_verify_claims(_Claims, []) ->
@@ -519,3 +519,16 @@ to_binary(B) when is_binary(B) ->
     B.
 
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+binary_to_number(Bin) ->
+    try
+        {ok, erlang:binary_to_integer(Bin)}
+    catch
+        _:_ ->
+            try
+                {ok, erlang:binary_to_float(Bin)}
+            catch
+                _:_ ->
+                    false
+            end
+    end.

+ 117 - 0
apps/emqx_authn/test/emqx_authn_SUITE.erl

@@ -0,0 +1,117 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authn_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+
+%%=================================================================================
+%% CT boilerplate
+%%=================================================================================
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    ok.
+
+init_per_testcase(Case, Config) ->
+    ?MODULE:Case({init, Config}).
+
+end_per_testcase(Case, Config) ->
+    ?MODULE:Case({'end', Config}).
+
+%%=================================================================================
+%% Helpers fns
+%%=================================================================================
+
+%%=================================================================================
+%% Testcases
+%%=================================================================================
+
+t_will_message_connection_denied({init, Config}) ->
+    emqx_common_test_helpers:start_apps([emqx_conf, emqx_authn]),
+    mria:clear_table(emqx_authn_mnesia),
+    AuthnConfig = #{
+        <<"mechanism">> => <<"password_based">>,
+        <<"backend">> => <<"built_in_database">>,
+        <<"user_id_type">> => <<"clientid">>
+    },
+    Chain = 'mqtt:global',
+    emqx:update_config(
+        [authentication],
+        {create_authenticator, Chain, AuthnConfig}
+    ),
+    User = #{user_id => <<"subscriber">>, password => <<"p">>},
+    AuthenticatorID = <<"password_based:built_in_database">>,
+    {ok, _} = emqx_authentication:add_user(
+        Chain,
+        AuthenticatorID,
+        User
+    ),
+    Config;
+t_will_message_connection_denied({'end', _Config}) ->
+    emqx:update_config(
+        [authentication],
+        {delete_authenticator, 'mqtt:global', <<"password_based:built_in_database">>}
+    ),
+    emqx_common_test_helpers:stop_apps([emqx_authn, emqx_conf]),
+    mria:clear_table(emqx_authn_mnesia),
+    ok;
+t_will_message_connection_denied(Config) when is_list(Config) ->
+    {ok, Subscriber} = emqtt:start_link([
+        {clientid, <<"subscriber">>},
+        {password, <<"p">>}
+    ]),
+    {ok, _} = emqtt:connect(Subscriber),
+    {ok, _, [?RC_SUCCESS]} = emqtt:subscribe(Subscriber, <<"lwt">>),
+
+    process_flag(trap_exit, true),
+
+    {ok, Publisher} = emqtt:start_link([
+        {clientid, <<"publisher">>},
+        {will_topic, <<"lwt">>},
+        {will_payload, <<"should not be published">>}
+    ]),
+    snabbkaffe:start_trace(),
+    ?wait_async_action(
+        {error, _} = emqtt:connect(Publisher),
+        #{?snk_kind := channel_terminated}
+    ),
+    snabbkaffe:stop(),
+
+    receive
+        {publish, #{
+            topic := <<"lwt">>,
+            payload := <<"should not be published">>
+        }} ->
+            ct:fail("should not publish will message")
+    after 0 ->
+        ok
+    end,
+
+    ok.

+ 13 - 1
apps/emqx_authn/test/emqx_authn_jwt_SUITE.erl

@@ -408,7 +408,19 @@ t_verify_claims(_) ->
     },
     ?assertMatch(
         {ok, #{is_superuser := false}}, emqx_authn_jwt:authenticate(Credential4, State1)
-    ).
+    ),
+
+    Payload5 = #{
+        <<"username">> => <<"myuser">>,
+        <<"foo">> => <<"myuser">>,
+        <<"exp">> => erlang:system_time(second) + 10.5
+    },
+    JWS5 = generate_jws('hmac-based', Payload5, Secret),
+    Credential5 = #{
+        username => <<"myuser">>,
+        password => JWS5
+    },
+    ?assertMatch({ok, #{is_superuser := false}}, emqx_authn_jwt:authenticate(Credential5, State1)).
 
 t_jwt_not_allow_empty_claim_name(_) ->
     Request = #{

+ 4 - 0
apps/emqx_authz/docker-ct

@@ -0,0 +1,4 @@
+mongo
+redis
+mysql
+pgsql

+ 1 - 1
apps/emqx_authz/src/emqx_authz.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_authz, [
     {description, "An OTP application"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {mod, {emqx_authz_app, []}},
     {applications, [

+ 7 - 18
apps/emqx_authz/src/emqx_authz.erl

@@ -49,7 +49,8 @@
 
 -type default_result() :: allow | deny.
 
--type authz_result() :: {stop, allow} | {ok, deny}.
+-type authz_result_value() :: #{result := allow | deny, from => _}.
+-type authz_result() :: {stop, authz_result_value()} | {ok, authz_result_value()} | ignore.
 
 -type sources() :: [source()].
 
@@ -319,7 +320,7 @@ authorize(
                 is_superuser => true
             }),
             emqx_metrics:inc(?METRIC_SUPERUSER),
-            {stop, allow};
+            {stop, #{result => allow, from => superuser}};
         false ->
             authorize_non_superuser(Client, PubSub, Topic, DefaultResult, Sources)
     end.
@@ -331,15 +332,11 @@ authorize_non_superuser(
     } = Client,
     PubSub,
     Topic,
-    DefaultResult,
+    _DefaultResult,
     Sources
 ) ->
     case do_authorize(Client, PubSub, Topic, sources_with_defaults(Sources)) of
         {{matched, allow}, AuthzSource} ->
-            emqx:run_hook(
-                'client.check_authz_complete',
-                [Client, PubSub, Topic, allow, AuthzSource]
-            ),
             log_allowed(#{
                 username => Username,
                 ipaddr => IpAddress,
@@ -348,12 +345,8 @@ authorize_non_superuser(
             }),
             emqx_metrics_worker:inc(authz_metrics, AuthzSource, allow),
             emqx_metrics:inc(?METRIC_ALLOW),
-            {stop, allow};
+            {stop, #{result => allow, from => AuthzSource}};
         {{matched, deny}, AuthzSource} ->
-            emqx:run_hook(
-                'client.check_authz_complete',
-                [Client, PubSub, Topic, deny, AuthzSource]
-            ),
             ?SLOG(warning, #{
                 msg => "authorization_permission_denied",
                 username => Username,
@@ -363,12 +356,8 @@ authorize_non_superuser(
             }),
             emqx_metrics_worker:inc(authz_metrics, AuthzSource, deny),
             emqx_metrics:inc(?METRIC_DENY),
-            {stop, deny};
+            {stop, #{result => deny, from => AuthzSource}};
         nomatch ->
-            emqx:run_hook(
-                'client.check_authz_complete',
-                [Client, PubSub, Topic, DefaultResult, default]
-            ),
             ?SLOG(info, #{
                 msg => "authorization_failed_nomatch",
                 username => Username,
@@ -377,7 +366,7 @@ authorize_non_superuser(
                 reason => "no-match rule"
             }),
             emqx_metrics:inc(?METRIC_NOMATCH),
-            {stop, DefaultResult}
+            ignore
     end.
 
 log_allowed(Meta) ->

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "An OTP application"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 2 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -584,9 +584,10 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
 
 format_bridge_info([FirstBridge | _] = Bridges) ->
     Res = maps:remove(node, FirstBridge),
+    NRes = emqx_connector_ssl:drop_invalid_certs(Res),
     NodeStatus = collect_status(Bridges),
     NodeMetrics = collect_metrics(Bridges),
-    Res#{
+    NRes#{
         status => aggregate_status(NodeStatus),
         node_status => NodeStatus,
         metrics => aggregate_metrics(NodeMetrics),

+ 1 - 2
apps/emqx_conf/etc/emqx_conf.conf

@@ -10,9 +10,8 @@
 
 node {
   name = "emqx@127.0.0.1"
-  cookie = emqxsecretcookie
+  cookie = "{{ emqx_default_erlang_cookie }}"
   data_dir = "{{ platform_data_dir }}"
-  etc_dir = "{{ platform_etc_dir }}"
 }
 
 log {

+ 29 - 16
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -72,6 +72,7 @@
 -define(TIMEOUT, timer:minutes(1)).
 -define(APPLY_KIND_REPLICATE, replicate).
 -define(APPLY_KIND_INITIATE, initiate).
+-define(IS_STATUS(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)).
 
 -type tnx_id() :: pos_integer().
 
@@ -123,13 +124,13 @@ start_link(Node, Name, RetryMs) ->
 %% the result is expected to be `ok | {ok, _}' to indicate success,
 %% and `{error, _}' to indicate failure.
 %%
-%% The excpetion of the MFA evaluation is captured and translated
+%% The exception of the MFA evaluation is captured and translated
 %% into an `{error, _}' tuple.
 %% This call tries to wait for all peer nodes to be in-sync before
 %% returning the result.
 %%
 %% In case of partial success, an `error' level log is emitted
-%% but the initial localy apply result is returned.
+%% but the initial local apply result is returned.
 -spec multicall(module(), atom(), list()) -> term().
 multicall(M, F, A) ->
     multicall(M, F, A, all, timer:minutes(2)).
@@ -141,11 +142,12 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req
             Result;
         {init_failure, Error} ->
             Error;
-        {peers_lagging, TnxId, Res, Nodes} ->
+        {Status, TnxId, Res, Nodes} when ?IS_STATUS(Status) ->
             %% The init MFA return ok, but some other nodes failed.
             ?SLOG(error, #{
                 msg => "cluster_rpc_peers_lagging",
-                lagging_nodes => Nodes,
+                status => Status,
+                nodes => Nodes,
                 tnx_id => TnxId
             }),
             Res
@@ -193,9 +195,9 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) ->
             InitRes;
         {init_failure, Error0} ->
             {init_failure, Error0};
-        {peers_lagging, Nodes} ->
+        {Status, Nodes} when ?IS_STATUS(Status) ->
             {ok, TnxId0, MFARes} = InitRes,
-            {peers_lagging, TnxId0, MFARes, Nodes}
+            {Status, TnxId0, MFARes, Nodes}
     end.
 
 -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
@@ -509,14 +511,18 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) ->
     emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg).
 
 wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
-    case lagging_node(TnxId) of
+    Lagging = lagging_nodes(TnxId),
+    Stopped = stopped_nodes(),
+    case Lagging -- Stopped of
+        [] when Stopped =:= [] ->
+            ok;
+        [] ->
+            {stopped_nodes, Stopped};
         [_ | _] when Remain > 0 ->
             ok = timer:sleep(Delay),
             wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
-        [] ->
-            ok;
-        Nodes ->
-            {peers_lagging, Nodes}
+        [_ | _] ->
+            {peers_lagging, Lagging}
     end.
 
 wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
@@ -527,14 +533,18 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
         false when Remain > 0 ->
             wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay);
         false ->
-            case lagging_node(TnxId) of
-                %% All commit but The succeedNum > length(nodes()).
-                [] -> ok;
-                Nodes -> {peers_lagging, Nodes}
+            case lagging_nodes(TnxId) of
+                [] ->
+                    ok;
+                Lagging ->
+                    case stopped_nodes() of
+                        [] -> {peers_lagging, Lagging};
+                        Stopped -> {stopped_nodes, Stopped}
+                    end
             end
     end.
 
-lagging_node(TnxId) ->
+lagging_nodes(TnxId) ->
     {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]),
     Nodes.
 
@@ -548,6 +558,9 @@ commit_status_trans(Operator, TnxId) ->
     Result = '$2',
     mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]).
 
+stopped_nodes() ->
+    ekka_cluster:info(stopped_nodes).
+
 get_retry_ms() ->
     emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)).
 

+ 1 - 1
apps/emqx_conf/src/emqx_conf.app.src

@@ -1,6 +1,6 @@
 {application, emqx_conf, [
     {description, "EMQX configuration management"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {mod, {emqx_conf_app, []}},
     {applications, [kernel, stdlib]},

+ 1 - 9
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -400,7 +400,7 @@ fields("node") ->
                 string(),
                 #{
                     mapping => "vm_args.-setcookie",
-                    default => "emqxsecretcookie",
+                    required => true,
                     'readOnly' => true,
                     sensitive => true,
                     desc => ?DESC(node_cookie)
@@ -536,14 +536,6 @@ fields("node") ->
                     desc => ?DESC(node_applications)
                 }
             )},
-        {"etc_dir",
-            sc(
-                string(),
-                #{
-                    desc => ?DESC(node_etc_dir),
-                    'readOnly' => true
-                }
-            )},
         {"cluster_call",
             sc(
                 ?R_REF("cluster_call"),

+ 4 - 0
apps/emqx_connector/docker-ct

@@ -0,0 +1,4 @@
+mongo
+redis
+mysql
+pgsql

+ 1 - 1
apps/emqx_connector/src/emqx_connector.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_connector, [
     {description, "An OTP application"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {mod, {emqx_connector_app, []}},
     {applications, [

+ 76 - 14
apps/emqx_connector/src/emqx_connector_ssl.erl

@@ -18,27 +18,89 @@
 
 -export([
     convert_certs/2,
+    drop_invalid_certs/1,
     clear_certs/2
 ]).
 
-convert_certs(RltvDir, NewConfig) ->
-    NewSSL = map_get_oneof([<<"ssl">>, ssl], NewConfig, undefined),
-    case emqx_tls_lib:ensure_ssl_files(RltvDir, NewSSL) of
-        {ok, NewSSL1} ->
-            {ok, new_ssl_config(NewConfig, NewSSL1)};
+%% TODO: rm `connector` case after `dev/ee5.0` merged into `master`.
+%% The `connector` config layer will be removed.
+%% for bridges with `connector` field. i.e. `mqtt_source` and `mqtt_sink`
+convert_certs(RltvDir, #{<<"connector">> := Connector} = Config) when
+    is_map(Connector)
+->
+    SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
+    new_ssl_config(RltvDir, Config, SSL);
+convert_certs(RltvDir, #{connector := Connector} = Config) when
+    is_map(Connector)
+->
+    SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
+    new_ssl_config(RltvDir, Config, SSL);
+%% for bridges without `connector` field. i.e. webhook
+convert_certs(RltvDir, #{<<"ssl">> := SSL} = Config) ->
+    new_ssl_config(RltvDir, Config, SSL);
+convert_certs(RltvDir, #{ssl := SSL} = Config) ->
+    new_ssl_config(RltvDir, Config, SSL);
+%% for bridges use connector name
+convert_certs(_RltvDir, Config) ->
+    {ok, Config}.
+
+clear_certs(RltvDir, #{<<"connector">> := Connector} = _Config) when
+    is_map(Connector)
+->
+    OldSSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
+    ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
+clear_certs(RltvDir, #{connector := Connector} = _Config) when
+    is_map(Connector)
+->
+    OldSSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
+    ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
+clear_certs(RltvDir, #{<<"ssl">> := OldSSL} = _Config) ->
+    ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
+clear_certs(RltvDir, #{ssl := OldSSL} = _Config) ->
+    ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL);
+clear_certs(_RltvDir, _) ->
+    ok.
+
+drop_invalid_certs(#{<<"connector">> := Connector} = Config) when
+    is_map(Connector)
+->
+    SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
+    NewSSL = emqx_tls_lib:drop_invalid_certs(SSL),
+    new_ssl_config(Config, NewSSL);
+drop_invalid_certs(#{connector := Connector} = Config) when
+    is_map(Connector)
+->
+    SSL = map_get_oneof([<<"ssl">>, ssl], Connector, undefined),
+    NewSSL = emqx_tls_lib:drop_invalid_certs(SSL),
+    new_ssl_config(Config, NewSSL);
+drop_invalid_certs(#{<<"ssl">> := SSL} = Config) ->
+    NewSSL = emqx_tls_lib:drop_invalid_certs(SSL),
+    new_ssl_config(Config, NewSSL);
+drop_invalid_certs(#{ssl := SSL} = Config) ->
+    NewSSL = emqx_tls_lib:drop_invalid_certs(SSL),
+    new_ssl_config(Config, NewSSL);
+%% for bridges use connector name
+drop_invalid_certs(Config) ->
+    Config.
+
+new_ssl_config(RltvDir, Config, SSL) ->
+    case emqx_tls_lib:ensure_ssl_files(RltvDir, SSL) of
+        {ok, NewSSL} ->
+            {ok, new_ssl_config(Config, NewSSL)};
         {error, Reason} ->
             {error, {bad_ssl_config, Reason}}
     end.
 
-clear_certs(_RltvDir, undefined) ->
-    ok;
-clear_certs(RltvDir, Config) ->
-    OldSSL = map_get_oneof([<<"ssl">>, ssl], Config, undefined),
-    ok = emqx_tls_lib:delete_ssl_files(RltvDir, undefined, OldSSL).
-
-new_ssl_config(Config, undefined) -> Config;
-new_ssl_config(Config, #{<<"enable">> := _} = SSL) -> Config#{<<"ssl">> => SSL};
-new_ssl_config(Config, #{enable := _} = SSL) -> Config#{ssl => SSL}.
+new_ssl_config(#{connector := Connector} = Config, NewSSL) ->
+    Config#{connector => Connector#{ssl => NewSSL}};
+new_ssl_config(#{<<"connector">> := Connector} = Config, NewSSL) ->
+    Config#{<<"connector">> => Connector#{<<"ssl">> => NewSSL}};
+new_ssl_config(#{ssl := _} = Config, NewSSL) ->
+    Config#{ssl => NewSSL};
+new_ssl_config(#{<<"ssl">> := _} = Config, NewSSL) ->
+    Config#{<<"ssl">> => NewSSL};
+new_ssl_config(Config, _NewSSL) ->
+    Config.
 
 map_get_oneof([], _Map, Default) ->
     Default;

+ 1 - 3
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl

@@ -87,7 +87,6 @@ fields("connector") ->
             sc(
                 binary(),
                 #{
-                    default => "emqx",
                     desc => ?DESC("username")
                 }
             )},
@@ -95,7 +94,6 @@ fields("connector") ->
             sc(
                 binary(),
                 #{
-                    default => "emqx",
                     desc => ?DESC("password")
                 }
             )},
@@ -226,7 +224,7 @@ fields("egress") ->
             sc(
                 binary(),
                 #{
-                    required => true,
+                    default => <<"${payload}">>,
                     desc => ?DESC("payload")
                 }
             )}

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard.app.src

@@ -2,7 +2,7 @@
 {application, emqx_dashboard, [
     {description, "EMQX Web Dashboard"},
     % strict semver, bump manually!
-    {vsn, "5.0.4"},
+    {vsn, "5.0.5"},
     {modules, []},
     {registered, [emqx_dashboard_sup]},
     {applications, [kernel, stdlib, mnesia, minirest, emqx]},

+ 1 - 1
apps/emqx_dashboard/src/emqx_dashboard_middleware.erl

@@ -23,7 +23,7 @@
 execute(Req, Env) ->
     case check_dispatch_ready(Env) of
         true -> add_cors_flag(Req, Env);
-        false -> {stop, cowboy_req:reply(503, Req)}
+        false -> {stop, cowboy_req:reply(503, #{<<"retry-after">> => <<"15">>}, Req)}
     end.
 
 add_cors_flag(Req, Env) ->

+ 1 - 1
apps/emqx_exhook/src/emqx_exhook.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_exhook, [
     {description, "EMQX Extension for Hook"},
-    {vsn, "5.0.3"},
+    {vsn, "5.0.4"},
     {modules, []},
     {registered, []},
     {mod, {emqx_exhook_app, []}},

+ 2 - 2
apps/emqx_exhook/src/emqx_exhook_handler.erl

@@ -133,7 +133,7 @@ on_client_authenticate(ClientInfo, AuthResult) ->
     end.
 
 on_client_authorize(ClientInfo, PubSub, Topic, Result) ->
-    Bool = Result == allow,
+    Bool = maps:get(result, Result, deny) == allow,
     Type =
         case PubSub of
             publish -> 'PUBLISH';
@@ -158,7 +158,7 @@ on_client_authorize(ClientInfo, PubSub, Topic, Result) ->
                     true -> allow;
                     _ -> deny
                 end,
-            {StopOrOk, NResult};
+            {StopOrOk, #{result => NResult, from => exhook}};
         _ ->
             {ok, Result}
     end.

+ 29 - 3
apps/emqx_exhook/test/emqx_exhook_SUITE.erl

@@ -23,6 +23,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("emqx/include/emqx_hooks.hrl").
 
 -define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
 
@@ -105,7 +106,10 @@ load_cfg(Cfg) ->
 %%--------------------------------------------------------------------
 
 t_access_failed_if_no_server_running(Config) ->
-    emqx_exhook_mgr:disable(<<"default">>),
+    meck:expect(emqx_metrics_worker, inc, fun(_, _, _) -> ok end),
+    meck:expect(emqx_metrics, inc, fun(_) -> ok end),
+    emqx_hooks:add('client.authorize', {emqx_authz, authorize, [[]]}, ?HP_AUTHZ),
+
     ClientInfo = #{
         clientid => <<"user-id-1">>,
         username => <<"usera">>,
@@ -114,14 +118,35 @@ t_access_failed_if_no_server_running(Config) ->
         protocol => mqtt,
         mountpoint => undefined
     },
+    ?assertMatch(
+        allow,
+        emqx_access_control:authorize(
+            ClientInfo#{username => <<"gooduser">>},
+            publish,
+            <<"acl/1">>
+        )
+    ),
+
+    ?assertMatch(
+        deny,
+        emqx_access_control:authorize(
+            ClientInfo#{username => <<"baduser">>},
+            publish,
+            <<"acl/2">>
+        )
+    ),
+
+    emqx_exhook_mgr:disable(<<"default">>),
     ?assertMatch(
         {stop, {error, not_authorized}},
         emqx_exhook_handler:on_client_authenticate(ClientInfo, #{auth_result => success})
     ),
 
     ?assertMatch(
-        {stop, deny},
-        emqx_exhook_handler:on_client_authorize(ClientInfo, publish, <<"t/1">>, allow)
+        {stop, #{result := deny, from := exhook}},
+        emqx_exhook_handler:on_client_authorize(ClientInfo, publish, <<"t/1">>, #{
+            result => allow, from => exhook
+        })
     ),
 
     Message = emqx_message:make(<<"t/1">>, <<"abc">>),
@@ -130,6 +155,7 @@ t_access_failed_if_no_server_running(Config) ->
         emqx_exhook_handler:on_message_publish(Message)
     ),
     emqx_exhook_mgr:enable(<<"default">>),
+    emqx_hooks:del('client.authorize', {emqx_authz, authorize}),
     assert_get_basic_usage_info(Config).
 
 t_lookup(_) ->

+ 12 - 5
apps/emqx_exhook/test/props/prop_exhook_hooks.erl

@@ -133,9 +133,16 @@ prop_client_authenticate() ->
     ).
 
 prop_client_authorize() ->
+    MkResult = fun(Result) -> #{result => Result, from => exhook} end,
     ?ALL(
         {ClientInfo0, PubSub, Topic, Result, Meta},
-        {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny]), request_meta()},
+        {
+            clientinfo(),
+            oneof([publish, subscribe]),
+            topic(),
+            oneof([MkResult(allow), MkResult(deny)]),
+            request_meta()
+        },
         begin
             ClientInfo = inject_magic_into(username, ClientInfo0),
             OutResult = emqx_hooks:run_fold(
@@ -145,9 +152,9 @@ prop_client_authorize() ->
             ),
             ExpectedOutResult =
                 case maps:get(username, ClientInfo) of
-                    <<"baduser">> -> deny;
-                    <<"gooduser">> -> allow;
-                    <<"normaluser">> -> allow;
+                    <<"baduser">> -> MkResult(deny);
+                    <<"gooduser">> -> MkResult(allow);
+                    <<"normaluser">> -> MkResult(allow);
                     _ -> Result
                 end,
             ?assertEqual(ExpectedOutResult, OutResult),
@@ -544,7 +551,7 @@ subopts(SubOpts) ->
 authresult_to_bool(AuthResult) ->
     AuthResult == ok.
 
-aclresult_to_bool(Result) ->
+aclresult_to_bool(#{result := Result}) ->
     Result == allow.
 
 pubsub_to_enum(publish) -> 'PUBLISH';

+ 6 - 1
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -153,7 +153,7 @@ init(
             mountpoint => Mountpoint
         }
     ),
-
+    %% FIXME: it should coap.hearbeat instead of idle_timeout?
     Heartbeat = ?GET_IDLE_TIME(Config),
     #channel{
         ctx = Ctx,
@@ -447,6 +447,7 @@ enrich_conninfo(
         conninfo = ConnInfo
     }
 ) ->
+    %% FIXME: generate a random clientid if absent
     case Queries of
         #{<<"clientid">> := ClientId} ->
             Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
@@ -467,6 +468,9 @@ enrich_clientinfo(
     {Queries, Msg},
     Channel = #channel{clientinfo = ClientInfo0}
 ) ->
+    %% FIXME:
+    %% 1. generate a random clientid if absent;
+    %% 2. assgin username, password to `undefined` if absent
     case Queries of
         #{
             <<"username">> := UserName,
@@ -542,6 +546,7 @@ process_connect(
         )
     of
         {ok, _Sess} ->
+            %% FIXME: Token in cluster wide?
             RandVal = rand:uniform(?TOKEN_MAXIMUM),
             Token = erlang:list_to_binary(erlang:integer_to_list(RandVal)),
             NResult = Result#{events => [{event, connected}]},

+ 25 - 30
apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl

@@ -69,17 +69,7 @@ handle_method(_, _, Msg, _, _) ->
 check_topic([]) ->
     error;
 check_topic(Path) ->
-    Sep = <<"/">>,
-    {ok,
-        emqx_http_lib:uri_decode(
-            lists:foldl(
-                fun(Part, Acc) ->
-                    <<Acc/binary, Sep/binary, Part/binary>>
-                end,
-                <<>>,
-                Path
-            )
-        )}.
+    {ok, emqx_http_lib:uri_decode(iolist_to_binary(lists:join(<<"/">>, Path)))}.
 
 get_sub_opts(#coap_message{options = Opts} = Msg) ->
     SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts),
@@ -124,25 +114,30 @@ get_publish_qos(Msg) ->
     end.
 
 apply_publish_opts(Msg, MQTTMsg) ->
-    maps:fold(
-        fun
-            (<<"retain">>, V, Acc) ->
-                Val = erlang:binary_to_atom(V),
-                emqx_message:set_flag(retain, Val, Acc);
-            (<<"expiry">>, V, Acc) ->
-                Val = erlang:binary_to_integer(V),
-                Props = emqx_message:get_header(properties, Acc),
-                emqx_message:set_header(
-                    properties,
-                    Props#{'Message-Expiry-Interval' => Val},
-                    Acc
-                );
-            (_, _, Acc) ->
-                Acc
-        end,
-        MQTTMsg,
-        emqx_coap_message:get_option(uri_query, Msg)
-    ).
+    case emqx_coap_message:get_option(uri_query, Msg) of
+        undefined ->
+            MQTTMsg;
+        Qs ->
+            maps:fold(
+                fun
+                    (<<"retain">>, V, Acc) ->
+                        Val = erlang:binary_to_atom(V),
+                        emqx_message:set_flag(retain, Val, Acc);
+                    (<<"expiry">>, V, Acc) ->
+                        Val = erlang:binary_to_integer(V),
+                        Props = emqx_message:get_header(properties, Acc),
+                        emqx_message:set_header(
+                            properties,
+                            Props#{'Message-Expiry-Interval' => Val},
+                            Acc
+                        );
+                    (_, _, Acc) ->
+                        Acc
+                end,
+                MQTTMsg,
+                Qs
+            )
+    end.
 
 subscribe(#coap_message{token = <<>>} = Msg, _, _, _) ->
     reply({error, bad_request}, <<"observe without token">>, Msg);

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway, [
     {description, "The Gateway management application"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {mod, {emqx_gateway_app, []}},
     {applications, [kernel, stdlib, grpc, emqx, emqx_authn]},

+ 54 - 26
apps/emqx_gateway/test/emqx_coap_SUITE.erl

@@ -143,12 +143,15 @@ t_connection_with_authn_failed(_) ->
     ok.
 
 t_publish(_) ->
-    Action = fun(Channel, Token) ->
-        Topic = <<"/abc">>,
+    %% can publish to a normal topic
+    Topics = [
+        <<"abc">>,
+        %% can publish to a `/` leading topic
+        <<"/abc">>
+    ],
+    Action = fun(Topic, Channel, Token) ->
         Payload = <<"123">>,
-
-        TopicStr = binary_to_list(Topic),
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
 
         %% Sub topic first
         emqx:subscribe(Topic),
@@ -164,24 +167,28 @@ t_publish(_) ->
             ?assert(false)
         end
     end,
-    with_connection(Action).
+    with_connection(Topics, Action).
 
 t_subscribe(_) ->
-    Topic = <<"/abc">>,
-    Fun = fun(Channel, Token) ->
-        TopicStr = binary_to_list(Topic),
+    %% can subscribe to a normal topic
+    Topics = [
+        <<"abc">>,
+        %% can subscribe to a `/` leading topic
+        <<"/abc">>
+    ],
+    Fun = fun(Topic, Channel, Token) ->
         Payload = <<"123">>,
-
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
         Req = make_req(get, Payload, [{observe, 0}]),
         {ok, content, _} = do_request(Channel, URI, Req),
         ?LOGT("observer topic:~ts~n", [Topic]),
 
+        %% ensure subscribe succeed
         timer:sleep(100),
         [SubPid] = emqx:subscribers(Topic),
         ?assert(is_pid(SubPid)),
 
-        %% Publish a message
+        %% publish a message
         emqx:publish(emqx_message:make(Topic, Payload)),
         {ok, content, Notify} = with_response(Channel),
         ?LOGT("observer get Notif=~p", [Notify]),
@@ -191,18 +198,27 @@ t_subscribe(_) ->
         ?assertEqual(Payload, PayloadRecv)
     end,
 
-    with_connection(Fun),
-    timer:sleep(100),
+    with_connection(Topics, Fun),
 
-    ?assertEqual([], emqx:subscribers(Topic)).
+    %% subscription removed if coap client disconnected
+    timer:sleep(100),
+    lists:foreach(
+        fun(Topic) ->
+            ?assertEqual([], emqx:subscribers(Topic))
+        end,
+        Topics
+    ).
 
 t_un_subscribe(_) ->
-    Topic = <<"/abc">>,
-    Fun = fun(Channel, Token) ->
-        TopicStr = binary_to_list(Topic),
+    %% can unsubscribe to a normal topic
+    Topics = [
+        <<"abc">>,
+        %% can unsubscribe to a `/` leading topic
+        <<"/abc">>
+    ],
+    Fun = fun(Topic, Channel, Token) ->
         Payload = <<"123">>,
-
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
 
         Req = make_req(get, Payload, [{observe, 0}]),
         {ok, content, _} = do_request(Channel, URI, Req),
@@ -219,16 +235,15 @@ t_un_subscribe(_) ->
         ?assertEqual([], emqx:subscribers(Topic))
     end,
 
-    with_connection(Fun).
+    with_connection(Topics, Fun).
 
 t_observe_wildcard(_) ->
     Fun = fun(Channel, Token) ->
         %% resolve_url can't process wildcard with #
-        Topic = <<"/abc/+">>,
-        TopicStr = binary_to_list(Topic),
+        Topic = <<"abc/+">>,
         Payload = <<"123">>,
 
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
         Req = make_req(get, Payload, [{observe, 0}]),
         {ok, content, _} = do_request(Channel, URI, Req),
         ?LOGT("observer topic:~ts~n", [Topic]),
@@ -238,7 +253,7 @@ t_observe_wildcard(_) ->
         ?assert(is_pid(SubPid)),
 
         %% Publish a message
-        PubTopic = <<"/abc/def">>,
+        PubTopic = <<"abc/def">>,
         emqx:publish(emqx_message:make(PubTopic, Payload)),
         {ok, content, Notify} = with_response(Channel),
 
@@ -320,7 +335,7 @@ t_clients_get_subscription_api(_) ->
 
         {200, [Subs]} = request(get, Path),
 
-        ?assertEqual(<<"/coap/observe">>, maps:get(topic, Subs)),
+        ?assertEqual(<<"coap/observe">>, maps:get(topic, Subs)),
 
         observe(Channel, Token, false),
 
@@ -386,6 +401,9 @@ observe(Channel, Token, false) ->
     {ok, nocontent, _Data} = do_request(Channel, URI, Req),
     ok.
 
+pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) ->
+    ?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token.
+
 make_req(Method) ->
     make_req(Method, <<>>).
 
@@ -442,6 +460,16 @@ with_connection(Action) ->
     end,
     do(Fun).
 
+with_connection(Checks, Action) ->
+    Fun = fun(Channel) ->
+        Token = connection(Channel),
+        timer:sleep(100),
+        lists:foreach(fun(E) -> Action(E, Channel, Token) end, Checks),
+        disconnection(Channel, Token),
+        timer:sleep(100)
+    end,
+    do(Fun).
+
 receive_deliver(Wait) ->
     receive
         {deliver, _, Msg} ->

+ 2 - 2
apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl

@@ -98,7 +98,7 @@ t_case_coap_publish(_) ->
     Prefix = Mod:ps_prefix(),
     Fun = fun(Channel, Token, Topic, Checker) ->
         TopicStr = binary_to_list(Topic),
-        URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = Prefix ++ "/" ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
 
         Req = Mod:make_req(post, <<>>),
         Checker(Mod:do_request(Channel, URI, Req))
@@ -114,7 +114,7 @@ t_case_coap_subscribe(_) ->
     Prefix = Mod:ps_prefix(),
     Fun = fun(Channel, Token, Topic, Checker) ->
         TopicStr = binary_to_list(Topic),
-        URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = Prefix ++ "/" ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
 
         Req = Mod:make_req(get, <<>>, [{observe, 0}]),
         Checker(Mod:do_request(Channel, URI, Req))

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -2,7 +2,7 @@
 {application, emqx_management, [
     {description, "EMQX Management API and CLI"},
     % strict semver, bump manually!
-    {vsn, "5.0.4"},
+    {vsn, "5.0.5"},
     {modules, []},
     {registered, [emqx_management_sup]},
     {applications, [kernel, stdlib, emqx_plugins, minirest, emqx]},

+ 0 - 32
apps/emqx_management/src/emqx_management_schema.erl

@@ -1,32 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
--module(emqx_management_schema).
-
--include_lib("typerefl/include/types.hrl").
-
--behaviour(hocon_schema).
-
--export([
-    namespace/0,
-    roots/0,
-    fields/1
-]).
-
-namespace() -> management.
-
-roots() -> [].
-
-fields(_) -> [].

+ 23 - 15
apps/emqx_management/src/emqx_mgmt_api_status.erl

@@ -33,18 +33,26 @@ init(Req0, State) ->
 %%--------------------------------------------------------------------
 
 running_status() ->
-    BrokerStatus =
-        case emqx:is_running() of
-            true ->
-                started;
-            false ->
-                stopped
-        end,
-    AppStatus =
-        case lists:keysearch(emqx, 1, application:which_applications()) of
-            false -> not_running;
-            {value, _Val} -> running
-        end,
-    Status = io_lib:format("Node ~ts is ~ts~nemqx is ~ts", [node(), BrokerStatus, AppStatus]),
-    Body = list_to_binary(Status),
-    {200, #{<<"content-type">> => <<"text/plain">>}, Body}.
+    case emqx_dashboard_listener:is_ready(timer:seconds(20)) of
+        true ->
+            BrokerStatus = broker_status(),
+            AppStatus = application_status(),
+            Body = io_lib:format("Node ~ts is ~ts~nemqx is ~ts", [node(), BrokerStatus, AppStatus]),
+            {200, #{<<"content-type">> => <<"text/plain">>}, list_to_binary(Body)};
+        false ->
+            {503, #{<<"retry-after">> => <<"15">>}, <<>>}
+    end.
+
+broker_status() ->
+    case emqx:is_running() of
+        true ->
+            started;
+        false ->
+            stopped
+    end.
+
+application_status() ->
+    case lists:keysearch(emqx, 1, application:which_applications()) of
+        false -> not_running;
+        {value, _Val} -> running
+    end.

+ 7 - 3
apps/emqx_plugin_libs/src/emqx_placeholder.erl

@@ -39,7 +39,7 @@
     sql_data/1
 ]).
 
--define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
+-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\}|\"\\$\\{[a-zA-Z0-9\\._]+\\}\")").
 %% Space and CRLF
 -define(EX_WITHE_CHARS, "\\s").
 
@@ -235,7 +235,9 @@ get_phld_var(Phld, Data) ->
     emqx_rule_maps:nested_get(Phld, Data).
 
 preproc_var_re(#{placeholders := PHs}) ->
-    "(" ++ string:join([ph_to_re(PH) || PH <- PHs], "|") ++ ")";
+    Res = [ph_to_re(PH) || PH <- PHs],
+    QuoteRes = ["\"" ++ Re ++ "\"" || Re <- Res],
+    "(" ++ string:join(Res ++ QuoteRes, "|") ++ ")";
 preproc_var_re(#{}) ->
     ?EX_PLACE_HOLDER.
 
@@ -292,7 +294,9 @@ parse_nested(Attr) ->
     end.
 
 unwrap(<<"${", Val/binary>>) ->
-    binary:part(Val, {0, byte_size(Val) - 1}).
+    binary:part(Val, {0, byte_size(Val) - 1});
+unwrap(<<"\"${", Val/binary>>) ->
+    binary:part(Val, {0, byte_size(Val) - 2}).
 
 quote_sql(Str) ->
     quote(Str, <<"\\\\'">>).

+ 1 - 1
apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_plugin_libs, [
     {description, "EMQX Plugin utility libs"},
-    {vsn, "4.3.2"},
+    {vsn, "4.3.3"},
     {modules, []},
     {applications, [kernel, stdlib]},
     {env, []}

+ 15 - 0
apps/emqx_plugin_libs/test/emqx_placeholder_SUITE.erl

@@ -150,6 +150,21 @@ t_preproc_sql6(_) ->
         emqx_placeholder:proc_sql(ParamsTokens, Selected)
     ).
 
+t_preproc_sql7(_) ->
+    Selected = #{a => <<"a">>, b => <<"b">>},
+    {PrepareStatement, ParamsTokens} = emqx_placeholder:preproc_sql(
+        <<"a:\"${a}\",b:\"${b}\"">>,
+        #{
+            replace_with => '$n',
+            placeholders => [<<"${a}">>]
+        }
+    ),
+    ?assertEqual(<<"a:$1,b:\"${b}\"">>, PrepareStatement),
+    ?assertEqual(
+        [<<"a">>],
+        emqx_placeholder:proc_sql(ParamsTokens, Selected)
+    ).
+
 t_preproc_tmpl_deep(_) ->
     Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
 

+ 11 - 6
bin/emqx

@@ -7,7 +7,7 @@ set -euo pipefail
 DEBUG="${DEBUG:-0}"
 [ "$DEBUG" -eq 1 ] && set -x
 
-RUNNER_ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)"
+RUNNER_ROOT_DIR="$(cd "$(dirname "$(realpath "$0" || echo "$0")")"/..; pwd -P)"
 
 # shellcheck disable=SC1090,SC1091
 . "$RUNNER_ROOT_DIR"/releases/emqx_vars
@@ -600,7 +600,7 @@ is_down() {
         if ps -p "$PID" | grep -q 'defunct'; then
             # zombie state, print parent pid
             parent="$(ps -o ppid= -p "$PID" | tr -d ' ')"
-            echo "WARN: $PID is marked <defunct>, parent:"
+            echo "WARNING: $PID is marked <defunct>, parent:"
             ps -p "$parent"
             return 0
         fi
@@ -748,8 +748,9 @@ export ESCRIPT_NAME="$SHORT_NAME"
 
 PIPE_DIR="${PIPE_DIR:-/$DATA_DIR/${WHOAMI}_erl_pipes/$NAME/}"
 
-## make EMQX_NODE_COOKIE right
+## Resolve Erlang cookie.
 if [ -n "${EMQX_NODE_COOKIE:-}" ]; then
+    ## To be backward compatible, read EMQX_NODE_COOKIE
     export EMQX_NODE__COOKIE="${EMQX_NODE_COOKIE}"
     unset EMQX_NODE_COOKIE
 fi
@@ -762,9 +763,13 @@ if [ -z "$COOKIE" ]; then
         COOKIE="$(grep -E '^-setcookie' "${vm_args_file}" | awk '{print $2}')"
     fi
 fi
-
-if [ -z "$COOKIE" ]; then
-    die "Please set node.cookie in $EMQX_ETC_DIR/emqx.conf or override from environment variable EMQX_NODE__COOKIE"
+[ -z "$COOKIE" ] && COOKIE="$EMQX_DEFAULT_ERLANG_COOKIE"
+if [ $IS_BOOT_COMMAND = 'yes' ] && [ "$COOKIE" = "$EMQX_DEFAULT_ERLANG_COOKIE" ]; then
+    echoerr "!!!!!!"
+    echoerr "WARNING: Default (insecure) Erlang cookie is in use."
+    echoerr "WARNING: Configure node.cookie in $EMQX_ETC_DIR/emqx.conf or override from environment variable EMQX_NODE__COOKIE"
+    echoerr "NOTE: Use the same config value for all nodes in the cluster."
+    echoerr "!!!!!!"
 fi
 
 ## check if OTP version has mnesia_hook feature; if not, fallback to

+ 1 - 0
build

@@ -157,6 +157,7 @@ make_relup() {
     local name_pattern
     name_pattern="${PROFILE}-$(./pkg-vsn.sh "$PROFILE" --vsn_matcher --long)"
     local releases=()
+    mkdir -p _upgrade_base
     while read -r tgzfile ; do
         local base_vsn
         base_vsn="$(echo "$tgzfile" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-(alpha|beta|rc)\.[0-9])?(-[0-9a-f]{8})?" | head -1)"

+ 2 - 2
deploy/charts/emqx/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5
+version: 5.0.7
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: latest
+appVersion: 5.0.7

+ 6 - 0
mix.exs

@@ -547,6 +547,7 @@ defmodule EMQXUmbrella.MixProject do
 
   defp template_vars(release, release_type, :bin = _package_type, edition_type) do
     [
+      emqx_default_erlang_cookie: default_cookie(),
       platform_data_dir: "data",
       platform_etc_dir: "etc",
       platform_log_dir: "log",
@@ -569,6 +570,7 @@ defmodule EMQXUmbrella.MixProject do
 
   defp template_vars(release, release_type, :pkg = _package_type, edition_type) do
     [
+      emqx_default_erlang_cookie: default_cookie(),
       platform_data_dir: "/var/lib/emqx",
       platform_etc_dir: "/etc/emqx",
       platform_log_dir: "/var/log/emqx",
@@ -589,6 +591,10 @@ defmodule EMQXUmbrella.MixProject do
     ] ++ build_info()
   end
 
+  defp default_cookie() do
+    "emqx50elixir"
+  end
+
   defp emqx_description(release_type, edition_type) do
     case {release_type, edition_type} do
       {:cloud, :enterprise} ->

+ 4 - 5
rebar.config.erl

@@ -298,14 +298,13 @@ relform() ->
 emqx_description(cloud, ee) -> "EMQX Enterprise";
 emqx_description(cloud, ce) -> "EMQX".
 
-overlay_vars(RelType, PkgType, Edition) ->
-    overlay_vars_rel(RelType) ++
+overlay_vars(cloud, PkgType, Edition) ->
+    [
+        {emqx_default_erlang_cookie, "emqxsecretcookie"}
+    ] ++
         overlay_vars_pkg(PkgType) ++
         overlay_vars_edition(Edition).
 
-overlay_vars_rel(cloud) ->
-    [{vm_args_file, "vm.args"}].
-
 overlay_vars_edition(ce) ->
     [
         {emqx_schema_mod, emqx_conf_schema},

+ 8 - 6
rel/emqx_vars

@@ -7,17 +7,19 @@ REL_VSN="{{ release_version }}"
 ERTS_VSN="{{ erts_vsn }}"
 ERL_OPTS="{{ erl_opts }}"
 RUNNER_BIN_DIR="{{ runner_bin_dir }}"
-RUNNER_LOG_DIR="{{ runner_log_dir }}"
 RUNNER_LIB_DIR="{{ runner_lib_dir }}"
+IS_ELIXIR="${IS_ELIXIR:-{{ is_elixir }}}"
+## Allow users to pre-set `RUNNER_LOG_DIR` because it only affects boot commands like `start` and `console`,
+## but not other commands such as `ping` and `ctl`.
+RUNNER_LOG_DIR="${RUNNER_LOG_DIR:-{{ runner_log_dir }}}"
 EMQX_ETC_DIR="{{ emqx_etc_dir }}"
 RUNNER_USER="{{ runner_user }}"
-IS_ELIXIR="${IS_ELIXIR:-{{ is_elixir }}}"
 SCHEMA_MOD="{{ emqx_schema_mod }}"
 IS_ENTERPRISE="{{ is_enterprise }}"
-
-export EMQX_DESCRIPTION='{{ emqx_description }}'
-
-## computed vars
+## Do not change EMQX_DEFAULT_ERLANG_COOKIE.
+## Configure EMQX_NODE_COOKIE instead
+EMQX_DEFAULT_ERLANG_COOKIE='{{ emqx_default_erlang_cookie }}'
 REL_NAME="emqx"
+export EMQX_DESCRIPTION='{{ emqx_description }}'
 
 ## updated vars here

+ 3 - 2
scripts/apps-version-check.sh

@@ -12,7 +12,8 @@ parse_semver() {
     echo "$1" | tr '.|-' ' '
 }
 
-while read -r app; do
+APPS="$(./scripts/find-apps.sh)"
+for app in ${APPS}; do
     if [ "$app" != "emqx" ]; then
         app_path="$app"
     else
@@ -46,7 +47,7 @@ while read -r app; do
             bad_app_count=$(( bad_app_count + 1))
         fi
     fi
-done < <(./scripts/find-apps.sh)
+done
 
 if [ $bad_app_count -gt 0 ]; then
     exit 1

+ 112 - 0
scripts/ct/run.sh

@@ -0,0 +1,112 @@
+#!/usr/bin/env bash
+
+## This script runs CT (and necessary dependencies) in docker container(s)
+
+set -euo pipefail
+
+# ensure dir
+cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
+
+help() {
+    echo
+    echo "-h|--help:              To display this usage info"
+    echo "--app lib_dir/app_name: Print apps in json"
+    echo "--console:              Start EMQX in console mode"
+}
+
+WHICH_APP='novalue'
+CONSOLE='no'
+while [ "$#" -gt 0 ]; do
+    case $1 in
+        -h|--help)
+            help
+            exit 0
+            ;;
+        --app)
+            WHICH_APP="$2"
+            shift 2
+            ;;
+        --console)
+            CONSOLE='yes'
+            shift 1
+            ;;
+        *)
+            echo "unknown option $1"
+            exit 1
+            ;;
+    esac
+done
+
+if [ "${WHICH_APP}" = 'novalue' ]; then
+    echo "must provide --app arg"
+    exit 1
+fi
+
+ERLANG_CONTAINER='erlang24'
+DOCKER_CT_ENVS_FILE="${WHICH_APP}/docker-ct"
+
+if [ -f "$DOCKER_CT_ENVS_FILE" ]; then
+    # shellcheck disable=SC2002
+    CT_DEPS="$(cat "$DOCKER_CT_ENVS_FILE" | xargs)"
+fi
+CT_DEPS="${ERLANG_CONTAINER} ${CT_DEPS}"
+
+FILES=( )
+
+for dep in ${CT_DEPS}; do
+    case "${dep}" in
+        erlang24)
+            FILES+=( '.ci/docker-compose-file/docker-compose.yaml' )
+            ;;
+        mongo)
+            FILES+=( '.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml'
+                     '.ci/docker-compose-file/docker-compose-mongo-single-tls.yaml' )
+            ;;
+        redis)
+            FILES+=( '.ci/docker-compose-file/docker-compose-redis-single-tcp.yaml'
+                     '.ci/docker-compose-file/docker-compose-redis-single-tls.yaml'
+                     '.ci/docker-compose-file/docker-compose-redis-sentinel-tcp.yaml'
+                     '.ci/docker-compose-file/docker-compose-redis-sentinel-tls.yaml' )
+            ;;
+        mysql)
+            FILES+=( '.ci/docker-compose-file/docker-compose-mysql-tcp.yaml'
+                     '.ci/docker-compose-file/docker-compose-mysql-tls.yaml' )
+            ;;
+        pgsql)
+            FILES+=( '.ci/docker-compose-file/docker-compose-pgsql-tcp.yaml'
+                     '.ci/docker-compose-file/docker-compose-pgsql-tls.yaml' )
+            ;;
+        *)
+            echo "unknown_ct_dependency $dep"
+            exit 1
+            ;;
+    esac
+done
+
+F_OPTIONS=""
+
+for file in "${FILES[@]}"; do
+    F_OPTIONS="$F_OPTIONS -f $file"
+done
+
+# shellcheck disable=2086 # no quotes for F_OPTIONS
+docker-compose $F_OPTIONS up -d --build
+
+# /emqx is where the source dir is mounted to the Erlang container
+# in .ci/docker-compose-file/docker-compose.yaml
+TTY=''
+if [[ -t 1 ]]; then
+    TTY='-t'
+fi
+docker exec -i $TTY "$ERLANG_CONTAINER" bash -c 'git config --global --add safe.directory /emqx'
+
+if [ "$CONSOLE" = 'yes' ]; then
+    docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make run"
+else
+    set +e
+    docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct"
+    RESULT=$?
+    # shellcheck disable=2086 # no quotes for F_OPTIONS
+    docker-compose $F_OPTIONS down
+    exit $RESULT
+fi

+ 0 - 4
scripts/docker-ct-apps

@@ -1,4 +0,0 @@
-# apps need docker-compose to run CT
-apps/emqx_authn
-apps/emqx_authz
-apps/emqx_connector

+ 16 - 15
scripts/find-apps.sh

@@ -50,24 +50,25 @@ find_app() {
 
 CE="$(find_app 'apps')"
 EE="$(find_app 'lib-ee')"
-
-if [ "$CT" = 'novalue' ]; then
-    echo -e "${CE}\n${EE}"
-    exit 0
-fi
-
 APPS_ALL="$(echo -e "${CE}\n${EE}")"
-APPS_DOCKER_CT="$(grep -v -E '^#.*' scripts/docker-ct-apps)"
 
-# shellcheck disable=SC2068
-for app in ${APPS_DOCKER_CT[@]}; do
-    APPS_ALL=("${APPS_ALL[@]/$app}")
-done
-
-if [ "$CT" = 'docker' ]; then
-    RESULT="${APPS_DOCKER_CT}"
+if [ "$CT" = 'novalue' ]; then
+    RESULT="${APPS_ALL}"
 else
-    RESULT="${APPS_ALL[*]}"
+    APPS_NORMAL_CT=( )
+    APPS_DOCKER_CT=( )
+    for app in ${APPS_ALL}; do
+        if [ -f "${app}/docker-ct" ]; then
+            APPS_DOCKER_CT+=("$app")
+        else
+            APPS_NORMAL_CT+=("$app")
+        fi
+    done
+    if [ "$CT" = 'docker' ]; then
+        RESULT="${APPS_DOCKER_CT[*]}"
+    else
+        RESULT="${APPS_NORMAL_CT[*]}"
+    fi
 fi
 
 if [ "$WANT_JSON" = 'yes' ]; then

+ 1 - 1
scripts/get-dashboard.sh

@@ -42,7 +42,7 @@ curl -L --silent --show-error \
      --output "${RELEASE_ASSET_FILE}" \
      "$DIRECT_DOWNLOAD_URL"
 
-unzip -q "$RELEASE_ASSET_FILE" -d "$DASHBOARD_PATH"
+unzip -o -q "$RELEASE_ASSET_FILE" -d "$DASHBOARD_PATH"
 rm -rf "$DASHBOARD_PATH/www"
 mv "$DASHBOARD_PATH/dist" "$DASHBOARD_PATH/www"
 rm -f "$RELEASE_ASSET_FILE"