Просмотр исходного кода

Merge pull request #6666 from JimMoen/merge-5.0-beta.3-to-master

Merge 5.0 beta.3 to master
zhongwencool 4 лет назад
Родитель
Сommit
3bdfa183f3
29 измененных файлов с 496 добавлено и 364 удалено
  1. 1 1
      .github/workflows/run_api_tests.yaml
  2. 1 1
      Makefile
  3. 1 5
      apps/emqx/include/emqx_release.hrl
  4. 35 0
      apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl
  5. 1 17
      apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl
  6. 33 27
      apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
  7. 2 4
      apps/emqx/src/emqx_metrics.erl
  8. 2 20
      apps/emqx/test/emqx_metrics_SUITE.erl
  9. 1 1
      apps/emqx/test/emqx_proper_types.erl
  10. 14 14
      apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
  11. 16 3
      apps/emqx_authz/src/emqx_authz_api_sources.erl
  12. 113 113
      apps/emqx_authz/src/emqx_authz_http.erl
  13. 51 7
      apps/emqx_authz/src/emqx_authz_schema.erl
  14. 1 3
      apps/emqx_authz/test/emqx_authz_SUITE.erl
  15. 1 3
      apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl
  16. 27 90
      apps/emqx_authz/test/emqx_authz_http_SUITE.erl
  17. 20 9
      apps/emqx_connector/src/emqx_connector_http.erl
  18. 11 10
      apps/emqx_connector/test/emqx_connector_api_SUITE.erl
  19. 1 0
      apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
  20. 1 1
      apps/emqx_gateway/src/emqx_gateway_utils.erl
  21. 1 1
      apps/emqx_modules/src/emqx_topic_metrics_api.erl
  22. 63 1
      apps/emqx_prometheus/src/emqx_prometheus.erl
  23. 7 10
      apps/emqx_prometheus/src/emqx_prometheus_api.erl
  24. 17 2
      apps/emqx_retainer/src/emqx_retainer.erl
  25. 7 2
      apps/emqx_retainer/src/emqx_retainer_mnesia.erl
  26. 2 2
      apps/emqx_slow_subs/etc/emqx_slow_subs.conf
  27. 58 0
      apps/emqx_statsd/src/emqx_statsd.erl
  28. 2 11
      apps/emqx_statsd/src/emqx_statsd_api.erl
  29. 6 6
      rebar.config.erl

+ 1 - 1
.github/workflows/run_api_tests.yaml

@@ -61,7 +61,7 @@ jobs:
     - uses: actions/checkout@v2
       with:
         repository: emqx/emqx-fvt
-        ref: 1.0.3-dev1
+        ref: 1.0.3-dev2
         path: .
     - uses: actions/setup-java@v1
       with:

+ 1 - 1
Makefile

@@ -7,7 +7,7 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-2:23.3.4.9-3-alpine3
 export EMQX_DEFAULT_RUNNER = alpine:3.14
 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
 export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
-export EMQX_DASHBOARD_VERSION ?= v0.14.0
+export EMQX_DASHBOARD_VERSION ?= v0.17.0
 export DOCKERFILE := deploy/docker/Dockerfile
 export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing
 ifeq ($(OS),Windows_NT)

+ 1 - 5
apps/emqx/include/emqx_release.hrl

@@ -24,8 +24,4 @@
 
 %% NOTE: This version number should be manually bumped for each release
 
-%% NOTE: This version number should have 3 numeric parts
-%% (Major.Minor.Patch), and extra info can be added after a final
-%% hyphen.
-
--define(EMQX_RELEASE, "5.0.0-beta.2").
+-define(EMQX_RELEASE, "5.0-beta.3").

+ 35 - 0
apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_limiter_correction).
+
+%% API
+-export([ add/2 ]).
+
+-type correction_value() :: #{ correction := emqx_limiter_decimal:zero_or_float()
+                             , any() => any()
+                             }.
+
+-export_type([correction_value/0]).
+
+%%--------------------------------------------------------------------
+%%% API
+%%--------------------------------------------------------------------
+-spec add(number(), correction_value()) -> {integer(), correction_value()}.
+add(Inc, #{correction := Correction} = Data) ->
+    FixedInc = Inc + Correction,
+    IntInc = erlang:floor(FixedInc),
+    {IntInc, Data#{correction := FixedInc - IntInc}}.

+ 1 - 17
apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl

@@ -20,7 +20,7 @@
 
 %% API
 -export([ add/2, sub/2, mul/2
-        , add_to_counter/3, put_to_counter/3, floor_div/2]).
+        , put_to_counter/3, floor_div/2]).
 -export_type([decimal/0, zero_or_float/0]).
 
 -type decimal() :: infinity | number().
@@ -60,22 +60,6 @@ floor_div(infinity, _) ->
 floor_div(A, B) ->
     erlang:floor(A / B).
 
--spec add_to_counter(counters:counters_ref(), pos_integer(), decimal()) ->
-          {zero_or_float(), zero_or_float()}.
-add_to_counter(_, _, infinity) ->
-    {0, 0};
-add_to_counter(Counter, Index, Val) when is_float(Val) ->
-    IntPart = erlang:floor(Val),
-    if IntPart > 0 ->
-            counters:add(Counter, Index, IntPart);
-       true ->
-            ok
-    end,
-    {IntPart, Val - IntPart};
-add_to_counter(Counter, Index, Val) ->
-    counters:add(Counter, Index, Val),
-    {Val, 0}.
-
 -spec put_to_counter(counters:counters_ref(), pos_integer(), decimal()) -> ok.
 put_to_counter(_, _, infinity) ->
     ok;

+ 33 - 27
apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl

@@ -87,7 +87,7 @@
 -define(OVERLOAD_MIN_ALLOC, 0.3).  %% minimum coefficient for overloaded limiter
 
 -export_type([index/0]).
--import(emqx_limiter_decimal, [add/2, sub/2, mul/2, add_to_counter/3, put_to_counter/3]).
+-import(emqx_limiter_decimal, [add/2, sub/2, mul/2,  put_to_counter/3]).
 
 %%--------------------------------------------------------------------
 %% API
@@ -317,12 +317,11 @@ longitudinal(#{id := Id,
 longitudinal(#{id := Id,
                rate := Rate,
                capacity := Capacity,
-               correction := Correction,
                counter := Counter,
                index := Index,
                obtained := Obtained} = Node,
              InFlow, Nodes) when Counter =/= undefined ->
-    Flow = add(erlang:min(InFlow, Rate), Correction),
+    Flow = erlang:min(InFlow, Rate),
 
     ShouldAlloc =
         case counters:get(Counter, Index) of
@@ -340,11 +339,11 @@ longitudinal(#{id := Id,
         Avaiable when Avaiable > 0 ->
             %% XXX if capacity is infinity, and flow always > 0, the value in counter
             %% will be overflow at some point in the future, do we need to deal with this situation???
-            {Alloced, Decimal} = add_to_counter(Counter, Index, Avaiable),
+            {Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node),
+            counters:add(Counter, Index, Inc),
 
-            {Alloced,
-             Nodes#{Id := Node#{obtained := Obtained + Alloced,
-                                correction := Decimal}}};
+            {Inc,
+             Nodes#{Id := Node2#{obtained := Obtained + Inc}}};
         _ ->
             {0, Nodes}
     end;
@@ -411,31 +410,38 @@ dispatch_burst([], State) ->
 dispatch_burst(GroupL,
                #{root := #{burst := Burst},
                  nodes := Nodes} = State) ->
-    InFlow = erlang:floor(Burst / erlang:length(GroupL)),
+    InFlow = Burst / erlang:length(GroupL),
     Dispatch = fun({Zone, Childs}, NodeAcc) ->
