Explorar o código

Merge pull request #4072 from emqx/dev/v4.3.0

Auto-pull-request-on-2021-01-25
Zaiming Shi %!s(int64=5) %!d(string=hai) anos
pai
achega
937390fe0f

+ 12 - 14
apps/emqx_auth_http/src/emqx_auth_http_app.erl

@@ -61,20 +61,7 @@ translate_env(EnvName) ->
                                             _ -> 80
                                             _ -> 80
                                         end),
                                         end),
             Path = path(Path0),
             Path = path(Path0),
-            Host = case inet:parse_address(Host0) of
-                       {ok, {_,_,_,_} = Addr} -> Addr;
-                       {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr;
-                       {error, einval} -> Host0
-                   end,
-            Inet = case Host of
-                       {_,_,_,_} -> inet;
-                       {_,_,_,_,_,_,_,_} -> inet6;
-                       _ ->
-                           case inet:getaddr(Host, inet6) of
-                               {error, _} -> inet;
-                               {ok, _} -> inet6
-                           end
-                   end,
+            {Inet, Host} = parse_host(Host0),
             MoreOpts = case Scheme of
             MoreOpts = case Scheme of
                         "http" ->
                         "http" ->
                             [{transport_opts, [Inet]}];
                             [{transport_opts, [Inet]}];
@@ -152,6 +139,17 @@ unload_hooks() ->
     ehttpc_sup:stop_pool('emqx_auth_http/acl_req'),
     ehttpc_sup:stop_pool('emqx_auth_http/acl_req'),
     ok.
     ok.
 
 
+parse_host(Host) ->
+    case inet:parse_address(Host) of
+        {ok, Addr} when size(Addr) =:= 4 -> {inet, Addr};
+        {ok, Addr} when size(Addr) =:= 8 -> {inet6, Addr};
+        {error, einval} ->
+            case inet:getaddr(Host, inet6) of
+                {ok, _} -> {inet6, Host};
+                {error, _} -> {inet, Host}
+            end
+    end.
+
 to_lower(Headers) ->
 to_lower(Headers) ->
     [{string:to_lower(K), V} || {K, V} <- Headers].
     [{string:to_lower(K), V} || {K, V} <- Headers].
 
 

+ 1 - 1
apps/emqx_auth_redis/rebar.config

@@ -1,5 +1,5 @@
 {deps,
 {deps,
- [{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.3"}}}
+ [{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.4"}}}
  ]}.
  ]}.
 
 
 {erl_opts, [warn_unused_vars,
 {erl_opts, [warn_unused_vars,

+ 1 - 1
apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema

@@ -100,7 +100,7 @@
 
 
 {mapping, "bridge.mqtt.$name.retry_interval", "emqx_bridge_mqtt.bridges", [
 {mapping, "bridge.mqtt.$name.retry_interval", "emqx_bridge_mqtt.bridges", [
   {default, "20s"},
   {default, "20s"},
-  {datatype, {duration, ms}}
+  {datatype, {duration, s}}
 ]}.
 ]}.
 
 
 {mapping, "bridge.mqtt.$name.max_inflight", "emqx_bridge_mqtt.bridges", [
 {mapping, "bridge.mqtt.$name.max_inflight", "emqx_bridge_mqtt.bridges", [

+ 1 - 1
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl

@@ -759,7 +759,7 @@ options(Options, PoolName) ->
                   {username, str(Get(<<"username">>))},
                   {username, str(Get(<<"username">>))},
                   {password, str(Get(<<"password">>))},
                   {password, str(Get(<<"password">>))},
                   {proto_ver, mqtt_ver(Get(<<"proto_ver">>))},
                   {proto_ver, mqtt_ver(Get(<<"proto_ver">>))},
-                  {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), ms)},
+                  {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)},
                   {ssl, cuttlefish_flag:parse(str(Get(<<"ssl">>)))},
                   {ssl, cuttlefish_flag:parse(str(Get(<<"ssl">>)))},
                   {ssl_opts, [{versions, tls_versions()},
                   {ssl_opts, [{versions, tls_versions()},
                               {ciphers, ciphers(Get(<<"ciphers">>))},
                               {ciphers, ciphers(Get(<<"ciphers">>))},

+ 13 - 3
apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl

@@ -377,9 +377,14 @@ common(_StateName, {call, From}, {ensure_present, What, Topic}, State) ->
 common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
 common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
     {Result, NewState} = ensure_absent(What, Topic, State),
     {Result, NewState} = ensure_absent(What, Topic, State),
     {keep_state, NewState, [{reply, From, Result}]};
     {keep_state, NewState, [{reply, From, Result}]};
-common(_StateName, info, {deliver, _, Msg}, #{replayq := Q, if_record_metrics := IfRecordMetric} = State) ->
-    bridges_metrics_inc(IfRecordMetric, 'bridge.mqtt.message_received'),
-    NewQ = replayq:append(Q, collect([Msg])),
+common(_StateName, info, {deliver, _, Msg},
+       State = #{replayq := Q, if_record_metrics := IfRecordMetric}) ->
+    Msgs = collect([Msg]),
+    bridges_metrics_inc(IfRecordMetric,
+                        'bridge.mqtt.message_received',
+                        length(Msgs)
+                       ),
+    NewQ = replayq:append(Q, Msgs),
     {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
     {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
 common(_StateName, info, {'EXIT', _, _}, State) ->
 common(_StateName, info, {'EXIT', _, _}, State) ->
     {keep_state, State};
     {keep_state, State};
@@ -586,3 +591,8 @@ bridges_metrics_inc(true, Metric) ->
     emqx_metrics:inc(Metric);
     emqx_metrics:inc(Metric);
 bridges_metrics_inc(_IsRecordMetric, _Metric) ->
 bridges_metrics_inc(_IsRecordMetric, _Metric) ->
     ok.
     ok.
+
+bridges_metrics_inc(true, Metric, Value) ->
+    emqx_metrics:inc(Metric, Value);
+bridges_metrics_inc(_IsRecordMetric, _Metric, _Value) ->
+    ok.

+ 1 - 1
apps/emqx_exproto/src/emqx_exproto_channel.erl

@@ -255,7 +255,7 @@ handle_call(close, Channel) ->
 
 
 handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) ->
 handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) ->
     ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
     ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
-    {ok, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
+    {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
 handle_call({auth, ClientInfo0, Password},
 handle_call({auth, ClientInfo0, Password},
             Channel = #channel{conninfo = ConnInfo,
             Channel = #channel{conninfo = ConnInfo,
                                clientinfo = ClientInfo}) ->
                                clientinfo = ClientInfo}) ->

+ 1 - 1
apps/emqx_recon/rebar.config

@@ -1,5 +1,5 @@
 {deps, [
 {deps, [
-  {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.0"}}}
+%% recon "https://github.com/ferd/recon" at root rebar.config
 ]}.
 ]}.
 
 
 {edoc_opts, [{preprocess, true}]}.
 {edoc_opts, [{preprocess, true}]}.

+ 1 - 2
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -297,8 +297,7 @@ do_update_resource(#{id := Id, type := Type, description:= NewDescription, confi
                                                       type = Type,
                                                       type = Type,
                                                       config = Config,
                                                       config = Config,
                                                       description = NewDescription,
                                                       description = NewDescription,
-                                                      created_at = erlang:system_time(millisecond)}),
-            cluster_call(clear_resource, [Module, Destroy, Id])
+                                                      created_at = erlang:system_time(millisecond)})
     end.
     end.
 
 
 -spec(start_resource(resource_id()) -> ok | {error, Reason :: term()}).
 -spec(start_resource(resource_id()) -> ok | {error, Reason :: term()}).

+ 14 - 16
apps/emqx_web_hook/src/emqx_web_hook_actions.erl

@@ -340,24 +340,11 @@ pool_opts(Params = #{<<"url">> := URL}) ->
                                   end),
                                   end),
     PoolSize = maps:get(<<"pool_size">>, Params, 32),
     PoolSize = maps:get(<<"pool_size">>, Params, 32),
     ConnectTimeout = timer:seconds(maps:get(<<"connect_timeout">>, Params, 5)),
     ConnectTimeout = timer:seconds(maps:get(<<"connect_timeout">>, Params, 5)),
-    Host = case inet:parse_address(Host0) of
-                       {ok, {_,_,_,_} = Addr} -> Addr;
-                       {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr;
-                       {error, einval} -> Host0
-                   end,
-    Inet = case Host of
-               {_,_,_,_} -> inet;
-               {_,_,_,_,_,_,_,_} -> inet6;
-               _ ->
-                   case inet:getaddr(Host, inet6) of
-                       {error, _} -> inet;
-                       {ok, _} -> inet6
-                   end
-           end,
+    {Inet, Host} = parse_host(Host0),
     MoreOpts = case Scheme of
     MoreOpts = case Scheme of
-                   <<"http">> ->
+                   "http" ->
                        [{transport_opts, [Inet]}];
                        [{transport_opts, [Inet]}];
-                   <<"https">> ->
+                   "https" ->
                        KeyFile = maps:get(<<"keyfile">>, Params),
                        KeyFile = maps:get(<<"keyfile">>, Params),
                        CertFile = maps:get(<<"certfile">>, Params),
                        CertFile = maps:get(<<"certfile">>, Params),
                        CACertFile = maps:get(<<"cacertfile">>, Params),
                        CACertFile = maps:get(<<"cacertfile">>, Params),
@@ -388,3 +375,14 @@ pool_opts(Params = #{<<"url">> := URL}) ->
 
 
 pool_name(ResId) ->
 pool_name(ResId) ->
     list_to_atom("webhook:" ++ str(ResId)).
     list_to_atom("webhook:" ++ str(ResId)).
+
+parse_host(Host) ->
+    case inet:parse_address(Host) of
+        {ok, Addr} when size(Addr) =:= 4 -> {inet, Addr};
+        {ok, Addr} when size(Addr) =:= 8 -> {inet6, Addr};
+        {error, einval} ->
+            case inet:getaddr(Host, inet6) of
+                {ok, _} -> {inet6, Host};
+                {error, _} -> {inet, Host}
+            end
+    end.

+ 13 - 15
apps/emqx_web_hook/src/emqx_web_hook_app.erl

@@ -58,20 +58,7 @@ translate_env() ->
                                       _ -> 80
                                       _ -> 80
                                   end),
                                   end),
     Path = path(Path0),
     Path = path(Path0),
-    Host = case inet:parse_address(Host0) of
-                       {ok, {_,_,_,_} = Addr} -> Addr;
-                       {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr;
-                       {error, einval} -> Host0
-                   end,
-    Inet = case Host of
-                       {_,_,_,_} -> inet;
-                       {_,_,_,_,_,_,_,_} -> inet6;
-                       _ ->
-                           case inet:getaddr(Host, inet6) of
-                               {error, _} -> inet;
-                               {ok, _} -> inet6
-                           end
-                   end,
+    {Inet, Host} = parse_host(Host0),
     PoolSize = application:get_env(?APP, pool_size, 32),
     PoolSize = application:get_env(?APP, pool_size, 32),
     MoreOpts = case Scheme of
     MoreOpts = case Scheme of
                    "http" ->
                    "http" ->
@@ -118,4 +105,15 @@ path(Path) ->
 
 
 set_content_type(Headers) ->
 set_content_type(Headers) ->
     NHeaders = proplists:delete(<<"Content-Type">>, proplists:delete(<<"content-type">>, Headers)),
     NHeaders = proplists:delete(<<"Content-Type">>, proplists:delete(<<"content-type">>, Headers)),
-    [{<<"content-type">>, <<"application/json">>} | NHeaders].
+    [{<<"content-type">>, <<"application/json">>} | NHeaders].
+
+parse_host(Host) ->
+    case inet:parse_address(Host) of
+        {ok, Addr} when size(Addr) =:= 4 -> {inet, Addr};
+        {ok, Addr} when size(Addr) =:= 8 -> {inet6, Addr};
+        {error, einval} ->
+            case inet:getaddr(Host, inet6) of
+                {ok, _} -> {inet6, Host};
+                {error, _} -> {inet, Host}
+            end
+    end.

+ 1 - 0
rebar.config

@@ -54,6 +54,7 @@
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
     , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}
     , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}
     , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}}
+    , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
     , {getopt, "1.0.1"}
     , {getopt, "1.0.1"}
     ]}.
     ]}.
 
 

+ 2 - 8
src/emqx_vm.erl

@@ -275,14 +275,8 @@ util_alloc()->
     alloc(?UTIL_ALLOCATORS).
     alloc(?UTIL_ALLOCATORS).
 
 
 alloc(Type) ->
 alloc(Type) ->
-    [{{T, Instance}, Props} || {{T, Instance}, Props} <- allocators(), lists:member(T, Type)].
-
-allocators() ->
-    UtilAllocators = erlang:system_info(alloc_util_allocators),
-    Allocators = [sys_alloc, mseg_alloc|UtilAllocators],
-    [{{A, N}, lists:sort(proplists:delete(versions, Props))} ||
-        A <- Allocators, Allocs <- [erlang:system_info({allocator, A})],
-            Allocs =/= false, {_, N, Props} <- Allocs].
+    [{{T, Instance}, Props} ||
+     {{T, Instance}, Props} <- recon_alloc:allocators(), lists:member(T, Type)].
 
 
 container_size(Prop, Keyword, Container) ->
 container_size(Prop, Keyword, Container) ->
     Sbcs = container_value(Prop, Keyword, sbcs, Container),
     Sbcs = container_value(Prop, Keyword, sbcs, Container),