-                   #{id := ZoneId,
-                     burst := ZoneBurst,
-                     obtained := Obtained} = Zone,
-
-                   ZoneFlow = erlang:min(InFlow, ZoneBurst),
-                   EachFlow = ZoneFlow div erlang:length(Childs),
-                   Zone2 = Zone#{obtained := Obtained + ZoneFlow},
-                   NodeAcc2 = NodeAcc#{ZoneId := Zone2},
-                   dispatch_burst_to_buckets(Childs, EachFlow, NodeAcc2)
+                       #{id := ZoneId,
+                         burst := ZoneBurst,
+                         obtained := Obtained} = Zone,
+
+                       case erlang:min(InFlow, ZoneBurst) of
+                           0 -> NodeAcc;
+                           ZoneFlow ->
+                               EachFlow = ZoneFlow / erlang:length(Childs),
+                               {Alloced, NodeAcc2} = dispatch_burst_to_buckets(Childs, EachFlow, 0, NodeAcc),
+                               Zone2 = Zone#{obtained := Obtained + Alloced},
+                               NodeAcc2#{ZoneId := Zone2}
+                       end
                end,
     State#{nodes := lists:foldl(Dispatch, Nodes, GroupL)}.
 
 -spec dispatch_burst_to_buckets(list(node_id()),
-                                non_neg_integer(), nodes()) -> nodes().
-dispatch_burst_to_buckets(Childs, InFlow, Nodes) ->
-    Each = fun(ChildId, NodeAcc) ->
-                   #{counter := Counter,
-                     index := Index,
-                     obtained := Obtained} = Bucket = maps:get(ChildId, NodeAcc),
-                   counters:add(Counter, Index, InFlow),
-                   NodeAcc#{ChildId := Bucket#{obtained := Obtained + InFlow}}
-           end,
-    lists:foldl(Each, Nodes, Childs).
+                                float(), non_neg_integer(), nodes()) -> {non_neg_integer(), nodes()}.
+dispatch_burst_to_buckets([ChildId | T], InFlow, Alloced, Nodes) ->
+    #{counter := Counter,
+      index := Index,
+      obtained := Obtained} = Bucket = maps:get(ChildId, Nodes),
+    {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket),
+
+    counters:add(Counter, Index, Inc),
+
+    Nodes2 = Nodes#{ChildId := Bucket2#{obtained := Obtained + Inc}},
+    dispatch_burst_to_buckets(T, InFlow, Alloced + Inc, Nodes2);
+
+dispatch_burst_to_buckets([], _, Alloced, Nodes) ->
+    {Alloced, Nodes}.
 
 -spec init_tree(emqx_limiter_schema:limiter_type(), state()) -> state().
 init_tree(Type, State) ->

+ 2 - 4
apps/emqx/src/emqx_metrics.erl

@@ -146,7 +146,6 @@
          {counter, 'messages.dropped.expired'},  % QoS2 Messages expired
          {counter, 'messages.dropped.no_subscribers'},  % Messages dropped
          {counter, 'messages.forward'},       % Messages forward
-         {counter, 'messages.retained'},      % Messages retained
          {counter, 'messages.delayed'},       % Messages delayed
          {counter, 'messages.delivered'},     % Messages delivered
          {counter, 'messages.acked'}          % Messages acked
@@ -207,7 +206,7 @@ stop() -> gen_server:stop(?SERVER).
 
 %% BACKW: v4.3.0
 upgrade_retained_delayed_counter_type() ->
-    Ks = ['messages.retained', 'messages.delayed'],
+    Ks = ['messages.delayed'],
     gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity).
 
 %%--------------------------------------------------------------------
@@ -556,7 +555,7 @@ reserved_idx('messages.dropped')             -> 109;
 reserved_idx('messages.dropped.expired')     -> 110;
 reserved_idx('messages.dropped.no_subscribers') -> 111;
 reserved_idx('messages.forward')             -> 112;
-reserved_idx('messages.retained')            -> 113;
+%%reserved_idx('messages.retained')            -> 113; %% keep the index, new metrics can use this
 reserved_idx('messages.delayed')             -> 114;
 reserved_idx('messages.delivered')           -> 115;
 reserved_idx('messages.acked')               -> 116;
@@ -592,4 +591,3 @@ reserved_idx('olp.gc')                       -> 303;
 reserved_idx('olp.new_conn')                 -> 304;
 
 reserved_idx(_)                              -> undefined.
-

+ 2 - 20
apps/emqx/test/emqx_metrics_SUITE.erl

@@ -71,19 +71,10 @@ t_inc_dec(_) ->
     with_metrics_server(
       fun() ->
         ?assertEqual(0, emqx_metrics:val('bytes.received')),
-        ?assertEqual(0, emqx_metrics:val('messages.retained')),
         ok = emqx_metrics:inc('bytes.received'),
         ok = emqx_metrics:inc('bytes.received', 2),
         ok = emqx_metrics:inc('bytes.received', 2),
-        ?assertEqual(5, emqx_metrics:val('bytes.received')),
-        ok = emqx_metrics:inc('messages.retained', 2),
-        ok = emqx_metrics:inc('messages.retained', 2),
-        ?assertEqual(4, emqx_metrics:val('messages.retained')),
-        ok = emqx_metrics:dec('messages.retained'),
-        ok = emqx_metrics:dec('messages.retained', 1),
-        ?assertEqual(2, emqx_metrics:val('messages.retained')),
-        ok = emqx_metrics:set('messages.retained', 3),
-        ?assertEqual(3, emqx_metrics:val('messages.retained'))
+        ?assertEqual(5, emqx_metrics:val('bytes.received'))
       end).
 
 t_inc_recv(_) ->
@@ -162,21 +153,12 @@ t_trans(_) ->
         ok = emqx_metrics:trans(inc, 'bytes.received'),
         ok = emqx_metrics:trans(inc, 'bytes.received', 2),
         ?assertEqual(0, emqx_metrics:val('bytes.received')),
-        ok = emqx_metrics:trans(inc, 'messages.retained', 2),
-        ok = emqx_metrics:trans(inc, 'messages.retained', 2),
-        ?assertEqual(0, emqx_metrics:val('messages.retained')),
         ok = emqx_metrics:commit(),
         ?assertEqual(3, emqx_metrics:val('bytes.received')),
-        ?assertEqual(4, emqx_metrics:val('messages.retained')),
-        ok = emqx_metrics:trans(dec, 'messages.retained'),
-        ok = emqx_metrics:trans(dec, 'messages.retained', 1),
-        ?assertEqual(4, emqx_metrics:val('messages.retained')),
-        ok = emqx_metrics:commit(),
-        ?assertEqual(2, emqx_metrics:val('messages.retained'))
+        ok = emqx_metrics:commit()
       end).
 
 with_metrics_server(Fun) ->
     {ok, _} = emqx_metrics:start_link(),
     _ = Fun(),
     ok = emqx_metrics:stop().
-

+ 1 - 1
apps/emqx/test/emqx_proper_types.erl

@@ -288,7 +288,7 @@ systopic_metrics() ->
               <<"messages/qos2/received">>, <<"messages/qos2/sent">>,
               <<"messages/publish">>, <<"messages/dropped">>,
               <<"messages/dropped/expired">>, <<"messages/dropped/no_subscribers">>,
-              <<"messages/forward">>, <<"messages/retained">>,
+              <<"messages/forward">>,
               <<"messages/delayed">>, <<"messages/delivered">>,
               <<"messages/acked">>],
     ?LET({Nodename, T},

+ 14 - 14
apps/emqx_authn/src/simple_authn/emqx_authn_http.erl

@@ -19,6 +19,7 @@
 -include("emqx_authn.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("typerefl/include/types.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
 
 -behaviour(hocon_schema).
 -behaviour(emqx_authentication).
@@ -77,7 +78,7 @@ validations() ->
     ].
 
 url(type) -> binary();
-url(validator) -> [fun check_url/1];
+url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
 url(nullable) -> false;
 url(_) -> undefined.
 
@@ -118,16 +119,16 @@ create(_AuthenticatorID, Config) ->
     create(Config).
 
 create(#{method := Method,
-         url := URL,
+         url := RawURL,
          headers := Headers,
          body := Body,
          request_timeout := RequestTimeout} = Config) ->
-    #{path := Path,
-      query := Query} = URIMap = parse_url(URL),
+    {BsaeUrlWithPath, Query} = parse_fullpath(RawURL),
+    URIMap = parse_url(BsaeUrlWithPath),
     ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
     State = #{method          => Method,
-              path            => Path,
-              base_query      => cow_qs:parse_qs(list_to_binary(Query)),
+              path            => maps:get(path, URIMap),
+              base_query      => cow_qs:parse_qs(to_bin(Query)),
               headers         => maps:to_list(Headers),
               body            => maps:to_list(Body),
               request_timeout => RequestTimeout,
@@ -204,11 +205,8 @@ destroy(#{resource_id := ResourceId}) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-check_url(URL) ->
-    case emqx_http_lib:uri_parse(URL) of
-        {ok, _} -> true;
-        {error, _} -> false
-    end.
+parse_fullpath(RawURL) ->
+    cow_http:parse_fullpath(to_bin(RawURL)).
 
 check_body(Body) ->
     lists:all(
@@ -234,7 +232,8 @@ transform_header_name(Headers) ->
               end, #{}, Headers).
 
 check_ssl_opts(Conf) ->
-    case parse_url(get_conf_val("url", Conf)) of
+    {BaseUrlWithPath, _Query} = parse_fullpath(get_conf_val("url", Conf)),
+    case parse_url(BaseUrlWithPath) of
         #{scheme := https} ->
             case get_conf_val("ssl.enable", Conf) of
                 true -> ok;
@@ -264,12 +263,13 @@ generate_request(Credential, #{method := Method,
                                headers := Headers,
                                body := Body0}) ->
     Body = replace_placeholders(Body0, Credential),
+    NBaseQuery = replace_placeholders(BaseQuery, Credential),
     case Method of
         get ->
-            NPath = append_query(Path, BaseQuery ++ Body),
+            NPath = append_query(Path, NBaseQuery ++ Body),
             {NPath, Headers};
         post ->
-            NPath = append_query(Path, BaseQuery),
+            NPath = append_query(Path, NBaseQuery),
             ContentType = proplists:get_value(<<"content-type">>, Headers),
             NBody = serialize_body(ContentType, Body),
             {NPath, Headers, NBody}

+ 16 - 3
apps/emqx_authz/src/emqx_authz_api_sources.erl

@@ -412,9 +412,22 @@ get_raw_sources() ->
     RawSources = emqx:get_raw_config([authorization, sources], []),
     Schema = #{roots => emqx_authz_schema:fields("authorization"), fields => #{}},
     Conf = #{<<"sources">> => RawSources},
-    #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf,
-                                                           #{only_fill_defaults => true}),
-    Sources.
+    #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf, #{only_fill_defaults => true}),
+    merge_default_headers(Sources).
+
+merge_default_headers(Sources) ->
+    lists:map(fun(Source) ->
+        Convert =
+            case Source of
+                #{<<"method">> := <<"get">>} ->
+                    emqx_authz_schema:headers_no_content_type(converter);
+                #{<<"method">> := <<"post">>} ->
+                    emqx_authz_schema:headers(converter);
+                _ -> fun(H) -> H end
+            end,
+        Headers = Convert(maps:get(<<"headers">>, Source, #{})),
+        Source#{<<"headers">> => Headers}
+              end, Sources).
 
 get_raw_source(Type) ->
     lists:filter(fun (#{<<"type">> := T}) ->

+ 113 - 113
apps/emqx_authz/src/emqx_authz_http.erl

@@ -29,6 +29,7 @@
         , destroy/1
         , dry_run/1
         , authorize/4
+        , parse_url/1
         ]).
 
 -ifdef(TEST).
@@ -39,62 +40,29 @@
 description() ->
     "AuthZ with http".
 
-init(Source) ->
-    case emqx_authz_utils:create_resource(emqx_connector_http, Source) of
+init(Config) ->
+    NConfig = parse_config(Config),
+    case emqx_authz_utils:create_resource(emqx_connector_http, NConfig) of
         {error, Reason} -> error({load_config_error, Reason});
-        {ok, Id} -> Source#{annotations => #{id => Id}}
+        {ok, Id} -> NConfig#{annotations => #{id => Id}}
     end.
 
 destroy(#{annotations := #{id := Id}}) ->
     ok = emqx_resource:remove_local(Id).
 
-dry_run(Source) ->
-    emqx_resource:create_dry_run_local(emqx_connector_http, Source).
-
-authorize(Client, PubSub, Topic,
-            #{type := http,
-              query := Query,
-              path := Path,
-              headers := Headers,
-              method := Method,
-              request_timeout := RequestTimeout,
-              annotations := #{id := ResourceID}
-             } = Source) ->
-    Request = case Method of
-                  get ->
-                      Path1 = replvar(
-                                Path ++ "?" ++ Query,
-                                PubSub,
-                                Topic,
-                                maps:to_list(Client),
-                                fun var_uri_encode/1),
-
-                      {Path1, maps:to_list(Headers)};
-
-                  _ ->
-                      Body0 = maps:get(body, Source, #{}),
-                      Body1 = replvar_deep(
-                                Body0,
-                                PubSub,
-                                Topic,
-                                maps:to_list(Client),
-                                fun var_bin_encode/1),
-
-                      Body2 = serialize_body(
-                                maps:get(<<"content-type">>, Headers, <<"application/json">>),
-                                Body1),
-
-                      Path1 = replvar(
-                                Path,
-                                PubSub,
-                                Topic,
-                                maps:to_list(Client),
-                                fun var_uri_encode/1),
-
-                      {Path1, maps:to_list(Headers), Body2}
-              end,
-    HttpResult = emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}),
-    case HttpResult of
+dry_run(Config) ->
+    emqx_resource:create_dry_run_local(emqx_connector_http, parse_config(Config)).
+
+authorize( Client
+         , PubSub
+         , Topic
+         , #{ type            := http
+            , annotations     := #{id := ResourceID}
+            , method := Method
+            , request_timeout := RequestTimeout
+            } = Config) ->
+    Request = generate_request(PubSub, Topic, Client, Config),
+    case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of
         {ok, 200, _Headers} ->
             {matched, allow};
         {ok, 204, _Headers} ->
@@ -112,12 +80,75 @@ authorize(Client, PubSub, Topic,
             ignore
     end.
 
+parse_config(#{ url := URL
+              , method := Method
+              , headers := Headers
+              , request_timeout := ReqTimeout
+              } = Conf) ->
+    {BaseURLWithPath, Query} = parse_fullpath(URL),
+    BaseURLMap = parse_url(BaseURLWithPath),
+    Conf#{ method          => Method
+         , base_url        => maps:remove(query, BaseURLMap)
+         , base_query      => cow_qs:parse_qs(bin(Query))
+         , body            => maps:get(body, Conf, #{})
+         , headers         => Headers
+         , request_timeout => ReqTimeout
+         }.
+
+parse_fullpath(RawURL) ->
+    cow_http:parse_fullpath(bin(RawURL)).
+
+parse_url(URL)
+  when URL =:= undefined ->
+    #{};
+parse_url(URL) ->
+    {ok, URIMap} = emqx_http_lib:uri_parse(URL),
+    case maps:get(query, URIMap, undefined) of
+        undefined ->
+            URIMap#{query => ""};
+        _ ->
+            URIMap
+    end.
+
+generate_request( PubSub
+                , Topic
+                , Client
+                , #{ method := Method
+                   , base_url := #{path := Path}
+                   , base_query := BaseQuery
+                   , headers := Headers
+                   , body := Body0
+                   }) ->
+    Body = replace_placeholders(maps:to_list(Body0), PubSub, Topic, Client),
+    NBaseQuery = replace_placeholders(BaseQuery, PubSub, Topic, Client),
+    case Method of
+        get  ->
+            NPath = append_query(Path, NBaseQuery ++ Body),
+            {NPath, Headers};
+        _ ->
+            NPath = append_query(Path, NBaseQuery),
+            NBody = serialize_body(
+                      proplists:get_value(<<"Accept">>, Headers, <<"application/json">>),
+                      Body
+                     ),
+            {NPath, Headers, NBody}
+    end.
+
+append_query(Path, []) ->
+    Path;
+append_query(Path, Query) ->
+    Path ++ "?" ++ binary_to_list(query_string(Query)).
+
 query_string(Body) ->
-    query_string(maps:to_list(Body), []).
+    query_string(Body, []).
 
 query_string([], Acc) ->
-    <<$&, Str/binary>> = iolist_to_binary(lists:reverse(Acc)),
-    Str;
+    case iolist_to_binary(lists:reverse(Acc)) of
+        <<$&, Str/binary>> ->
+            Str;
+        <<>> ->
+            <<>>
+    end;
 query_string([{K, V} | More], Acc) ->
     query_string( More
                 , [ ["&", emqx_http_lib:uri_encode(K), "=", emqx_http_lib:uri_encode(V)]
@@ -128,67 +159,36 @@ serialize_body(<<"application/json">>, Body) ->
 serialize_body(<<"application/x-www-form-urlencoded">>, Body) ->
     query_string(Body).
 
+replace_placeholders(KVs, PubSub, Topic, Client) ->
+    replace_placeholders(KVs, PubSub, Topic, Client, []).
+
+replace_placeholders([], _PubSub, _Topic, _Client, Acc) ->
+    lists:reverse(Acc);
+replace_placeholders([{K, V0} | More], PubSub, Topic, Client, Acc) ->
+    case replace_placeholder(V0, PubSub, Topic, Client) of
+        undefined ->
+            error({cannot_get_variable, V0});
+        V ->
+            replace_placeholders(More, PubSub, Topic, Client, [{bin(K), bin(V)} | Acc])
+    end.
 
-replvar_deep(Map, PubSub, Topic, Vars, VarEncode) when is_map(Map) ->
-    maps:from_list(
-      lists:map(
-        fun({Key, Value}) ->
-                {replvar(Key, PubSub, Topic, Vars, VarEncode),
-                 replvar_deep(Value, PubSub, Topic, Vars, VarEncode)}
-        end,
-        maps:to_list(Map)));
-replvar_deep(List, PubSub, Topic, Vars, VarEncode) when is_list(List) ->
-    lists:map(
-      fun(Value) ->
-              replvar_deep(Value, PubSub, Topic, Vars, VarEncode)
-      end,
-      List);
-replvar_deep(Number, _PubSub, _Topic, _Vars, _VarEncode) when is_number(Number) ->
-    Number;
-replvar_deep(Binary, PubSub, Topic, Vars, VarEncode) when is_binary(Binary) ->
-    replvar(Binary, PubSub, Topic, Vars, VarEncode).
-
-replvar(Str0, PubSub, Topic, [], VarEncode) ->
-    NTopic = emqx_http_lib:uri_encode(Topic),
-    Str1 = re:replace(Str0, emqx_authz:ph_to_re(?PH_S_TOPIC),
-                      VarEncode(NTopic), [global, {return, binary}]),
-    re:replace(Str1, emqx_authz:ph_to_re(?PH_S_ACTION),
-               VarEncode(PubSub), [global, {return, binary}]);
-
-
-replvar(Str, PubSub, Topic, [{username, Username} | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_USERNAME),
-                      VarEncode(Username), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-
-replvar(Str, PubSub, Topic, [{clientid, Clientid} | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_CLIENTID),
-                      VarEncode(Clientid), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-
-replvar(Str, PubSub, Topic, [{peerhost, IpAddress}  | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_PEERHOST),
-                      VarEncode(inet_parse:ntoa(IpAddress)), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-
-replvar(Str, PubSub, Topic, [{protocol, Protocol} | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_PROTONAME),
-                      VarEncode(Protocol), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-
-replvar(Str, PubSub, Topic, [{mountpoint, Mountpoint} | Rest], VarEncode) ->
-    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT),
-                      VarEncode(Mountpoint), [global, {return, binary}]),
-    replvar(Str1, PubSub, Topic, Rest, VarEncode);
-
-replvar(Str, PubSub, Topic, [_Unknown | Rest], VarEncode) ->
-    replvar(Str, PubSub, Topic, Rest, VarEncode).
-
-var_uri_encode(S) ->
-    emqx_http_lib:uri_encode(bin(S)).
-
-var_bin_encode(S) ->
-    bin(S).
+replace_placeholder(?PH_USERNAME, _PubSub, _Topic, Client) ->
+    bin(maps:get(username, Client, undefined));
+replace_placeholder(?PH_CLIENTID, _PubSub, _Topic, Client) ->
+    bin(maps:get(clientid, Client, undefined));
+replace_placeholder(?PH_HOST,     _PubSub, _Topic, Client) ->
+    inet_parse:ntoa(maps:get(peerhost, Client, undefined));
+replace_placeholder(?PH_PROTONAME, _PubSub, _Topic, Client) ->
+    bin(maps:get(protocol, Client, undefined));
+replace_placeholder(?PH_MOUNTPOINT, _PubSub, _Topic, Client) ->
+    bin(maps:get(mountpoint, Client, undefined));
+replace_placeholder(?PH_TOPIC, _PubSub, Topic, _Client) ->
+    bin(emqx_http_lib:uri_encode(Topic));
+replace_placeholder(?PH_ACTION, PubSub, _Topic, _Client) ->
+    bin(PubSub);
+
+replace_placeholder(Constant, _, _, _) ->
+    Constant.
 
 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(B) when is_binary(B) -> B;

+ 51 - 7
apps/emqx_authz/src/emqx_authz_schema.erl

@@ -17,6 +17,7 @@
 -module(emqx_authz_schema).
 
 -include_lib("typerefl/include/types.hrl").
+-include_lib("emqx_connector/include/emqx_connector.hrl").
 
 -reflect_type([ permission/0
               , action/0
@@ -28,9 +29,15 @@
 -export([ namespace/0
         , roots/0
         , fields/1
+        , validations/0
+        ]).
+
+-export([ headers_no_content_type/1
+        , headers/1
         ]).
 
 -import(emqx_schema, [mk_duration/2]).
+-include_lib("hocon/include/hoconsc.hrl").
 
 %%--------------------------------------------------------------------
 %% Hocon Schema
@@ -138,11 +145,10 @@ fields(redis_cluster) ->
 http_common_fields() ->
     [ {type,            #{type => http}}
     , {enable,          #{type => boolean(), default => true}}
+    , {url,             fun url/1}
     , {request_timeout, mk_duration("request timeout", #{default => "30s"})}
     , {body,            #{type => map(), nullable => true}}
-    , {path,            #{type => string(), default => ""}}
-    , {query,           #{type => string(), default => ""}}
-    ] ++ emqx_connector_http:fields(config).
+    ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
 
 mongo_common_fields() ->
     [ {collection, #{type => atom()}}
@@ -152,22 +158,32 @@ mongo_common_fields() ->
                  default => true}}
     ].
 
-headers(type) -> map();
+validations() ->
+    [ {check_ssl_opts, fun check_ssl_opts/1}
+    , {check_headers, fun check_headers/1}
+    ].
+
+headers(type) -> list({binary(), binary()});
 headers(converter) ->
     fun(Headers) ->
-       maps:merge(default_headers(), transform_header_name(Headers))
+        maps:to_list(maps:merge(default_headers(), transform_header_name(Headers)))
     end;
 headers(default) -> default_headers();
 headers(_) -> undefined.
 
-headers_no_content_type(type) -> map();
+headers_no_content_type(type) -> list({binary(), binary()});
 headers_no_content_type(converter) ->
     fun(Headers) ->
-       maps:merge(default_headers_no_content_type(), transform_header_name(Headers))
+       maps:to_list(maps:merge(default_headers_no_content_type(), transform_header_name(Headers)))
     end;
 headers_no_content_type(default) -> default_headers_no_content_type();
 headers_no_content_type(_) -> undefined.
 
+url(type) -> binary();
+url(validator) -> [?NOT_EMPTY("the value of the field 'url' cannot be empty")];
+url(nullable) -> false;
+url(_) -> undefined.
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
@@ -190,6 +206,28 @@ transform_header_name(Headers) ->
                       maps:put(K, V, Acc)
               end, #{}, Headers).
 
+check_ssl_opts(Conf)
+  when Conf =:= #{} ->
+    true;
+check_ssl_opts(Conf) ->
+    case emqx_authz_http:parse_url(hocon_schema:get_value("config.url", Conf)) of
+        #{scheme := https} ->
+            case hocon_schema:get_value("config.ssl.enable", Conf) of
+                true -> ok;
+                false -> false
+            end;
+        #{scheme := http} ->
+            ok
+    end.
+
+check_headers(Conf)
+    when Conf =:= #{} ->
+    true;
+check_headers(Conf) ->
+    Method = to_bin(hocon_schema:get_value("config.method", Conf)),
+    Headers = hocon_schema:get_value("config.headers", Conf),
+    Method =:= <<"post">> orelse (not lists:member(<<"content-type">>, Headers)).
+
 union_array(Item) when is_list(Item) ->
     hoconsc:array(hoconsc:union(Item)).
 
@@ -225,3 +263,9 @@ to_list(A) when is_atom(A) ->
 to_list(B) when is_binary(B) ->
     binary_to_list(B).
 
+to_bin(A) when is_atom(A) ->
+    atom_to_binary(A);
+to_bin(B) when is_binary(B) ->
+    B;
+to_bin(L) when is_list(L) ->
+    list_to_binary(L).

+ 1 - 3
apps/emqx_authz/test/emqx_authz_SUITE.erl

@@ -64,9 +64,7 @@ set_special_configs(_App) ->
 
 -define(SOURCE1, #{<<"type">> => <<"http">>,
                    <<"enable">> => true,
-                   <<"base_url">> => <<"https://example.com:443/">>,
-                   <<"path">> => <<"a/b">>,
-                   <<"query">> => <<"c=d">>,
+                   <<"url">> => <<"https://example.com:443/a/b?c=d">>,
                    <<"headers">> => #{},
                    <<"method">> => <<"get">>,
                    <<"request_timeout">> => 5000

+ 1 - 3
apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl

@@ -30,9 +30,7 @@
 
 -define(SOURCE1, #{<<"type">> => <<"http">>,
                    <<"enable">> => true,
-                   <<"base_url">> => <<"https://fake.com:443/">>,
-                   <<"path">> => <<"foo">>,
-                   <<"query">> => <<"a=b">>,
+                   <<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>,
                    <<"headers">> => #{},
                    <<"method">> => <<"get">>,
                    <<"request_timeout">> => <<"5s">>

+ 27 - 90
apps/emqx_authz/test/emqx_authz_http_SUITE.erl

@@ -156,49 +156,14 @@ t_query_params(_Config) ->
                    Req = cowboy_req:reply(200, Req0),
                    {ok, Req, State}
            end,
-           #{<<"query">> => <<"username=${username}&"
-                             "clientid=${clientid}&"
-                             "peerhost=${peerhost}&"
-                             "proto_name=${proto_name}&"
-                             "mountpoint=${mountpoint}&"
-                             "topic=${topic}&"
-                             "action=${action}">>
-            }),
-
-    ClientInfo = #{clientid => <<"client id">>,
-                   username => <<"user name">>,
-                   peerhost => {127,0,0,1},
-                   protocol => <<"MQTT">>,
-                   mountpoint => <<"MOUNTPOINT">>,
-                   zone => default,
-                   listener => {tcp, default}
-                  },
-
-    ?assertEqual(
-        allow,
-        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)).
-
-t_path_params(_Config) ->
-    ok = setup_handler_and_config(
-           fun(Req0, State) ->
-                   <<"/authz/"
-                     "username/user%20name/"
-                     "clientid/client%20id/"
-                     "peerhost/127.0.0.1/"
-                     "proto_name/MQTT/"
-                     "mountpoint/MOUNTPOINT/"
-                     "topic/t/"
-                     "action/publish">> = cowboy_req:path(Req0),
-                   Req = cowboy_req:reply(200, Req0),
-                   {ok, Req, State}
-           end,
-           #{<<"path">> => <<"username/${username}/"
-                             "clientid/${clientid}/"
-                             "peerhost/${peerhost}/"
-                             "proto_name/${proto_name}/"
-                             "mountpoint/${mountpoint}/"
-                             "topic/${topic}/"
-                             "action/${action}">>
+           #{<<"url">> => <<"http://127.0.0.1:33333/authz/users/?"
+                            "username=${username}&"
+                            "clientid=${clientid}&"
+                            "peerhost=${host}&"
+                            "proto_name=${proto_name}&"
+                            "mountpoint=${mountpoint}&"
+                            "topic=${topic}&"
+                            "action=${action}">>
             }),
 
     ClientInfo = #{clientid => <<"client id">>,
@@ -218,23 +183,16 @@ t_json_body(_Config) ->
     ok = setup_handler_and_config(
            fun(Req0, State) ->
                    ?assertEqual(
-                      <<"/authz/"
-                        "username/user%20name/"
-                        "clientid/client%20id/"
-                        "peerhost/127.0.0.1/"
-                        "proto_name/MQTT/"
-                        "mountpoint/MOUNTPOINT/"
-                        "topic/t/"
-                        "action/publish">>,
+                      <<"/authz/users/">>,
                       cowboy_req:path(Req0)),
 
                    {ok, RawBody, Req1} = cowboy_req:read_body(Req0),
 
                    ?assertMatch(
                       #{<<"username">> := <<"user name">>,
-                        <<"CLIENT_client id">> := <<"client id">>,
-                        <<"peerhost">> := [<<"127.0.0.1">>, 1],
-                        <<"proto_name">> := #{<<"proto">> := <<"MQTT">>},
+                        <<"CLIENT">> := <<"client id">>,
+                        <<"peerhost">> := <<"127.0.0.1">>,
+                        <<"proto_name">> := <<"MQTT">>,
                         <<"mountpoint">> := <<"MOUNTPOINT">>,
                         <<"topic">> := <<"t">>,
                         <<"action">> := <<"publish">>},
@@ -244,17 +202,10 @@ t_json_body(_Config) ->
                    {ok, Req, State}
            end,
            #{<<"method">> => <<"post">>,
-             <<"path">> => <<"username/${username}/"
-                             "clientid/${clientid}/"
-                             "peerhost/${peerhost}/"
-                             "proto_name/${proto_name}/"
-                             "mountpoint/${mountpoint}/"
-                             "topic/${topic}/"
-                             "action/${action}">>,
              <<"body">> => #{<<"username">> => <<"${username}">>,
-                             <<"CLIENT_${clientid}">> => <<"${clientid}">>,
-                             <<"peerhost">> => [<<"${peerhost}">>, 1],
-                             <<"proto_name">> => #{<<"proto">> => <<"${proto_name}">>},
+                             <<"CLIENT">> => <<"${clientid}">>,
+                             <<"peerhost">> => <<"${host}">>,
+                             <<"proto_name">> => <<"${proto_name}">>,
                              <<"mountpoint">> => <<"${mountpoint}">>,
                              <<"topic">> => <<"${topic}">>,
                              <<"action">> => <<"${action}">>}
@@ -278,17 +229,10 @@ t_form_body(_Config) ->
     ok = setup_handler_and_config(
            fun(Req0, State) ->
                    ?assertEqual(
-                      <<"/authz/"
-                        "username/user%20name/"
-                        "clientid/client%20id/"
-                        "peerhost/127.0.0.1/"
-                        "proto_name/MQTT/"
-                        "mountpoint/MOUNTPOINT/"
-                        "topic/t/"
-                        "action/publish">>,
+                      <<"/authz/users/">>,
                       cowboy_req:path(Req0)),
-                    
-                   {ok, PostVars, Req1} = cowboy_req:read_urlencoded_body(Req0),
+
+                   {ok, [{PostVars, true}], Req1} = cowboy_req:read_urlencoded_body(Req0),
 
                    ?assertMatch(
                       #{<<"username">> := <<"user name">>,
@@ -298,22 +242,15 @@ t_form_body(_Config) ->
                         <<"mountpoint">> := <<"MOUNTPOINT">>,
                         <<"topic">> := <<"t">>,
                         <<"action">> := <<"publish">>},
-                      maps:from_list(PostVars)),
+                     jiffy:decode(PostVars, [return_maps])),
 
                    Req = cowboy_req:reply(200, Req1),
                    {ok, Req, State}
            end,
            #{<<"method">> => <<"post">>,
-             <<"path">> => <<"username/${username}/"
-                             "clientid/${clientid}/"
-                             "peerhost/${peerhost}/"
-                             "proto_name/${proto_name}/"
-                             "mountpoint/${mountpoint}/"
-                             "topic/${topic}/"
-                             "action/${action}">>,
              <<"body">> => #{<<"username">> => <<"${username}">>,
                              <<"clientid">> => <<"${clientid}">>,
-                             <<"peerhost">> => <<"${peerhost}">>,
+                             <<"peerhost">> => <<"${host}">>,
                              <<"proto_name">> => <<"${proto_name}">>,
                              <<"mountpoint">> => <<"${mountpoint}">>,
                              <<"topic">> => <<"${topic}">>,
@@ -349,7 +286,8 @@ t_create_replace(_Config) ->
                    Req = cowboy_req:reply(200, Req0),
                    {ok, Req, State}
            end,
-           #{<<"base_url">> => <<"http://127.0.0.1:33333/authz">>}),
+           #{<<"url">> =>
+                 <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>}),
 
     ?assertEqual(
         allow,
@@ -358,7 +296,8 @@ t_create_replace(_Config) ->
     %% Changing to other bad config does not work
     BadConfig = maps:merge(
                   raw_http_authz_config(),
-                  #{<<"base_url">> => <<"http://127.0.0.1:33332/authz">>}),
+                  #{<<"url">> =>
+                        <<"http://127.0.0.1:33332/authz/users/?topic=${topic}&action=${action}">>}),
 
     ?assertMatch(
         {error, _},
@@ -371,7 +310,8 @@ t_create_replace(_Config) ->
     %% Changing to valid config
     OkConfig = maps:merge(
                   raw_http_authz_config(),
-                  #{<<"base_url">> => <<"http://127.0.0.1:33333/authz">>}),
+                 #{<<"url">> =>
+                       <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>}),
 
     ?assertMatch(
         {ok, _},
@@ -388,12 +328,9 @@ t_create_replace(_Config) ->
 raw_http_authz_config() ->
     #{
         <<"enable">> => <<"true">>,
-
         <<"type">> => <<"http">>,
         <<"method">> => <<"get">>,
-        <<"base_url">> => <<"http://127.0.0.1:33333/authz">>,
-        <<"path">> => <<"users/${username}/">>,
-        <<"query">> => <<"topic=${topic}&action=${action}">>,
+        <<"url">> => <<"http://127.0.0.1:33333/authz/users/?topic=${topic}&action=${action}">>,
         <<"headers">> => #{<<"X-Test-Header">> => <<"Test Value">>}
     }.
 

+ 20 - 9
apps/emqx_connector/src/emqx_connector_http.erl

@@ -262,11 +262,20 @@ preprocess_request(#{
      , request_timeout => maps:get(request_timeout, Req, 30000)
      }.
 
-preproc_headers(Headers) ->
+preproc_headers(Headers) when is_map(Headers) ->
     maps:fold(fun(K, V, Acc) ->
-            Acc#{emqx_plugin_libs_rule:preproc_tmpl(bin(K)) =>
-                 emqx_plugin_libs_rule:preproc_tmpl(bin(V))}
-        end, #{}, Headers).
+            [{
+                emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
+                emqx_plugin_libs_rule:preproc_tmpl(bin(V))
+            } | Acc]
+        end, [], Headers);
+preproc_headers(Headers) when is_list(Headers) ->
+    lists:map(fun({K, V}) ->
+        {
+            emqx_plugin_libs_rule:preproc_tmpl(bin(K)),
+            emqx_plugin_libs_rule:preproc_tmpl(bin(V))
+        }
+              end, Headers).
 
 process_request(#{
             method := MethodTks,
@@ -278,7 +287,7 @@ process_request(#{
     Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg))
          , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg)
          , body => process_request_body(BodyTks, Msg)
-         , headers => maps:to_list(proc_headers(HeadersTks, Msg))
+         , headers => proc_headers(HeadersTks, Msg)
          , request_timeout => ReqTimeout
          }.
 
@@ -288,10 +297,12 @@ process_request_body(BodyTks, Msg) ->
     emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg).
 
 proc_headers(HeaderTks, Msg) ->
-    maps:fold(fun(K, V, Acc) ->
-            Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) =>
-                 emqx_plugin_libs_rule:proc_tmpl(V, Msg)}
-        end, #{}, HeaderTks).
+    lists:map(fun({K, V}) ->
+            {
+                emqx_plugin_libs_rule:proc_tmpl(K, Msg),
+                emqx_plugin_libs_rule:proc_tmpl(V, Msg)
+            }
+              end, HeaderTks).
 
 make_method(M) when M == <<"POST">>; M == <<"post">> -> post;
 make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;

+ 11 - 10
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -221,27 +221,26 @@ t_mqtt_conn_bridge_ingress(_) ->
 
     %% ... and a MQTT bridge, using POST
     %% we bind this bridge to the connector created just now
+    timer:sleep(50),
     {ok, 201, Bridge} = request(post, uri(["bridges"]),
         ?MQTT_BRIDGE_INGRESS(ConnctorID)#{
             <<"type">> => ?CONNECTR_TYPE,
             <<"name">> => ?BRIDGE_NAME_INGRESS
         }),
-
     #{ <<"id">> := BridgeIDIngress
      , <<"type">> := <<"mqtt">>
-     , <<"status">> := <<"connected">>
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
+    wait_for_resource_ready(BridgeIDIngress, 5),
 
     %% we now test if the bridge works as expected
-
     RemoteTopic = <<"remote_topic/1">>,
     LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(LocalTopic),
+    timer:sleep(100),
     %% PUBLISH a message to the 'remote' broker, as we have only one broker,
     %% the remote broker is also the local one.
-    wait_for_resource_ready(BridgeIDIngress, 5),
     emqx:publish(emqx_message:make(RemoteTopic, Payload)),
     %% we should receive a message on the local broker, with specified topic
     ?assert(
@@ -295,22 +294,21 @@ t_mqtt_conn_bridge_egress(_) ->
             <<"type">> => ?CONNECTR_TYPE,
             <<"name">> => ?BRIDGE_NAME_EGRESS
         }),
-
     #{ <<"id">> := BridgeIDEgress
      , <<"type">> := ?CONNECTR_TYPE
      , <<"name">> := ?BRIDGE_NAME_EGRESS
-     , <<"status">> := <<"connected">>
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
+    wait_for_resource_ready(BridgeIDEgress, 5),
 
     %% we now test if the bridge works as expected
     LocalTopic = <<"local_topic/1">>,
     RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(RemoteTopic),
+    timer:sleep(100),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
-    wait_for_resource_ready(BridgeIDEgress, 5),
     emqx:publish(emqx_message:make(LocalTopic, Payload)),
 
     %% we should receive a message on the "remote" broker, with specified topic
@@ -369,10 +367,9 @@ t_mqtt_conn_update(_) ->
     #{ <<"id">> := BridgeIDEgress
      , <<"type">> := <<"mqtt">>
      , <<"name">> := ?BRIDGE_NAME_EGRESS
-     , <<"status">> := <<"connected">>
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
-    wait_for_resource_ready(BridgeIDEgress, 2),
+    wait_for_resource_ready(BridgeIDEgress, 5),
 
     %% then we try to update 'server' of the connector, to an unavailable IP address
     %% the update should fail because of 'unreachable' or 'connrefused'
@@ -424,6 +421,7 @@ t_mqtt_conn_update2(_) ->
     {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
                                  ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
     {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
+    wait_for_resource_ready(BridgeIDEgress, 5),
     ?assertMatch(#{ <<"id">> := BridgeIDEgress
                   , <<"status">> := <<"connected">>
                   }, jsx:decode(BridgeStr)),
@@ -454,7 +452,7 @@ t_mqtt_conn_update3(_) ->
     #{ <<"id">> := BridgeIDEgress
      , <<"connector">> := ConnctorID
      } = jsx:decode(Bridge),
-    wait_for_resource_ready(BridgeIDEgress, 2),
+    wait_for_resource_ready(BridgeIDEgress, 5),
 
     %% delete the connector should fail because it is in use by a bridge
     {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
@@ -505,6 +503,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
     LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(LocalTopic),
+    timer:sleep(100),
     %% PUBLISH a message to the 'remote' broker, as we have only one broker,
     %% the remote broker is also the local one.
     wait_for_resource_ready(BridgeIDIngress, 5),
@@ -570,6 +569,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
     Payload = <<"hello">>,
     emqx:subscribe(RemoteTopic),
+    timer:sleep(100),
     %% PUBLISH a message to the 'local' broker, as we have only one broker,
     %% the remote broker is also the local one.
     wait_for_resource_ready(BridgeIDEgress, 5),
@@ -593,6 +593,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
     RuleTopic = <<"t/1">>,
     RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
     emqx:subscribe(RemoteTopic2),
+    timer:sleep(100),
     wait_for_resource_ready(BridgeIDEgress, 5),
     emqx:publish(emqx_message:make(RuleTopic, Payload2)),
     {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),

+ 1 - 0
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -436,6 +436,7 @@ typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, examp
 typename_to_spec("bytesize()", _Mod) -> #{type => string, example => <<"32MB">>};
 typename_to_spec("wordsize()", _Mod) -> #{type => string, example => <<"1024KB">>};
 typename_to_spec("map()", _Mod) -> #{type => object, example => #{}};
+typename_to_spec("{binary(), binary()}", _Mod) -> #{type => object, example => #{}};
 typename_to_spec("comma_separated_list()", _Mod) ->
     #{type => string, example => <<"item1,item2">>};
 typename_to_spec("comma_separated_atoms()", _Mod) ->

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -70,7 +70,7 @@
 -define(DEFAULT_IDLE_TIMEOUT, 30000).
 -define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}).
 -define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304,
-                              message_queue_len => 32000}).
+                              max_message_queue_len => 32000}).
 
 -elvis([{elvis_style, god_modules, disable}]).
 

+ 1 - 1
apps/emqx_modules/src/emqx_topic_metrics_api.erl

@@ -235,7 +235,7 @@ accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) ->
 
 %% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...}
 do_accumulation_metrics(MetricsIn, undefined) -> MetricsIn;
-do_accumulation_metrics(MetricsIn, MetricsAcc) ->
+do_accumulation_metrics(MetricsIn, {MetricsAcc, _}) ->
     Keys = maps:keys(MetricsIn),
     lists:foldl(fun(Key, Acc) ->
         InVal = maps:get(Key, MetricsIn),

+ 63 - 1
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -22,9 +22,11 @@
 %% be used by the prometheus application
 -behaviour(prometheus_collector).
 
+-include("emqx_prometheus.hrl").
+
 -include_lib("prometheus/include/prometheus.hrl").
 -include_lib("prometheus/include/prometheus_model.hrl").
-
+-include_lib("emqx/include/logger.hrl").
 
 -import(prometheus_model_helpers,
         [ create_mf/5
@@ -32,6 +34,16 @@
         , counter_metric/1
         ]).
 
+-export([ update/1
+        , start/0
+        , stop/0
+        , restart/0
+        % for rpc
+        , do_start/0
+        , do_stop/0
+        , do_restart/0
+        ]).
+
 %% APIs
 -export([start_link/1]).
 
@@ -58,6 +70,56 @@
 
 -record(state, {push_gateway, timer, interval}).
 
+%%--------------------------------------------------------------------
+%% update new config
+update(Config) ->
+    case emqx_conf:update([prometheus], Config,
+                            #{rawconf_with_defaults => true, override_to => cluster}) of
+        {ok, #{raw_config := NewConfigRows}} ->
+            case maps:get(<<"enable">>, Config, true) of
+                true ->
+                    ok = restart();
+                false ->
+                    ok = stop()
+            end,
+            {ok, NewConfigRows};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+start()   -> cluster_call(do_start, []).
+stop()    -> cluster_call(do_stop, []).
+restart() -> cluster_call(do_restart, []).
+
+do_start() ->
+    emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])).
+
+do_stop() ->
+    case emqx_prometheus_sup:stop_child(?APP) of
+        ok ->
+            ok;
+        {error, not_found} ->
+            ok
+    end.
+
+do_restart() ->
+    ok = do_stop(),
+    ok = do_start(),
+    ok.
+
+cluster_call(F, A) ->
+    [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()],
+    ok.
+
+rpc_call(N, F, A) ->
+    case rpc:call(N, ?MODULE, F, A, 5000) of
+        {badrpc, R} ->
+            ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]),
+            {error, {badrpc, R}};
+        Result ->
+            Result
+    end.
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------

+ 7 - 10
apps/emqx_prometheus/src/emqx_prometheus_api.erl

@@ -67,16 +67,13 @@ prometheus(get, _Params) ->
     {200, emqx:get_raw_config([<<"prometheus">>], #{})};
 
 prometheus(put, #{body := Body}) ->
-    {ok, Config} = emqx:update_config([prometheus], Body),
-    case maps:get(<<"enable">>, Body) of
-        true ->
-            _ = emqx_prometheus_sup:stop_child(?APP),
-            emqx_prometheus_sup:start_child(?APP, maps:get(config, Config));
-        false ->
-            _ = emqx_prometheus_sup:stop_child(?APP),
-            ok
-        end,
-    {200, emqx:get_raw_config([<<"prometheus">>], #{})}.
+    case emqx_prometheus:update(Body) of
+        {ok, NewConfig} ->
+            {200, NewConfig};
+        {error, Reason} ->
+            Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
+            {500, 'INTERNAL_ERROR', Message}
+    end.
 
 stats(get, #{headers := Headers}) ->
     Type =

+ 17 - 2
apps/emqx_retainer/src/emqx_retainer.erl

@@ -37,7 +37,9 @@
         , clean/0
         , delete/1
         , page_read/3
-        , post_config_update/5]).
+        , post_config_update/5
+        , stats_fun/0
+        ]).
 
 %% gen_server callbacks
 -export([ init/1
@@ -69,6 +71,7 @@
 -callback match_messages(context(), topic(), cursor()) -> {ok, list(), cursor()}.
 -callback clear_expired(context()) -> ok.
 -callback clean(context()) -> ok.
+-callback size(context()) -> non_neg_integer().
 
 %%--------------------------------------------------------------------
 %% Hook API
@@ -185,6 +188,9 @@ post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) ->
 call(Req) ->
     gen_server:call(?MODULE, Req, infinity).
 
+stats_fun() ->
+    gen_server:cast(?MODULE, ?FUNCTION_NAME).
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
@@ -226,6 +232,12 @@ handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.
 
+handle_cast(stats_fun, #{context := Context} = State) ->
+    Mod = get_backend_module(),
+    Size = Mod:size(Context),
+    emqx_stats:setstat('retained.count', 'retained.max', Size),
+    {noreply, State};
+
 handle_cast(Msg, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
     {noreply, State}.
@@ -485,8 +497,11 @@ close_resource(_) ->
 load(Context) ->
     _ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}),
     _ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}),
+    emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0),
     ok.
 
 unload() ->
     emqx:unhook('message.publish', {?MODULE, on_message_publish}),
-    emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}).
+    emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}),
+    emqx_stats:cancel_update(emqx_retainer_stats),
+    ok.

+ 7 - 2
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -30,7 +30,9 @@
         , page_read/4
         , match_messages/3
         , clear_expired/1
-        , clean/1]).
+        , clean/1
+        , size/1
+        ]).
 
 -export([create_resource/1]).
 
@@ -73,7 +75,6 @@ store_retained(_, Msg =#message{topic = Topic}) ->
     ExpiryTime = emqx_retainer:get_expiry_time(Msg),
     case is_table_full() of
         false ->
-            ok = emqx_metrics:inc('messages.retained'),
             mria:dirty_write(?TAB,
                              #retained{topic = topic2tokens(Topic),
                                        msg = Msg,
@@ -155,6 +156,10 @@ match_messages(_, Topic, Cursor) ->
 clean(_) ->
     _ = mria:clear_table(?TAB),
     ok.
+
+size(_) ->
+    table_size().
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------

+ 2 - 2
apps/emqx_slow_subs/etc/emqx_slow_subs.conf

@@ -12,8 +12,8 @@ emqx_slow_subs {
 
     ## The eviction time of the record, which in the statistics record table
     ##
-    ## Default: 5m
-    expire_interval = 5m
+    ## Default: 300ms
+    expire_interval = 300ms
 
     ## The maximum number of records in the slow subscription statistics record table
     ##

+ 58 - 0
apps/emqx_statsd/src/emqx_statsd.erl

@@ -25,6 +25,18 @@
 
 -include("emqx_statsd.hrl").
 
+-include_lib("emqx/include/logger.hrl").
+
+-export([ update/1
+        , start/0
+        , stop/0
+        , restart/0
+        %% for rpc
+        , do_start/0
+        , do_stop/0
+        , do_restart/0
+        ]).
+
 %% Interface
 -export([start_link/1]).
 
@@ -44,6 +56,52 @@
     estatsd_pid          :: pid()
 }).
 
+update(Config) ->
+    case emqx_conf:update([statsd],
+                            Config,
+                            #{rawconf_with_defaults => true, override_to => cluster}) of
+        {ok, #{raw_config := NewConfigRows}} ->
+            ok = stop(),
+            case maps:get(<<"enable">>, Config, true) of
+                true ->
+                    ok = start();
+                false ->
+                    ignore
+            end,
+            {ok, NewConfigRows};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+
+start()   -> cluster_call(do_start, []).
+stop()    -> cluster_call(do_stop, []).
+restart() -> cluster_call(do_restart, []).
+
+do_start() ->
+    emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})).
+
+do_stop() ->
+    emqx_statsd_sup:ensure_child_stopped(?APP).
+
+do_restart() ->
+    ok = do_stop(),
+    ok = do_start(),
+    ok.
+
+cluster_call(F, A) ->
+    [ok = rpc_call(N, F, A) || N <- mria_mnesia:running_nodes()],
+    ok.
+
+rpc_call(N, F, A) ->
+    case rpc:call(N, ?MODULE, F, A, 5000) of
+        {badrpc, R} ->
+            ?LOG(error, "RPC Node: ~p ~p ~p failed, Reason: ~p", [N, ?MODULE, F, R]),
+            {error, {badrpc, R}};
+        Result ->
+            Result
+    end.
+
 start_link(Opts) ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
 

+ 2 - 11
apps/emqx_statsd/src/emqx_statsd_api.erl

@@ -55,17 +55,8 @@ statsd(get, _Params) ->
     {200, emqx:get_raw_config([<<"statsd">>], #{})};
 
 statsd(put, #{body := Body}) ->
-    case emqx:update_config([statsd],
-                            Body,
-                            #{rawconf_with_defaults => true, override_to => cluster}) of
-        {ok, #{raw_config := NewConfig, config := Config}} ->
-            ok = emqx_statsd_sup:ensure_child_stopped(?APP),
-            case maps:get(<<"enable">>, Body) of
-                true ->
-                    ok = emqx_statsd_sup:ensure_child_started(?APP, maps:get(config, Config));
-                false ->
-                    ok
-            end,
+    case emqx_statsd:update(Body) of
+        {ok, NewConfig} ->
             {200, NewConfig};
         {error, Reason} ->
             Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),

+ 6 - 6
rebar.config.erl

@@ -122,42 +122,42 @@ profiles() ->
        , {relx, relx(Vsn, cloud, bin, ce)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ce)}
-       , {post_hooks, [{compile, "./build emqx doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx doc"}]}
        ]}
     , {'emqx-pkg',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, cloud, pkg, ce)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ce)}
-       , {post_hooks, [{compile, "./build emqx-pkg doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-pkg doc"}]}
        ]}
     , {'emqx-enterprise',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, cloud, bin, ee)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ee)}
-       , {post_hooks, [{compile, "./build emqx-enterprise doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-enterprise doc"}]}
        ]}
     , {'emqx-enterprise-pkg',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, cloud, pkg, ee)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ee)}
-       , {post_hooks, [{compile, "./build emqx-enterprise-pkg doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-enterprise-pkg doc"}]}
        ]}
     , {'emqx-edge',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, edge, bin, ce)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ce)}
-       , {post_hooks, [{compile, "./build emqx-edge doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-edge doc"}]}
        ]}
     , {'emqx-edge-pkg',
        [ {erl_opts, prod_compile_opts()}
        , {relx, relx(Vsn, edge, pkg, ce)}
        , {overrides, prod_overrides()}
        , {project_app_dirs, project_app_dirs(ce)}
-       , {post_hooks, [{compile, "./build emqx-edge-pkg doc"}]}
+       , {post_hooks, [{"(linux|darwin|solaris|freebsd|netbsd|openbsd)", compile, "./build emqx-edge-pkg doc"}]}
        ]}
     , {check,
        [ {erl_opts, common_compile_opts()}